Thursday, December 27, 2012

Creating a Simple Map and Reduce with Java Concurrency


Definition
========
MapReduce is a framework for processing parallelizable problems across huge datasets using a large number of computers (nodes). Computational processing can occur on data stored either in a filesystem (unstructured) or in a database (structured). MapReduce can take advantage of locality of data, processing data on or near the storage assets to decrease transmission of data.
"Map" step: The master node takes the input, divides it into smaller sub-problems, and distributes them to worker nodes. A worker node may do this again in turn, leading to a multi-level tree structure. The worker node processes the smaller problem, and passes the answer back to its master node.
"Reduce" step: The master node then collects the answers to all the sub-problems and combines them in some way to form the output – the answer to the problem it was originally trying to solve.


Map and Reduce using Hadoop (Word Count Example)
Most of you know the famous word count example: the mappers read different portions of the document and map each input word to its number of occurrences, and the reducer then merges all of those partial results into a single word count for the entire document. If you need more details, you can look at the Hadoop tutorial: http://hadoop.apache.org/docs/r0.20.2/mapred_tutorial.html#Example%3A+WordCount+v1.0
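For reference, the mapper and reducer in that tutorial (WordCount v1.0, the old org.apache.hadoop.mapred API) look roughly like the abbreviated sketch below; the driver code that sets up the JobConf is omitted here, so see the linked page for the complete, runnable version.

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCount {

    public static class Map extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Emit (word, 1) for every token in the input line
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase
            implements Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text key, Iterator<IntWritable> values,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            // Sum all the 1s emitted for this word by the mappers
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            output.collect(key, new IntWritable(sum));
        }
    }

    // Driver (JobConf setup and job submission) omitted; see the linked tutorial.
}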

Using the Java Concurrency API to perform Map and Reduce
Java introduced the concurrency package (java.util.concurrent) as part of JDK 5.0, and it was a major step towards building high-performance parallel processes. If you need more details about the API, you can read the tutorial written by Brian Goetz: http://www.ibm.com/developerworks/java/tutorials/j-concur/.
In this blog I am going to show how we can achieve Map and Reduce using the Java Concurrency API. We are going to implement the classic word count example: we read a sample document, build a list of all its input lines, and then process this list in parallel using multiple threads (Callable tasks).

Each Callable task takes a sublist of this main document list as input, tokenizes the entries, and builds a basic mapping of words to their counts. Each task therefore produces an intermediate map of words and counts, returned through its Future, and finally a reducer combines all these intermediate maps, aggregates the counts, and produces a final map with all the word counts. A minimal end-to-end sketch of this pipeline is shown below, before the full implementation.
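Here is a condensed, self-contained sketch of the same idea (my own illustration, with inline sample lines; it is not the code from the GitHub repository): each Callable counts the words in its slice of the input, and the calling thread merges the partial maps returned by the Futures.

import java.util.*;
import java.util.concurrent.*;

public class MiniWordCount {
    public static void main(String[] args) throws Exception {
        List<String> lines = Arrays.asList("the quick brown fox", "the lazy dog");
        int threads = Runtime.getRuntime().availableProcessors();
        ExecutorService pool = Executors.newFixedThreadPool(threads);

        // "Map": one Callable per line (the real code below uses sublists of lines)
        List<Callable<Map<String, Integer>>> tasks = new ArrayList<Callable<Map<String, Integer>>>();
        for (final String line : lines) {
            tasks.add(new Callable<Map<String, Integer>>() {
                public Map<String, Integer> call() {
                    Map<String, Integer> counts = new HashMap<String, Integer>();
                    for (StringTokenizer t = new StringTokenizer(line); t.hasMoreTokens();) {
                        String word = t.nextToken().toLowerCase();
                        Integer old = counts.get(word);
                        counts.put(word, old == null ? 1 : old + 1);
                    }
                    return counts;
                }
            });
        }

        // "Reduce": merge the intermediate maps returned by the Futures
        Map<String, Integer> total = new HashMap<String, Integer>();
        for (Future<Map<String, Integer>> f : pool.invokeAll(tasks)) {
            for (Map.Entry<String, Integer> e : f.get().entrySet()) {
                Integer old = total.get(e.getKey());
                total.put(e.getKey(), old == null ? e.getValue() : old + e.getValue());
            }
        }
        pool.shutdown();
        // e.g. {the=2, quick=1, brown=1, fox=1, lazy=1, dog=1} (map order may vary)
        System.out.println(total);
    }
}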
The full code is available on GitHub at the following location

https://github.com/raghuramgururajan/JavaMapAndReduce/


Step 1: Define the class that processes the input
WordCountProcessor.java

package com.example.java.concurrency.wordcount;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;

/**
 * This class reads the sample file, configures the thread pool, determines the
 * number of available threads, delegates the work to WordCountMapper and
 * WordCountReducer, and displays the final word count output.
 * @author raghuramgururajan
 */
public class WordCountProcessor {

    private static Logger log = Logger.getLogger(WordCountProcessor.class
            .getName());

    /**
     * Reads the input file and calls the mapper tasks to process the data.
     * Multiple threads process the input and one reducer finally merges the
     * intermediate results.
     * @param inputFileName name of the input file on the classpath
     * @return map of each word to its count, or null if processing failed
     */
    public Map<String, Integer> processMapAndReduce(String inputFileName) {
        BufferedReader br = null;
        FileReader fr = null;
        Map<String, Integer> wordCountMap = null;
        try {
            File inputFile = new File(
                    (this.getClass().getClassLoader()
                            .getResource(inputFileName)).toURI());
            fr = new FileReader(inputFile);
            br = new BufferedReader(fr);

            long contentLength = inputFile.length();
            // Number of threads is determined from the runtime
            int numberOfThreads = Runtime.getRuntime().availableProcessors();
            // Thread pool executor initialized
            ThreadPoolExecutor threadpoolExecutor = new ThreadPoolExecutor(
                    numberOfThreads, numberOfThreads, 10, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>());
            // Thread pool executor invoked
            wordCountMap = invokeThreadExecutor(threadpoolExecutor,
                    contentLength, numberOfThreads, br);

        } catch (IOException e) {
            log.error("IOException occurred in WordCountProcessor::processMapAndReduce "
                    + e.toString());
        } catch (URISyntaxException e) {
            log.error("URISyntaxException occurred in WordCountProcessor::processMapAndReduce "
                    + e.toString());
        } finally {
            IOUtils.closeQuietly(br);
            IOUtils.closeQuietly(fr);
        }
        return wordCountMap;
    }

    /**
     * Creates and invokes multiple Callable tasks to process the input lines.
     * The lines are split into roughly equal sublists, one per thread, and the
     * last sublist also picks up any remainder left over by the division.
     * @param threadpoolExecutor executor that runs the mapper tasks
     * @param contentLength length of the input file in bytes (not used for the
     *            split; the split is done by line count)
     * @param numberOfThreads number of mapper tasks to create
     * @param br reader over the input file
     * @return map of each word to its count, or null if processing failed
     * @throws IOException if the input cannot be read
     */
    private Map<String, Integer> invokeThreadExecutor(
            ThreadPoolExecutor threadpoolExecutor, long contentLength,
            int numberOfThreads, BufferedReader br) throws IOException {

        List<Callable<WordCountStatus>> wordCountTasks = new ArrayList<Callable<WordCountStatus>>();
        int beginOffset = 0;
        int endOffset = 0;
        Map<String, Integer> wordCountMap = null;
        List<Future<WordCountStatus>> wordCountFutureList = null;
        try {

            List<String> inputWordsList = getInputList(br);

            int contentLengthPerThread = getContentLengthPerThread(
                    numberOfThreads, inputWordsList);

            endOffset = contentLengthPerThread;
            System.out
                    .println("::::::::::::::::::::::::::::::::::Calling Mapper to map the words::::::::::::::::::::::::::::::::");
            for (int i = 0; i < numberOfThreads && !inputWordsList.isEmpty(); i++) {
                System.out.println((((float) (i + 1) / numberOfThreads) * 100)
                        + "%  mapping complete");
                // Clamp the last chunk to the end of the list so no trailing lines are dropped
                if (i == numberOfThreads - 1 || endOffset > inputWordsList.size()) {
                    endOffset = inputWordsList.size();
                }
                List<String> subListInputWords = inputWordsList.subList(
                        beginOffset, endOffset);
                wordCountTasks.add(new WordCountMapper(subListInputWords));
                beginOffset = endOffset;
                endOffset = endOffset + contentLengthPerThread;
                if (beginOffset >= inputWordsList.size()) {
                    break;
                }
            }

            wordCountFutureList = threadpoolExecutor.invokeAll(wordCountTasks);
            WordCountReducer wcr = new WordCountReducer();
            System.out
                    .println("::::::::::::::::::::::::::::::::Calling reducer to reduce the words::::::::::::::::::::::::::::::::");
            wordCountMap = wcr.reduce(wordCountFutureList);
            threadpoolExecutor.shutdown();

        } catch (IndexOutOfBoundsException e) {
            log.error("Exception occurred while invoking the mapper threads "
                    + e.toString());
        } catch (InterruptedException e) {
            log.error("Exception occurred while invoking the mapper threads "
                    + e.toString());
        }

        return wordCountMap;
    }

    /**
     * Determines how many input lines each mapper task should receive.
     */
    private int getContentLengthPerThread(int numberOfThreads,
            List<String> inputWordsList) {
        int contentLengthPerThread = 0;
        if (numberOfThreads == 0) {
            contentLengthPerThread = 1;
        } else {
            contentLengthPerThread = inputWordsList.size() / numberOfThreads;
        }
        if (contentLengthPerThread == 0) {
            contentLengthPerThread = inputWordsList.size();
        }
        return contentLengthPerThread;
    }

    /**
     * Reads all lines of the input into a list.
     */
    private List<String> getInputList(BufferedReader br) throws IOException {
        List<String> inputWordsList = new ArrayList<String>();
        String tempInputWord = null;
        while ((tempInputWord = br.readLine()) != null) {
            inputWordsList.add(tempInputWord);
        }
        return inputWordsList;
    }

    /**
     * Displays the word counts.
     * @param wordCountMap map of each word to its count
     */
    public void displayWordCounts(Map<String, Integer> wordCountMap) {
        System.out.println("WordName" + " :: :: " + "WordCount");
        for (Map.Entry<String, Integer> wordCount : wordCountMap.entrySet()) {
            System.out.println(wordCount.getKey() + " :: :: "
                    + wordCount.getValue());
        }
    }

    public static void main(String[] args) {
        WordCountProcessor wcp = new WordCountProcessor();
        Map<String, Integer> wordCountMap = wcp.processMapAndReduce("sample.txt");
        if (wordCountMap != null) {
            System.out.println("::::::::::::::::::::The word count is ::::::::::::::::::::");
            wcp.displayWordCounts(wordCountMap);
        }
    }

}
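To make the offset arithmetic in invokeThreadExecutor easier to follow, here is a tiny standalone illustration (my own example, not part of the repository) of how a list of lines is split into per-thread sublists with subList, with the last chunk absorbing any remainder left by the integer division.

import java.util.Arrays;
import java.util.List;

public class SubListDemo {
    public static void main(String[] args) {
        List<String> lines = Arrays.asList("l0", "l1", "l2", "l3", "l4", "l5", "l6");
        int numberOfThreads = 3;
        int perThread = lines.size() / numberOfThreads; // 7 / 3 = 2 lines per chunk

        int begin = 0;
        for (int i = 0; i < numberOfThreads; i++) {
            // The last chunk is clamped to the end of the list so no lines are dropped
            int end = (i == numberOfThreads - 1) ? lines.size() : begin + perThread;
            System.out.println("Task " + i + " -> " + lines.subList(begin, end));
            begin = end;
        }
        // Prints: Task 0 -> [l0, l1], Task 1 -> [l2, l3], Task 2 -> [l4, l5, l6]
    }
}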

Step 2: Define the WordCountMapper to map the words

WordCountMapper.java




package com.example.java.concurrency.wordcount;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.concurrent.Callable;

/**
 * This class maps the input words by building a map of every input word along
 * with its count of occurrence within the sublist handled by this task.
 * @author raghuramgururajan
 */
public class WordCountMapper implements Callable<WordCountStatus> {

    private static final CharSequence PATTERN_COMMA = ",";
    private static final String PATTERN_SPACE = " ";
    private static final CharSequence PATTERN_EMPTY_STRING = "";
    private static final CharSequence PATTERN_DOT = ".";
    private static final Integer FIRST_OCCURENCE = 1;
    private static final int LENGTH_ONE = 1;

    private List<String> inputWordsList = null;

    public WordCountMapper(List<String> inputWordsList) {
        this.inputWordsList = inputWordsList;
    }

    public WordCountStatus call() throws Exception {
        WordCountStatus ws = new WordCountStatus();
        Map<String, Integer> inputWordsMap = new HashMap<String, Integer>();
        // Iterate over the inputWordsList and create an intermediate map with counts
        for (String inputWords : inputWordsList) {
            StringTokenizer inputWordsTokens = new StringTokenizer(inputWords, PATTERN_SPACE);
            while (inputWordsTokens.hasMoreTokens()) {
                String inputWordMapKey = inputWordsTokens.nextToken();
                // Skip empty and single-character tokens
                if (inputWordMapKey != null && inputWordMapKey.length() > LENGTH_ONE) {
                    // Strip commas and dots from the token
                    if (inputWordMapKey.contains(PATTERN_COMMA)) {
                        inputWordMapKey = inputWordMapKey.replace(PATTERN_COMMA, PATTERN_EMPTY_STRING);
                    }
                    if (inputWordMapKey.contains(PATTERN_DOT)) {
                        inputWordMapKey = inputWordMapKey.replace(PATTERN_DOT, PATTERN_EMPTY_STRING);
                    }
                    // If the map already contains the word, increment its counter;
                    // otherwise record the first occurrence
                    inputWordMapKey = inputWordMapKey.toLowerCase();
                    if (!inputWordsMap.isEmpty() && inputWordsMap.containsKey(inputWordMapKey)) {
                        int wordCount = inputWordsMap.get(inputWordMapKey);
                        wordCount++;
                        inputWordsMap.put(inputWordMapKey, wordCount);
                    } else {
                        inputWordsMap.put(inputWordMapKey, FIRST_OCCURENCE);
                    }
                }
            }
        }
        // Only set the result if the map is not empty
        if (!inputWordsMap.isEmpty()) {
            ws.setInputWordsMap(inputWordsMap);
        }
        return ws;
    }

}
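As a quick sanity check, the mapper can be exercised on its own without the executor. The following snippet is a hypothetical example of mine (placed in the same package; it is not part of the repository):

package com.example.java.concurrency.wordcount;

import java.util.Arrays;

public class WordCountMapperDemo {
    public static void main(String[] args) throws Exception {
        // Feed the mapper two lines directly, without the thread pool
        WordCountMapper mapper = new WordCountMapper(
                Arrays.asList("The cat sat on the mat.", "the cat purred"));
        WordCountStatus status = mapper.call();
        // e.g. {the=3, cat=2, sat=1, on=1, mat=1, purred=1} (map order may vary)
        System.out.println(status.getInputWordsMap());
    }
}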


Step 3: Define the WordCountStatus to hold the result of WordCountMapper


package com.example.java.concurrency.wordcount;

import java.util.Map;

/**
 * Class to store the result of an intermediate mapper task; it acts as the
 * input to the reducer for the final word count computation.
 * @author raghuramgururajan
 */
public class WordCountStatus {

    private Map<String, Integer> inputWordsMap = null;

    public Map<String, Integer> getInputWordsMap() {
        return inputWordsMap;
    }

    public void setInputWordsMap(Map<String, Integer> inputWordsMap) {
        this.inputWordsMap = inputWordsMap;
    }

}


Step 4: Define the WordCountReducer to merge the intermediate maps and produce the final word count


package com.example.java.concurrency.wordcount;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.log4j.Logger;

/**
 * Class to reduce all the intermediate mapper output into the final output.
 * @author raghuramgururajan
 */
public class WordCountReducer {

    private static Logger log = Logger.getLogger(WordCountReducer.class
            .getName());

    /**
     * Merges the intermediate maps produced by the mapper tasks into one final
     * map by summing the counts recorded for each word.
     * @param wordCountFutureList futures holding the intermediate results
     * @return map of each word to its total count across all mapper tasks
     */
    public Map<String, Integer> reduce(List<Future<WordCountStatus>> wordCountFutureList) {
        Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
        try {
            for (Future<WordCountStatus> future : wordCountFutureList) {
                Map<String, Integer> tempWordCountMap = future.get().getInputWordsMap();
                // A mapper that saw no words leaves its map unset
                if (tempWordCountMap == null) {
                    continue;
                }
                for (Map.Entry<String, Integer> entry : tempWordCountMap.entrySet()) {
                    String inputWord = entry.getKey();
                    int occurrence = entry.getValue();
                    // Add this mapper's count to any count already accumulated for the word
                    if (wordCountMap.containsKey(inputWord)) {
                        occurrence = occurrence + wordCountMap.get(inputWord);
                    }
                    wordCountMap.put(inputWord, occurrence);
                }
            }
        } catch (InterruptedException e) {
            log.error("Error occurred in WordCountReducer " + e.toString());
        } catch (ExecutionException e) {
            log.error("Error occurred in WordCountReducer " + e.toString());
        }
        return wordCountMap;
    }

}
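The merge semantics can be checked in isolation by wrapping a couple of pre-built intermediate maps in already-completed futures. The snippet below is a hypothetical test of mine using FutureTask (same package; not part of the repository):

package com.example.java.concurrency.wordcount;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

public class WordCountReducerDemo {

    // Wraps an intermediate map in a completed Future, as invokeAll would return it
    private static Future<WordCountStatus> completed(Map<String, Integer> counts) {
        final WordCountStatus status = new WordCountStatus();
        status.setInputWordsMap(counts);
        FutureTask<WordCountStatus> task = new FutureTask<WordCountStatus>(
                new Callable<WordCountStatus>() {
                    public WordCountStatus call() {
                        return status;
                    }
                });
        task.run(); // run synchronously so the Future is already done
        return task;
    }

    public static void main(String[] args) {
        Map<String, Integer> first = new HashMap<String, Integer>();
        first.put("the", 2);
        first.put("cat", 1);
        Map<String, Integer> second = new HashMap<String, Integer>();
        second.put("the", 3);
        second.put("dog", 1);

        Map<String, Integer> total = new WordCountReducer()
                .reduce(Arrays.asList(completed(first), completed(second)));
        System.out.println(total); // {the=5, cat=1, dog=1} (map order may vary)
    }
}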

Step 5: Sample output of WordCount


::::::::::::::::::::::Calling Mapper to map the words::::::::::::::::::::::::::::::::
12.5%  mapping complete
25.0%  mapping complete
37.5%  mapping complete
50.0%  mapping complete
62.5%  mapping complete
75.0%  mapping complete
87.5%  mapping complete
100.0%  mapping complete
::::::::::::::::::::Calling reducer to reduce the words::::::::::::::::::::::::::::::::
::::::::::::::::::::The word count is ::::::::::::::::::::
WordName :: :: WordCount

bandanna :: :: 1
forth :: :: 1
need :: :: 1
holding :: :: 2
mentonsolicitor :: :: 1
for :: :: 1
coffey :: :: 1
round :: :: 1
defunct :: :: 1
him :: :: 2
must :: :: 1
didn't :: :: 1
of :: :: 4
twisted :: :: 1
looks :: :: 1
sleepily :: :: 1
bachelor's :: :: 1
she :: :: 1
poppies :: :: 1
her :: :: 1
chaplain :: :: 1
portly :: :: 1
awfully :: :: 1
heart :: :: 1
once :: :: 1
how :: :: 1
oaths :: :: 1
he :: :: 1
27 :: :: 1
toadbellied :: :: 1
and :: :: 2
that :: :: 2
now :: :: 1
bearing :: :: 1
affidavits :: :: 1
lines :: :: 1
tied :: :: 1
walk :: :: 1
wall :: :: 1
satisfy :: :: 1
buttermilk :: :: 1
john :: :: 1
wrynecked :: :: 1
nightcap :: :: 1
father :: :: 1
was :: :: 2
staff :: :: 1
caretaker :: :: 1
figure :: :: 1
me :: :: 1
bunch :: :: 1
hard :: :: 1
animal :: :: 1
hypertrophied :: :: 1
with :: :: 2
is :: :: 1
it :: :: 1
mr :: :: 1
keys :: :: 1
surplice :: :: 1
poor :: :: 1
the :: :: 3
o'connell :: :: 1
j :: :: 1
bottle :: :: 1
in :: :: 2
employ :: :: 1
h :: :: 1
up :: :: 1
crape :: :: 1
beside :: :: 1
stands :: :: 2
am :: :: 1
wife :: :: 1
an :: :: 1
off :: :: 1
lamp :: :: 1
commissioner :: :: 1
sherry :: :: 1
cut :: :: 1
agree :: :: 1
keep :: :: 1