Vision

In this project, we plan to implement the MapReduce paradigm and demonstrate it with a couple of sample applications. The system has three kinds of components: a client, a master, and multiple workers (mappers and reducers). The client sends a task to the master, which partitions the dataset and assigns the pieces to the mappers and reducers to produce the final result. The user provides the input dataset as a file on NFS (Network File System). We plan to provide a GUI that depicts the state of the MapReduce system, and the system provides fault tolerance to handle failures. To validate our MapReduce API, we plan to test it with a couple of applications, namely word count and inverted index.

Goals

  • To design and implement a MapReduce API that enables building MapReduce applications.
  • To demonstrate the feasibility of our system using a couple of sample applications.

Use Case

This project implements an API for application developers. Applications that could use this MapReduce API include:

  • Text tokenization, indexing, and search, e.g. distributed grep, distributed sort, inverted index creation.
  • Data mining and machine learning.
  • Traversal of data structures such as graphs, e.g. shortest path, generating a web graph.
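
As an illustration of the indexing use case, the following is a minimal in-memory sketch of inverted index creation expressed as a map step and a reduce step. It does not use the project's API: the class InvertedIndexSketch and its map, reduce, and build methods are hypothetical names chosen for this example, and the grouping that the MapReduce system would perform between the two steps is simulated inside build.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical, self-contained sketch; not part of the project's API.
class InvertedIndexSketch {

    // Map step: emit a {word, documentName} pair for every word in the document.
    static List<String[]> map(String documentName, String text) {
        List<String[]> pairs = new ArrayList<>();
        for (String word : text.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                pairs.add(new String[] {word, documentName});
            }
        }
        return pairs;
    }

    // Reduce step: collapse the documents seen for one word into a sorted set.
    static TreeSet<String> reduce(String word, List<String> documentNames) {
        return new TreeSet<>(documentNames);
    }

    // Runs map over every document, groups the pairs by word, then reduces each group.
    static Map<String, TreeSet<String>> build(Map<String, String> documents) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> doc : documents.entrySet()) {
            for (String[] pair : map(doc.getKey(), doc.getValue())) {
                List<String> names = grouped.get(pair[0]);
                if (names == null) {
                    names = new ArrayList<>();
                    grouped.put(pair[0], names);
                }
                names.add(pair[1]);
            }
        }
        Map<String, TreeSet<String>> index = new TreeMap<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            index.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, String> docs = new TreeMap<>();
        docs.put("a.txt", "map and reduce");
        docs.put("b.txt", "reduce, then reduce again");
        System.out.println(build(docs));
    }
}
```

In this sketch the word "reduce" ends up mapped to both a.txt and b.txt, while every other word maps to a single document.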

API Tutorial

The API provides a TaskBase class which has two methods:

  • public List<MapperResult> map(Object key, Object value);
  • public ReducerResult reduce(Object key, List<Object> values);

The TaskBase class has a constructor that takes the input file name and the output file name.

  • public TaskBase(String inputFile, String outputFile);

The application developer should write an application-specific task class that extends TaskBase and implements the map and reduce functions. Depending on the application, one of the two functions can be left empty, but at least one of them must be implemented.
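
To illustrate a task in which one of the two functions is left empty, here is a minimal sketch of a grep-like task that does all of its work in map. So that it runs on its own, it uses hypothetical stand-ins (SketchTaskBase and SketchPair) rather than the project's TaskBase and MapperResult classes. It also assumes, purely for illustration, that map receives a line number as the key and the line text as the value.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// A grep-like task: all of the work happens in map, and reduce is left empty.
class GrepTaskSketch extends SketchTaskBase {
    private final String pattern;

    GrepTaskSketch(String inputFile, String outputFile, String pattern) {
        super(inputFile, outputFile);
        this.pattern = pattern;
    }

    // Assumption for this sketch: key is a line number, value is the line text.
    @Override
    List<SketchPair> map(Object key, Object value) {
        List<SketchPair> out = new ArrayList<>();
        String line = String.valueOf(value);
        if (line.contains(pattern)) {
            out.add(new SketchPair(key, line));
        }
        return out;
    }

    // Intentionally empty: matching lines need no aggregation.
    @Override
    SketchPair reduce(Object key, List<Object> values) {
        return null;
    }

    public static void main(String[] args) {
        GrepTaskSketch task = new GrepTaskSketch("in", "out", "map");
        List<String> lines = Arrays.asList("map then reduce", "shuffle", "a mapper");
        for (int i = 0; i < lines.size(); i++) {
            for (SketchPair p : task.map(i + 1, lines.get(i))) {
                System.out.println(p.key + ": " + p.value);
            }
        }
    }
}

// Hypothetical stand-in for TaskBase, reduced to the two methods described above.
abstract class SketchTaskBase {
    final String inputFile;
    final String outputFile;

    SketchTaskBase(String inputFile, String outputFile) {
        this.inputFile = inputFile;
        this.outputFile = outputFile;
    }

    abstract List<SketchPair> map(Object key, Object value);

    abstract SketchPair reduce(Object key, List<Object> values);
}

// Hypothetical stand-in for MapperResult: one emitted key-value pair.
final class SketchPair {
    final Object key;
    final Object value;

    SketchPair(Object key, Object value) {
        this.key = key;
        this.value = value;
    }
}
```

A real task would extend the project's TaskBase instead and return MapperResult objects, as in the Word Count example.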

Example: Word Count

WordCountTask.java

package tasks;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import system.MapperResult;
import system.ReducerResult;

import api.TaskBase;

public class WordCountTask extends TaskBase{

    private static final long serialVersionUID = 1L;

    public WordCountTask(String inputDir, String outputDir) {
        super(inputDir, outputDir);
    }

    @Override
    public List<MapperResult> map(Object key, Object value) {
        // key is the name of the input file; value is unused (null).
        List<MapperResult> keyValueList = new ArrayList<MapperResult>();

        // try-with-resources closes the reader exactly once, even on error.
        try (BufferedReader input = new BufferedReader(new FileReader((String) key))) {
            System.out.println("reading: " + key);
            Map<String, Integer> hashMap = new HashMap<String, Integer>();

            String line;
            while ((line = input.readLine()) != null) {
                // Count every whitespace-separated word on the line.
                for (String word : line.trim().split("\\s+")) {
                    if (word.isEmpty()) {
                        continue; // blank line
                    }
                    Integer count = hashMap.get(word);
                    hashMap.put(word, count == null ? 1 : count + 1);
                }
            }

            // Emit one (word, count) pair per distinct word.
            for (Map.Entry<String, Integer> entry : hashMap.entrySet()) {
                keyValueList.add(new MapperResult(entry.getKey(),
                                                  entry.getValue()));
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }

        return keyValueList;
    }

    @Override
    public ReducerResult reduce(Object key, List<Object> values) {
        // Sum the per-file counts collected for this word.
        int sum = 0;
        for (Object aValue : values) {
            // String.valueOf accepts counts delivered as Integer or as String.
            sum += Integer.parseInt(String.valueOf(aValue));
        }
        return new ReducerResult(key, Integer.valueOf(sum));
    }

    @Override
    public void shuffle(Object key, List<Object> values) {
        // Left empty: word count defines no custom shuffle logic.
    }
}

The WordCountTask class extends the TaskBase class. The input to the map function is a key-value pair in which the key is a file name and the value is null. The map function counts the number of occurrences of each word in the file and generates one key-value pair per word, where the key is the word and the value is its count. Each output key-value pair is encapsulated in a MapperResult object, and the map function returns a list of these MapperResult objects.

The input to the reduce function is a key and a list of values corresponding to that key. The system is responsible for grouping identical keys together and building the corresponding list of values. In Word Count, the key is a word and each value is the count of that word in one file. The reduce function adds the counts together and produces the word and its total count as a key-value pair. This output key-value pair is encapsulated in a ReducerResult object and returned.
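
The grouping step described above can be sketched in memory as follows. This is only an illustration of the idea, not the system's actual implementation: the class ShuffleSketch and its methods are hypothetical, and mapper output is modeled as plain two-element Object[] arrays instead of MapperResult objects.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, self-contained sketch; not part of the project's API.
class ShuffleSketch {

    // Groups {key, value} pairs by key, collecting all values for the same key.
    static Map<String, List<Object>> groupByKey(List<Object[]> mapperOutput) {
        Map<String, List<Object>> grouped = new TreeMap<>();
        for (Object[] pair : mapperOutput) {
            String key = (String) pair[0];
            List<Object> values = grouped.get(key);
            if (values == null) {
                values = new ArrayList<>();
                grouped.put(key, values);
            }
            values.add(pair[1]);
        }
        return grouped;
    }

    // Word-count reduce: sums the grouped counts for one word.
    static int reduce(String key, List<Object> values) {
        int sum = 0;
        for (Object v : values) {
            sum += Integer.parseInt(String.valueOf(v));
        }
        return sum;
    }

    public static void main(String[] args) {
        // Pretend two mappers each processed one file.
        List<Object[]> mapperOutput = new ArrayList<>();
        mapperOutput.add(new Object[] {"map", 2});     // from file 1
        mapperOutput.add(new Object[] {"reduce", 1});  // from file 1
        mapperOutput.add(new Object[] {"map", 3});     // from file 2

        for (Map.Entry<String, List<Object>> e : groupByKey(mapperOutput).entrySet()) {
            System.out.println(e.getKey() + " " + reduce(e.getKey(), e.getValue()));
        }
    }
}
```

Here the two counts for "map" (2 and 3) are grouped into one list and summed to 5, while "reduce" keeps its single count of 1.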

WordCountClient.java

package tasks;

import java.rmi.NotBoundException;
import java.rmi.RemoteException;
import java.rmi.registry.LocateRegistry;
import java.rmi.registry.Registry;

import api.Client2Master;

public class WordCountClient {
    public static void main(String[] args){
        // Host on which the master's RMI registry is running.
        String masterName = args[0];

        // Create the task with its input and output names.
        WordCountTask task = new WordCountTask("in", "out");

        if (System.getSecurityManager() == null) {
            System.setSecurityManager(new SecurityManager());
        }

        try {
            // Look up the master's remote interface and submit the task.
            Registry registry = LocateRegistry.getRegistry(masterName);
            Client2Master master = 
               (Client2Master)registry.lookup(Client2Master.SERVICE_NAME);
            master.addTask(task);
            master.finish();
            System.out.println("DONE!!!");

        } catch (RemoteException e) {
            // The registry could not be reached or a remote call failed.
            e.printStackTrace();
        } catch (NotBoundException e) {
            // Nothing is bound under Client2Master.SERVICE_NAME.
            e.printStackTrace();
        }
    }
}

In WordCountClient.java, a WordCountTask is created using:

  • WordCountTask task = new WordCountTask("in", "out");

The task is then submitted to the master using:

  • master.addTask(task);

The client then calls master.finish() and waits until the task is finished.