Map Reduce - Implementing Worker Nodes (Part 3/3)

https://github.com/hamdaankhalid/Map-Reduce-Implementation



Previous parts of the series, which you should read before this one:

  1. Map Reduce - Parallel Processing At Scale (Part 1/3)
  2. Map Reduce - Implementing Master/Coordinator (Part 2/3)


In my previous article I went over my implementation of the Master (Coordinator) node that runs in a map reduce cluster. In this article I will walk through and explain my implementation of the Worker nodes.



It makes sense to start with a brief recap of how the worker fits into the overall picture. A cluster running map reduce has one node assigned as the Master and the rest assigned as Worker nodes. The Master maintains the state of the map tasks and reduce tasks, tracking which have not been started, which are in progress, which have failed, and which have completed. Workers ask the Master for a task; if one is available, the Master responds with either a map task or a reduce task to execute. Based on the response, the Worker runs the user-defined map function or the user-defined reduce function. It is also the Worker's responsibility to read and write the files for intermediate and final outputs, and to notify the Master when a task completes. The Master also watches for Workers that were given a task and never reported back: a task for which the Master does not receive a response within a certain time is considered failed and is reassigned to another Worker that requests a task. Once all the map and reduce tasks are marked as complete, we will have N output files, where N is the number of reduce tasks the framework has been configured to run.
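To make the timeout-and-reassign behaviour from the recap concrete, here is a minimal sketch. It is not the Master code from Part 2: the class name TaskTimeoutTracker, its methods, and the idea of passing the current time in as a parameter are all assumptions made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: remembers when each task was handed out and moves
// tasks back to NOT_STARTED when their worker has been silent for too long.
final class TaskTimeoutTracker {
    enum Status { NOT_STARTED, IN_PROGRESS, COMPLETED }

    private final Map<String, Status> status = new LinkedHashMap<>();
    private final Map<String, Long> startedAtMillis = new LinkedHashMap<>();
    private final long timeoutMillis;

    TaskTimeoutTracker(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    synchronized void addTask(String taskId) {
        status.put(taskId, Status.NOT_STARTED);
    }

    // Called when a worker asks for work; returns null if nothing is available.
    synchronized String assignNext(long nowMillis) {
        for (Map.Entry<String, Status> entry : status.entrySet()) {
            if (entry.getValue() == Status.NOT_STARTED) {
                entry.setValue(Status.IN_PROGRESS);
                startedAtMillis.put(entry.getKey(), nowMillis);
                return entry.getKey();
            }
        }
        return null;
    }

    synchronized void markCompleted(String taskId) {
        status.put(taskId, Status.COMPLETED);
        startedAtMillis.remove(taskId);
    }

    // Meant to be called periodically, e.g. from a background thread.
    // Returns how many in-progress tasks were reset for reassignment.
    synchronized int requeueTimedOut(long nowMillis) {
        int requeued = 0;
        for (Map.Entry<String, Status> entry : status.entrySet()) {
            if (entry.getValue() == Status.IN_PROGRESS
                    && nowMillis - startedAtMillis.get(entry.getKey()) > timeoutMillis) {
                entry.setValue(Status.NOT_STARTED);
                startedAtMillis.remove(entry.getKey());
                requeued++;
            }
        }
        return requeued;
    }

    synchronized Status statusOf(String taskId) {
        return status.get(taskId);
    }
}
```

Taking the current time as an argument keeps the sketch easy to test; a real Master would more likely read System.currentTimeMillis() inside a scheduled job.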



The driver code for the worker shows this logic concisely.

public class Main {
    public static void main(String[] args) {
        IFileInteractor fileSystemInteraction = new InMemoryFileInteraction("/Users/hamdaankhalid/Desktop/MapReduce/data/test", 4);

        // CHANGE THESE TO WHATEVER MAP AND REDUCE YOU WANT TO RUN
        IMapTask mapFunc = new WordCountMap();
        IReduceTask reduceFunc = new WordCountReduce();

        MapRunner mapExecutor = new MapRunner(mapFunc , fileSystemInteraction);
        ReduceRunner reduceExecutor = new ReduceRunner(reduceFunc, fileSystemInteraction);
        MasterClient masterClient = new MasterClient();

        while (true) {
            try {
                RequestTaskResponse requestTaskResponse = masterClient.requestTask();

                System.out.println("Task sent by master " + requestTaskResponse);

                if (requestTaskResponse.isFinished()) {
                    System.out.println("All maps and reduces have been processed. Exiting worker process!");
                    break;
                } else if (requestTaskResponse.getMapTask() == null && requestTaskResponse.getReduceTask() == null) {
                    System.out.println("Master responded with wait... resting for 5 seconds");
                    Helper.SLEEP(5);
                } else if (requestTaskResponse.getMapTask() != null) {
                    System.out.println("Executing map task");
                    // run map task, and notify
                    MapTask mapTask = mapExecutor.map(requestTaskResponse.getMapTask());
                    System.out.println("Map Task Executed "+ mapTask);
                    masterClient.notifyTaskCompleted(
                            new CompletedTaskBody(true, mapTask, null)
                    );
                } else {
                    System.out.println("Executing reduce task");
                    // run reduce task, and notify
                    Integer reducedOn = reduceExecutor.reduce(requestTaskResponse.getReduceTask());
                    System.out.println("Reduce Task Executed for "+ reducedOn);
                    masterClient.notifyTaskCompleted(
                            new CompletedTaskBody(false, null, reducedOn)
                    );
                }
            } catch (Exception e) {
                System.out.println("Error in execution"+ e);
                System.out.println(e.getMessage());
                Helper.SLEEP(3);
            }
        }
    }
}


We run a loop asking the Master for tasks to execute. If all tasks are finished, we exit the loop and end the worker process. If the Master has nothing to hand out yet, the worker sleeps for a few seconds and asks again. Otherwise we call the MapRunner or the ReduceRunner, depending on the type of task (Map or Reduce), and then notify the Master that the task is done.



The MapRunner and ReduceRunner are injected with user-defined map and reduce functions modeled on the signatures described in the Google paper: map (k1, v1) → list(k2, v2) and reduce (k2, list(v2)) → list(v2). In my implementation, reduce returns a single (k2, v3) pair instead of a list. The responsibility of the MapRunner and ReduceRunner is to abstract away the reading, reshaping, and writing of data so that the user-defined function does not need to be aware of it.

Interface that a user defined map function should follow as expressed within my code:

public interface IMapTask {
    List<KeyValuePair> map(String content);
}


Interface that a user defined reduce function should follow as expressed within my code:

public interface IReduceTask {
    KeyValuePair reduce(String key, List<String> values);
}



MapRunner: reads the input file assigned to the map task, runs the user-defined map function on its contents, and writes the resulting key/value pairs to intermediate files.

public class MapRunner {
    private IMapTask mapFunc;
    private IFileInteractor fileSystemInteraction;

    public MapRunner(IMapTask mapFunc, IFileInteractor fileSystemInteraction) {
        this.mapFunc = mapFunc;
        this.fileSystemInteraction = fileSystemInteraction;
    }

    public MapTask map(MapTask mapTask) throws IOException {
        String filename = mapTask.getFileName();
        String content = fileSystemInteraction.readFromFile(filename);
        String mapTaskNum = filename.substring("input".length());
        List<KeyValuePair> output = mapFunc.map(content);
        if (output!=null) {
            fileSystemInteraction.serializeMapToDisk(output, mapTaskNum);
        }
        // send it as in progress / it will be marked as completed by server
        return new MapTask(filename, TaskStatus.IN_PROGRESS);
    }
}



Reduce Runner:

The reduce runner is more complicated than the map runner. It first has to gather all the intermediate files that were written for its particular reduce task and group their values by key (a HashMap is used here, so the keys are grouped but not actually sorted). It then iterates over each key and its list of values, runs the user-defined reduce on them, and serializes the result to disk.

public class ReduceRunner {
    private IReduceTask reduceFunc;
    private IFileInteractor fileSystemInteraction;

    public ReduceRunner(IReduceTask reduceFunc, IFileInteractor fileSystemInteraction) {
        this.reduceFunc = reduceFunc;
        this.fileSystemInteraction = fileSystemInteraction;
    }

    public Integer reduce(Integer reduceTask) throws IOException {
        List<String> intermediateFiles = fileSystemInteraction.getIntermediateFiles(reduceTask);

        System.out.println(intermediateFiles);

        Map<String, List<String>> sortedIntermediate = new HashMap<>();
        for (String filename: intermediateFiles) {
            List<String> contentByLine = fileSystemInteraction.readLinesFromFile(filename);
            for (String line: contentByLine) {
                String[] keyVal = line.split(":");
                if(keyVal.length != 2) {
                    continue;
                }
                String key = keyVal[0];
                String val = keyVal[1];
                if (sortedIntermediate.containsKey(key)) {
                    sortedIntermediate.get(key).add(val);
                } else {
                    sortedIntermediate.put(key, new ArrayList<>(List.of(val)));
                }
            }
        }

        System.out.println("REDUCE: " + reduceTask);
        for(String key: sortedIntermediate.keySet()) {
            System.out.println(key + " : " + sortedIntermediate.get(key));
            KeyValuePair reduced = reduceFunc.reduce(key, sortedIntermediate.get(key));
            fileSystemInteraction.serializeReduceOutputToDisk(reduced, reduceTask);
        }
        return reduceTask;
    }
}
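As a variation on the grouping loop above, here is a minimal sketch that keeps the keys in sorted order and tolerates keys containing a colon. The class name IntermediateGrouper is hypothetical, and splitting on the last colon is my assumption; it only works if values never contain a colon, which holds for word counts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the repository.
final class IntermediateGrouper {
    // Groups "key:value" lines by key. A TreeMap keeps the keys sorted.
    static Map<String, List<String>> group(List<String> lines) {
        Map<String, List<String>> grouped = new TreeMap<>();
        for (String line : lines) {
            // Split on the LAST colon so a key such as "http://x" survives.
            // Assumes values themselves contain no colon.
            int separator = line.lastIndexOf(':');
            if (separator <= 0 || separator == line.length() - 1) {
                continue; // malformed line: missing key or missing value
            }
            String key = line.substring(0, separator);
            String value = line.substring(separator + 1);
            // computeIfAbsent replaces the containsKey / put branching.
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
        return grouped;
    }
}
```

The design trade-off is small: TreeMap costs O(log n) per insert instead of HashMap's expected O(1), in exchange for output files whose keys come out in a deterministic order.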



You may notice that the runners contain an attribute, fileSystemInteraction. It follows an interface that lets us read, write, and query files in a defined manner. My current implementation of map reduce uses the file system on my single computer as opposed to a DFS like HDFS or an object store like S3 (so despite its name, InMemoryFileInteraction reads and writes files on local disk). The reason was to get something working quickly while leaving room to move this onto a DFS later if wanted. Here is the code for the implementation of the interface.

public class InMemoryFileInteraction implements IFileInteractor {
    private String dataDirectory;
    private int reduceCount;

    public InMemoryFileInteraction(String dataDirectory, int reduceCount) {
        this.dataDirectory = dataDirectory;
        this.reduceCount = reduceCount;
    }


    public void serializeMapToDisk(List<KeyValuePair> kvPairs, String mapTaskId) throws IOException {
        System.out.println("Serializing Kv list from map of length " + kvPairs.size() + " to disk");

        for(KeyValuePair keyValuePair: kvPairs) {
            // write to a different file each of the results
            try {
                int reduceBucket = Helper.MOD_FILE_WRITE_DECIDE(keyValuePair.getKey(), reduceCount);
                String intermediateFileName = "reduce_" + mapTaskId + "_"+ reduceBucket+ "_intermediate";
                if (Files.notExists(Path.of(dataDirectory + "/" + intermediateFileName))) {
                    new File(dataDirectory+"/"+intermediateFileName).createNewFile();
                }
                FileWriter filewriter = new FileWriter(dataDirectory+"/"+intermediateFileName, true);
                BufferedWriter bw = new BufferedWriter(filewriter);
                bw.append(keyValuePair.getKey()+":"+ keyValuePair.getValue());
                bw.newLine();
                bw.close();
            } catch (Exception e) {
                System.out.println("ERROR While serializing kv pair to disk"+ keyValuePair.getKey() + e);
            }
        }

        System.out.println("Serialized map to disk successfully for map task for file " + mapTaskId);
    }

    public void serializeReduceOutputToDisk(KeyValuePair reduceOutput, int reduceTask) throws IOException {
        String outputFileName = "Output_"+reduceTask;
        if (Files.notExists(Path.of(dataDirectory + "/" + outputFileName))) {
            new File(dataDirectory+"/"+outputFileName).createNewFile();
        }
        FileWriter filewriter = new FileWriter(dataDirectory+"/"+outputFileName, true);
        BufferedWriter bw = new BufferedWriter(filewriter);
        bw.append(reduceOutput.getKey()+":"+reduceOutput.getValue());
        bw.newLine();
        bw.close();
        System.out.println(outputFileName);
    }

    public String readFromFile(String filename) throws IOException {
        String content = Files.readString(Paths.get(dataDirectory + "/" + filename));
        return content;
    }

    public List<String> readLinesFromFile(String filename) throws IOException {
        List<String> content = Files.readAllLines(Paths.get(dataDirectory + "/" + filename));
        return content;
    }

    public List<String> getIntermediateFiles(Integer reduceTask) {
        List<String> intermediateFiles = new ArrayList<String>();
        File[] allFiles = new File(this.dataDirectory).listFiles();
        for(File file: allFiles) {
            String filename = file.getName();
            // startsWith avoids the StringIndexOutOfBoundsException that substring throws on names shorter than "reduce"
            if (!filename.startsWith("reduce")) {
                continue;
            }

            String reduceTaskNum = filename.split("_")[2];

            if (reduceTaskNum.equals(String.valueOf(reduceTask))) {
                intermediateFiles.add(filename);
            }
        }
        return intermediateFiles;
    }
}
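The listing above calls Helper.MOD_FILE_WRITE_DECIDE to pick a reduce bucket for each key, but that helper is not shown in this article. A plausible sketch of such a partitioner, assuming it hashes the key and takes it modulo the reduce count, could look like the following. The class and method names here are hypothetical.

```java
// Hypothetical stand-in for the partitioning helper; the real
// Helper.MOD_FILE_WRITE_DECIDE may be implemented differently.
final class Partitioner {
    // Maps a key to a reduce bucket in the range [0, reduceCount).
    static int bucketFor(String key, int reduceCount) {
        // hashCode() can be negative, and % would then give a negative
        // bucket; floorMod always returns a non-negative result here.
        return Math.floorMod(key.hashCode(), reduceCount);
    }

    // Mirrors the intermediate file naming used in serializeMapToDisk.
    static String intermediateFileName(String mapTaskId, int bucket) {
        return "reduce_" + mapTaskId + "_" + bucket + "_intermediate";
    }
}
```

The property that matters is that the same key always lands in the same bucket no matter which map task produced it, so that one reduce task sees every value for that key.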


All these components come together and run as multiple processes on a single machine, replicating the MapReduce framework well enough to let me run a word count over multiple input files. I am also attaching the example tasks I used to test my code!


Map Task:

public class WordCountMap implements IMapTask {
    public List<KeyValuePair> map(String content) {
        // Given the content of one file as a string, count the occurrences of each word and build a list of key/value pairs
        String[] words = content.split("\\s+");

        if (words.length == 0) {
            return null;
        }
        Set<String> uniqueWords = new HashSet<>(List.of(words));
        List<KeyValuePair> result = new ArrayList<>();
        for (String word : uniqueWords) {
            int count = 0;
            for (String occurrence: words) {
                count += occurrence.equals(word) ? 1 : 0;
            }
            result.add(new KeyValuePair(word, String.valueOf(count)));
        }
        return result;
    }
}
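The map above rescans the whole word array once per unique word. As an alternative, here is a minimal single-pass sketch that counts with a map instead. The class name SinglePassWordCount is hypothetical, and it returns a Map rather than a List<KeyValuePair> because the KeyValuePair class is not shown in this article; converting the entries is left to the caller.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical alternative to WordCountMap's counting loop.
final class SinglePassWordCount {
    // Counts each whitespace-separated word in one pass over the input.
    static Map<String, Integer> count(String content) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        // trim() removes leading whitespace, which would otherwise make
        // split() produce an empty first token.
        for (String word : content.trim().split("\\s+")) {
            if (word.isEmpty()) {
                continue; // blank input still yields one empty token
            }
            // merge() inserts 1 for a new word, otherwise adds 1 to its count.
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }
}
```

A LinkedHashMap is used only so that the words come out in first-seen order; a plain HashMap would count just as well.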


Reduce Task:

public class WordCountReduce implements IReduceTask {
    public KeyValuePair reduce(String key, List<String> values) {
        int countSum = 0;
        for (String count: values) {
            try {
                countSum += Integer.valueOf(count);
            } catch (Exception e) {
                System.out.println("Error while reducing" + key + " =>" +values);
                System.out.println(e);
                throw e;
            }

        }
        return new KeyValuePair(key, String.valueOf(countSum));
    }
}


Handling Failures!

I had initially thought of separating this out into its own article, but I think we can squeeze it in here as well! An obvious observation is that my Master has no direct way of knowing that a worker node has crashed, and in a distributed environment with hundreds of nodes per cluster, worker crashes become a practical certainty, as does an unreliable network.


To tackle worker failures, the Master expects a response from a worker within a certain period. If none arrives, the uncompleted task is moved back to the un-started state and handed to another worker in the cluster. This can be achieved with something like a separate thread or a background job that checks timers on the in-progress tasks whose state the Master maintains. Apart from this, a worker's task should be atomic in nature: either the entire task succeeds, is serialized to disk, and is marked as completed, or none of it is. We have to avoid partially completed tasks. Even with all this error handling set up, our Master remains a single point of failure. A Master crash would render the entire MapReduce run useless, and in that case the job has to be restarted from the top!
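One common way to get the all-or-nothing behaviour described above is to write a task's output to a temporary file and rename it into place only once everything has been written. The sketch below illustrates that idea; it is not what the repository does (the code in this article appends to the output file line by line), and the class name AtomicOutputWriter is hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

// Hypothetical helper showing the write-to-temp-then-rename pattern.
final class AtomicOutputWriter {
    // Writes all lines to a temp file in the target's directory, then
    // renames it to the target, so a reader never sees a half-written file.
    static void writeAtomically(Path target, List<String> lines) throws IOException {
        Path directory = target.toAbsolutePath().getParent();
        // Same directory as the target, so the rename stays on one file system.
        Path temp = Files.createTempFile(directory, "tmp_", ".part");
        try {
            Files.write(temp, lines, StandardCharsets.UTF_8);
            // With ATOMIC_MOVE, whether an existing target is replaced
            // is implementation specific.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // Cleans up after a failure; does nothing after a successful move.
            Files.deleteIfExists(temp);
        }
    }
}
```

If a worker crashes halfway through, only the temporary file is left behind, and a re-executed task can simply write its output again.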


Thank you for taking the time to follow my journey in implementing map reduce!
