Map Reduce - Implementing Master/Co-ordinator (Part 2/3)

Previous Article In Series: Map Reduce - Parallel Processing At Scale (Part 1/3)


https://github.com/hamdaankhalid/Map-Reduce-Implementation


The last article I wrote gave a brief overview of map reduce and walked through how data flows across the nodes while running it on a cluster; give that a read if you haven't already! Before I dive into the details, here is a quick recap of the Master node and how it fits into the big picture. In a cluster of servers running map reduce, one node is appointed as the Master, and the rest are called Workers. The Master is responsible for storing the state of unprocessed data and unfinished steps, and for distributing work when Workers request it. It also watches for Workers that asked for a task and then went silent (failed to complete it). Each Worker asks the Master for a task, executes it, notifies the Master when it is done, and then asks for another one. The Master is also our source of truth for the current state of execution of a running map reduce job.
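To make that request cycle concrete, here is a minimal in-process sketch of the loop a Worker runs. The Master interface, the QueueMaster toy implementation, and Worker.runWorker are hypothetical stand-ins for the HTTP calls described below; they are not taken from the repository.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical view of the Master as seen by a Worker (in the real system these are HTTP calls).
interface Master {
    // Empty when nothing can be handed out right now; the worker should ask again later.
    Optional<String> requestTask();
    void reportCompleted(String task);
    boolean allDone();
}

// Toy Master backed by a queue, just enough to drive the loop below.
class QueueMaster implements Master {
    private final Deque<String> pending;
    private final List<String> completed = new ArrayList<>();
    private final int total;

    QueueMaster(List<String> tasks) {
        this.pending = new ArrayDeque<>(tasks);
        this.total = tasks.size();
    }

    public synchronized Optional<String> requestTask() {
        return Optional.ofNullable(pending.poll());
    }

    public synchronized void reportCompleted(String task) {
        completed.add(task);
    }

    public synchronized boolean allDone() {
        return completed.size() == total;
    }

    synchronized List<String> completed() {
        return new ArrayList<>(completed);
    }
}

class Worker {
    // Ask for a task, execute it, notify the Master, then ask again until the job is done.
    static int runWorker(Master master) {
        int executed = 0;
        while (!master.allDone()) {
            Optional<String> task = master.requestTask();
            if (task.isEmpty()) {
                try {
                    Thread.sleep(10); // nothing available right now; back off and retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return executed;
                }
                continue;
            }
            // ... execute the map or reduce task here ...
            master.reportCompleted(task.get());
            executed++;
        }
        return executed;
    }
}
```

With several Workers sharing one Master, a Worker that finds nothing to do simply sleeps and polls again until the others finish.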



Since the whole data flow between the Master and the Workers is unidirectional, in the sense that Workers initiate every exchange (they ask for tasks and report completions, while the Master never needs to request anything from a Worker), it made sense for me to make my Master node a server with public endpoints exposing 3 routes:

  1. Get a task (called by a worker to ask for a task to execute)
  2. Post a task completion (called by a worker once it has finished executing a task it was running)
  3. Get status of task execution (queried by me, to see which tasks are running, which have not started, and whether everything has finished)

I wanted to spend some time working with Java since I haven't touched it since college, so my language of choice for implementing the Master server was Java with Spring Boot.


My server maintains the state of tasks in an in-memory HashMap and exposes the 3 endpoints mentioned above. The code follows the typical Spring Boot structure of Controllers, Services, Entities, and Repositories. I will divide the code walkthrough into 2 layers: Services & Persistence, and Web (Controllers & DTOs). The Web layer is very thin and uses REST controllers to expose public endpoints that call into MasterService. I won't show the code for the web layer, but you can find it alongside the rest of the Master's code in the GitHub repo: https://github.com/hamdaankhalid/Map-Reduce-Implementation


Note: Since there were a good number of unknowns while implementing this, I only wrote tests for the parts whose behavior I was sure of, which is why the code on GitHub does not have 100% test coverage.



Services & Persistence

I am starting with this since this is where the core logic lives. I have a MasterService that has the following shape:


@Service
public class MasterService {

    @Autowired
    private IMapTaskStore mapTaskStore;

    @Autowired
    private IReduceTaskStore reduceTaskStore;

    @Autowired
    private StoreInitializer storeInit;

    @PostConstruct
    public void readyMapsAndReduces() {
        mapTaskStore.setStatusToTask(storeInit.initializeMapStore());
        reduceTaskStore.setStatusToTask(storeInit.initializeReduceStore());
    }
    // ... the remaining methods, shown below, are declared here; this gives you an idea of the shape
}

The MasterService has access to 2 stores, a mapTaskStore and a reduceTaskStore, and expects each to implement an interface (IMapTaskStore and IReduceTaskStore) that exposes a few simple public methods and hides the persistence logic. Coding against an interface means that if I ever want to move away from an in-memory structure for persisting the state of map and reduce tasks between requests, I can do so without touching the MasterService class. The two stores are fields of this class and are responsible for tracking the unstarted, running, and completed map tasks and reduce tasks respectively. The third field, StoreInitializer, populates the stores with their unstarted tasks: it reads the map tasks from an input directory whose path it holds as a string (currently on our local filesystem, but the filesystem interaction is abstracted and could eventually be switched to a distributed file system), and it creates the reduce tasks from a configured number of reduce partitions.
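The IMapTaskStore and IReduceTaskStore interfaces are not shown in this post. Judging from the calls in MasterService and the InMemoryReduceTaskStore implementation further down, the reduce-side interface probably looks roughly like the sketch below, though the real signatures may differ. To illustrate the swap described above, the sketch pairs it with a hypothetical ArrayReduceTaskStore that keeps one status slot per reduce partition instead of a map of sets; a caller coded against the interface would not need to change to use it.

```java
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum TaskStatus { NOT_STARTED, IN_PROGRESS, COMPLETED }

// Assumed shape of IReduceTaskStore, inferred from InMemoryReduceTaskStore.
interface IReduceTaskStore {
    void setStatusToTask(Map<TaskStatus, Set<Integer>> statusToTask);
    Map<TaskStatus, Set<Integer>> reduceStatus();
    boolean areTasksFinished();
    int getUnstartedTask();                 // -1 when no reduce task is unstarted
    TaskStatus updateTaskStatus(int task);  // advances the task by one status
}

// Hypothetical alternative backing: one status slot per reduce partition.
class ArrayReduceTaskStore implements IReduceTaskStore {
    private TaskStatus[] statuses = new TaskStatus[0];

    public void setStatusToTask(Map<TaskStatus, Set<Integer>> statusToTask) {
        int size = statusToTask.values().stream().mapToInt(Set::size).sum();
        statuses = new TaskStatus[size];
        // assumes partitions are numbered 0..size-1, as StoreInitializer creates them
        statusToTask.forEach((status, tasks) -> tasks.forEach(t -> statuses[t] = status));
    }

    public Map<TaskStatus, Set<Integer>> reduceStatus() {
        // rebuild the status -> tasks view on demand
        Map<TaskStatus, Set<Integer>> view = new EnumMap<>(TaskStatus.class);
        for (TaskStatus s : TaskStatus.values()) view.put(s, new HashSet<>());
        for (int i = 0; i < statuses.length; i++) view.get(statuses[i]).add(i);
        return view;
    }

    public boolean areTasksFinished() {
        for (TaskStatus s : statuses) if (s != TaskStatus.COMPLETED) return false;
        return true;
    }

    public int getUnstartedTask() {
        for (int i = 0; i < statuses.length; i++) if (statuses[i] == TaskStatus.NOT_STARTED) return i;
        return -1;
    }

    public TaskStatus updateTaskStatus(int task) {
        // NOT_STARTED -> IN_PROGRESS, anything else -> COMPLETED
        statuses[task] = statuses[task] == TaskStatus.NOT_STARTED ? TaskStatus.IN_PROGRESS : TaskStatus.COMPLETED;
        return statuses[task];
    }
}
```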


The public methods exposed by the MasterService are as follows:

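public TaskResponse getTaskToExecute() {
    if (!mapTaskStore.areTasksFinished()) {
        MapTask unstartedTask = mapTaskStore.getUnstartedTask();
        if (unstartedTask == null) {
            // every map task is already in progress; the worker gets no task and should retry later
            return new TaskResponse(null, null, false);
        }
        // hand out the copy carrying IN_PROGRESS, so the worker's completion report
        // is removed from the IN_PROGRESS set when it comes back
        MapTask startedTask = mapTaskStore.updateTaskStatus(unstartedTask, TaskStatus.IN_PROGRESS);
        return new TaskResponse(startedTask, null, false);
    } else if (!reduceTaskStore.areTasksFinished()) {
        // reduce tasks are only handed out once every map task has completed
        int unstartedTask = reduceTaskStore.getUnstartedTask();
        if (unstartedTask == -1) {
            // the reduce store signals "nothing left to start" with -1, not null
            return new TaskResponse(null, null, false);
        }
        reduceTaskStore.updateTaskStatus(unstartedTask); // NOT_STARTED -> IN_PROGRESS
        return new TaskResponse(null, unstartedTask, false);
    } else {
        return new TaskResponse(null, null, true);
    }
}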

This method finds a task that has not yet been given to a worker. While the map phase is unfinished, it checks the map store for an unstarted task and, if it finds one, marks it as in progress and sends it back; if every map task is already running, the response carries no task and the worker has to ask again later. Only once all map tasks have completed does it move on to the reduce store and do the same for reduce tasks. When both phases are finished, it responds with a flag indicating that all tasks are completed.
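The ordering in this method is what creates the barrier between the two phases: areTasksFinished() requires both the unstarted and the in-progress sets to be empty, so no reduce task goes out while even one map task is still running. The stripped-down sketch below reproduces just that decision with plain sets; Assignment and Scheduler are hypothetical names, not classes from the repository.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// What a worker receives: a map file, a reduce partition, nothing yet, or "all done".
final class Assignment {
    final String mapFile;          // set when a map task is handed out
    final Integer reducePartition; // set when a reduce task is handed out
    final boolean allDone;         // true once both phases have completed

    Assignment(String mapFile, Integer reducePartition, boolean allDone) {
        this.mapFile = mapFile;
        this.reducePartition = reducePartition;
        this.allDone = allDone;
    }
}

class Scheduler {
    private final Set<String> mapsNotStarted = new LinkedHashSet<>();
    private final Set<String> mapsInProgress = new LinkedHashSet<>();
    private final Set<Integer> reducesNotStarted = new LinkedHashSet<>();
    private final Set<Integer> reducesInProgress = new LinkedHashSet<>();

    Scheduler(List<String> inputFiles, int reduceCount) {
        mapsNotStarted.addAll(inputFiles);
        for (int i = 0; i < reduceCount; i++) reducesNotStarted.add(i);
    }

    synchronized Assignment next() {
        if (!mapsNotStarted.isEmpty() || !mapsInProgress.isEmpty()) {
            // Map phase still running: hand out a map task if one is unstarted, otherwise nothing.
            if (mapsNotStarted.isEmpty()) return new Assignment(null, null, false);
            String file = mapsNotStarted.iterator().next();
            mapsNotStarted.remove(file);
            mapsInProgress.add(file);
            return new Assignment(file, null, false);
        }
        if (!reducesNotStarted.isEmpty() || !reducesInProgress.isEmpty()) {
            if (reducesNotStarted.isEmpty()) return new Assignment(null, null, false);
            Integer partition = reducesNotStarted.iterator().next();
            reducesNotStarted.remove(partition);
            reducesInProgress.add(partition);
            return new Assignment(null, partition, false);
        }
        return new Assignment(null, null, true);
    }

    synchronized void completeMap(String file) { mapsInProgress.remove(file); }

    synchronized void completeReduce(int partition) { reducesInProgress.remove(partition); }
}
```

With two input files, the third call to next() returns an empty assignment, and reduce partition 0 only appears after both completeMap calls.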



public StatusResponse getStatus() {
    Map<TaskStatus, Set<MapTask>> mapTasks = mapTaskStore.mapStatus();
    Map<TaskStatus, Set<Integer>> reduceTasks = reduceTaskStore.reduceStatus();

    boolean isFinished = mapTaskStore.areTasksFinished() && reduceTaskStore.areTasksFinished();
    return new StatusResponse(isFinished, mapTasks, reduceTasks);
}

This is a very simple method: it gathers the status of the map and reduce tasks from their respective stores and sends them back as a response. It is used to query the state of execution.


public void markTaskComplete(TaskCompletedRequest taskCompletedRequest) {
    if (taskCompletedRequest.isMapTask()) {
        mapTaskStore.updateTaskStatus(taskCompletedRequest.getMapTask(), TaskStatus.COMPLETED);
    } else {
        // is a reduce task
        reduceTaskStore.updateTaskStatus(taskCompletedRequest.getReduceTask());
    }
}

This method is used by the controller when a worker wants to notify a task completion. In that case, we update the respective store.
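The Master is also supposed to notice Workers that take a task and go silent, but the store code in this post does not show that part. One common approach, sketched below under the assumption of a fixed timeout and an injectable clock, is to record when each task was handed out and move tasks that have been in progress for too long back to not-started. The same sketch treats a duplicate or late completion report as a no-op, which matters once a task can be re-issued to a second Worker. LeaseTracker, reclaimExpired, and the timeout are all hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.LongSupplier;

class LeaseTracker {
    private final Set<String> notStarted = new LinkedHashSet<>();
    private final Map<String, Long> inProgressSince = new HashMap<>(); // task -> time handed out
    private final Set<String> completed = new LinkedHashSet<>();
    private final long timeoutMillis;
    private final LongSupplier clock; // injectable so the behaviour can be tested

    LeaseTracker(List<String> tasks, long timeoutMillis, LongSupplier clock) {
        notStarted.addAll(tasks);
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
    }

    // Returns a task to run, or null if nothing can be handed out right now.
    synchronized String assign() {
        reclaimExpired();
        if (notStarted.isEmpty()) return null;
        String task = notStarted.iterator().next();
        notStarted.remove(task);
        inProgressSince.put(task, clock.getAsLong());
        return task;
    }

    // Move tasks whose worker has been silent for longer than the timeout back to not-started.
    synchronized void reclaimExpired() {
        long now = clock.getAsLong();
        inProgressSince.entrySet().removeIf(e -> {
            boolean expired = now - e.getValue() > timeoutMillis;
            if (expired) notStarted.add(e.getKey());
            return expired;
        });
    }

    // Idempotent: a late or duplicate report for an already-completed task changes nothing.
    synchronized void complete(String task) {
        if (completed.contains(task)) return;
        inProgressSince.remove(task);
        notStarted.remove(task); // it may have been reclaimed before the slow worker reported
        completed.add(task);
    }

    synchronized boolean isFinished() {
        return notStarted.isEmpty() && inProgressSince.isEmpty();
    }
}
```

In a running server the clock would simply be System::currentTimeMillis; the timeout value is a tuning choice that should comfortably exceed how long a normal map or reduce task takes.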



The code for the repositories is given below, along with the data types:

MapTask: This is the object stored in a MapTaskStore; it represents a single map task.

import java.util.Objects;

public class MapTask {
    private String fileName;
    private TaskStatus status;

    public MapTask(String filename, TaskStatus status) {
        this.fileName = filename;
        this.status = status;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }

    public TaskStatus getStatus() {
        return status;
    }

    public void setStatus(TaskStatus status) {
        this.status = status;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }

        if (! (obj instanceof MapTask)) {
            return false;
        }

        return fileName.equals(((MapTask) obj).fileName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(fileName);
    }

    @Override
    public String toString() {
        return "( "+ fileName + ", " + status +")";
    }
}
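Because equals and hashCode only look at fileName, two MapTask objects for the same file count as the same element of a HashSet even when their statuses differ. That is what lets the store below remove a task using the copy a worker sends back. Here is a trimmed-down, self-contained check of that behaviour; the demo class is illustrative only, and the TaskStatus values are the ones used in the store code.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

enum TaskStatus { NOT_STARTED, IN_PROGRESS, COMPLETED }

// Trimmed copy of MapTask: identity is the file name only, status is ignored.
class MapTask {
    private final String fileName;
    private final TaskStatus status;

    MapTask(String fileName, TaskStatus status) {
        this.fileName = fileName;
        this.status = status;
    }

    TaskStatus getStatus() { return status; }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) return true;
        if (!(obj instanceof MapTask)) return false;
        return fileName.equals(((MapTask) obj).fileName);
    }

    @Override
    public int hashCode() { return Objects.hash(fileName); }
}

class EqualityDemo {
    // True if a task stored as NOT_STARTED can be removed via a copy carrying another status.
    static boolean removableByCopy() {
        Set<MapTask> notStarted = new HashSet<>();
        notStarted.add(new MapTask("input-0.txt", TaskStatus.NOT_STARTED));
        MapTask copyFromWorker = new MapTask("input-0.txt", TaskStatus.IN_PROGRESS);
        return notStarted.remove(copyFromWorker) && notStarted.isEmpty();
    }
}
```

One consequence of hashing on fileName: calling setFileName on a MapTask while it sits inside a HashSet would leave it in the wrong hash bucket, so the setter is best left unused for stored tasks.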


InMemoryMapTaskStore:

Note: We store the status in two places: inside each MapTask and as the key of the store's statusToTask map. The reason is that when a worker reports on a map task, it sends back a MapTask object carrying its status, which tells us concisely which set to remove it from before adding it to the set for its new status. I will probably refactor this eventually so the status lives in only one place, but for now the goal is to get everything working first.
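For comparison, one possible shape of that eventual refactor, assuming the file name is a task's identity, keeps exactly one map from file name to status and derives the per-status view only when a status query needs it. SingleSourceMapStore and its methods are hypothetical names.

```java
import java.util.EnumMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

enum TaskStatus { NOT_STARTED, IN_PROGRESS, COMPLETED }

class SingleSourceMapStore {
    // The only place a task's status lives.
    private final Map<String, TaskStatus> statusByFile = new LinkedHashMap<>();

    void add(String fileName) { statusByFile.put(fileName, TaskStatus.NOT_STARTED); }

    Optional<String> unstartedTask() {
        return statusByFile.entrySet().stream()
                .filter(e -> e.getValue() == TaskStatus.NOT_STARTED)
                .map(Map.Entry::getKey)
                .findFirst();
    }

    // The caller no longer needs to know the previous status: the file name is enough.
    void update(String fileName, TaskStatus status) {
        if (!statusByFile.containsKey(fileName)) {
            throw new IllegalArgumentException("Unknown task: " + fileName);
        }
        statusByFile.put(fileName, status);
    }

    boolean areTasksFinished() {
        return statusByFile.values().stream().allMatch(s -> s == TaskStatus.COMPLETED);
    }

    // Derived status -> files view, similar to what mapStatus() returns today.
    Map<TaskStatus, Set<String>> byStatus() {
        Map<TaskStatus, Set<String>> view = new EnumMap<>(TaskStatus.class);
        for (TaskStatus s : TaskStatus.values()) view.put(s, new TreeSet<>());
        statusByFile.forEach((file, status) -> view.get(status).add(file));
        return view;
    }
}
```

The trade-off is that finding an unstarted task becomes a linear scan instead of a lookup into a dedicated set, which is unlikely to matter for the number of input files a single job has.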

import java.util.Map;
import java.util.Set;

import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Repository;

@Primary
@Repository
public class InMemoryMapTaskStore implements IMapTaskStore {
    private Map<TaskStatus, Set<MapTask>> statusToTask;

    @Override
    public Map<TaskStatus, Set<MapTask>> mapStatus() {
        return this.statusToTask;
    }

    @Override
    public void setStatusToTask(Map<TaskStatus, Set<MapTask>> statusToTask) {
        this.statusToTask = statusToTask;
    }

    // Returns any task that has not been handed out yet, or null if there is none.
    @Override
    public MapTask getUnstartedTask() {
        return statusToTask.get(TaskStatus.NOT_STARTED).stream().findFirst().orElse(null);
    }

    @Override
    public boolean areTasksFinished() {
        return statusToTask.get(TaskStatus.NOT_STARTED).isEmpty() &&
                statusToTask.get(TaskStatus.IN_PROGRESS).isEmpty();
    }

    @Override
    public MapTask updateTaskStatus(MapTask taskToBeUpdated, TaskStatus status) {
        // remove the map task from the set for its previous status (MapTask equality only compares
        // file names), then add a copy carrying the new status to the set for that status
        statusToTask.get(taskToBeUpdated.getStatus()).remove(taskToBeUpdated);
        MapTask updatedTask = new MapTask(taskToBeUpdated.getFileName(), status);
        statusToTask.get(status).add(updatedTask);
        return updatedTask;
    }
}
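One thing worth keeping in mind with this store: Spring serves requests on multiple threads against a single MasterService instance, while HashMap and HashSet are not thread-safe. getTaskToExecute also does a check-then-act (getUnstartedTask followed by updateTaskStatus), so two Workers polling at the same moment could be handed the same file. A minimal fix is to make the find-and-move a single synchronized step; marking getTaskToExecute and markTaskComplete as synchronized would be one coarse-grained equivalent. The ClaimingStore and ClaimDemo classes below are hypothetical and exist only to show the idea.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

class ClaimingStore {
    private final Set<String> notStarted = new HashSet<>();
    private final Set<String> inProgress = new HashSet<>();

    ClaimingStore(Set<String> files) { notStarted.addAll(files); }

    // Find an unstarted task and mark it in progress as one atomic step; null when none is left.
    synchronized String claimUnstarted() {
        Iterator<String> it = notStarted.iterator();
        if (!it.hasNext()) return null;
        String file = it.next();
        it.remove();
        inProgress.add(file);
        return file;
    }
}

class ClaimDemo {
    // Several threads race for tasks; returns {total claims, distinct files claimed}.
    static int[] raceForTasks(int fileCount, int threadCount) {
        Set<String> files = new HashSet<>();
        for (int i = 0; i < fileCount; i++) files.add("input-" + i);
        ClaimingStore store = new ClaimingStore(files);
        Set<String> handedOut = ConcurrentHashMap.newKeySet();
        AtomicInteger claims = new AtomicInteger();

        Thread[] workers = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            workers[t] = new Thread(() -> {
                String file;
                while ((file = store.claimUnstarted()) != null) {
                    claims.incrementAndGet();
                    handedOut.add(file);
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return new int[] { claims.get(), handedOut.size() };
    }
}
```

If the number of claims equals the number of distinct files, no file was handed to two threads.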


InMemoryReduceTaskStore

import java.util.Map;
import java.util.Set;

import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Repository;

@Primary
@Repository
public class InMemoryReduceTaskStore implements IReduceTaskStore {
    private Map<TaskStatus, Set<Integer>> statusToTask;

    @Override
    public void setStatusToTask(Map<TaskStatus, Set<Integer>> statusToTask) {
        this.statusToTask = statusToTask;
    }

    @Override
    public Map<TaskStatus, Set<Integer>> reduceStatus() {
        return statusToTask;
    }

    @Override
    public boolean areTasksFinished() {
        return statusToTask.get(TaskStatus.NOT_STARTED).isEmpty() &&
                statusToTask.get(TaskStatus.IN_PROGRESS).isEmpty();
    }

    // Returns an unstarted reduce partition, or -1 if every partition has been handed out.
    @Override
    public int getUnstartedTask() {
        return statusToTask.get(TaskStatus.NOT_STARTED).stream().findFirst().orElse(-1);
    }

    // Advances a reduce task by one step: NOT_STARTED -> IN_PROGRESS -> COMPLETED.
    @Override
    public TaskStatus updateTaskStatus(int taskToBeUpdated) {
        // find the status set that currently holds this task
        TaskStatus lastStatus = null;
        for (TaskStatus taskStatus : TaskStatus.values()) {
            if (statusToTask.get(taskStatus).contains(taskToBeUpdated)) {
                lastStatus = taskStatus;
                break;
            }
        }

        if (lastStatus == null) {
            // an id that is in none of the sets is a caller error
            throw new IllegalArgumentException("Unknown reduce task: " + taskToBeUpdated);
        }
        if (lastStatus == TaskStatus.COMPLETED) {
            return TaskStatus.COMPLETED;
        }

        statusToTask.get(lastStatus).remove(taskToBeUpdated);
        TaskStatus nextStatus = (lastStatus == TaskStatus.NOT_STARTED) ? TaskStatus.IN_PROGRESS : TaskStatus.COMPLETED;
        statusToTask.get(nextStatus).add(taskToBeUpdated);
        return nextStatus;
    }
}
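Since a reduce task only ever moves forward one step, the search-then-branch logic in updateTaskStatus could also be expressed by giving the enum its own transition. This is a hypothetical alternative: the repository's TaskStatus may not define a next() method, and StepReduceStore is an illustrative name.

```java
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

enum TaskStatus {
    NOT_STARTED, IN_PROGRESS, COMPLETED;

    // The status a task moves to on its next update; COMPLETED is terminal.
    TaskStatus next() {
        switch (this) {
            case NOT_STARTED: return IN_PROGRESS;
            case IN_PROGRESS: return COMPLETED;
            default: return COMPLETED;
        }
    }
}

class StepReduceStore {
    private final Map<TaskStatus, Set<Integer>> statusToTask = new EnumMap<>(TaskStatus.class);

    StepReduceStore(int reduceCount) {
        for (TaskStatus s : TaskStatus.values()) statusToTask.put(s, new HashSet<>());
        for (int i = 0; i < reduceCount; i++) statusToTask.get(TaskStatus.NOT_STARTED).add(i);
    }

    TaskStatus updateTaskStatus(int task) {
        // Set.remove returns true only for the set that actually held the task.
        for (TaskStatus current : TaskStatus.values()) {
            if (statusToTask.get(current).remove(task)) {
                TaskStatus next = current.next();
                statusToTask.get(next).add(task);
                return next;
            }
        }
        throw new IllegalArgumentException("Unknown reduce task: " + task);
    }
}
```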


StoreInitializer

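import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3
import org.springframework.stereotype.Component;

@Component
public class StoreInitializer {

    private String INPUT_DIRECTORY;
    private int REDUCE_NUMBER;

    @PostConstruct
    void setInputDirAndReduce() {
        INPUT_DIRECTORY = "/Users/hamdaankhalid/Desktop/MapReduce/data/test/";
        REDUCE_NUMBER = 4;
    }

    public Map<TaskStatus, Set<MapTask>> initializeMapStore() {
        File[] inputFiles = new File(INPUT_DIRECTORY).listFiles();
        if (inputFiles == null) {
            // listFiles() returns null when the path is not a readable directory
            throw new IllegalStateException("Input directory not found: " + INPUT_DIRECTORY);
        }
        Map<TaskStatus, Set<MapTask>> result = new HashMap<>();
        result.put(TaskStatus.NOT_STARTED, new HashSet<>());
        result.put(TaskStatus.IN_PROGRESS, new HashSet<>());
        result.put(TaskStatus.COMPLETED, new HashSet<>());

        for (File inputFile : inputFiles) {
            // only files whose names start with "input" become map tasks; unlike substring(0, 5),
            // startsWith does not throw for names shorter than five characters
            if (inputFile.getName().startsWith("input")) {
                result.get(TaskStatus.NOT_STARTED).add(new MapTask(inputFile.getName(), TaskStatus.NOT_STARTED));
            }
        }

        return result;
    }

    public Map<TaskStatus, Set<Integer>> initializeReduceStore() {
        Map<TaskStatus, Set<Integer>> result = new HashMap<>();
        result.put(TaskStatus.NOT_STARTED, new HashSet<>());
        result.put(TaskStatus.IN_PROGRESS, new HashSet<>());
        result.put(TaskStatus.COMPLETED, new HashSet<>());

        // one reduce task per partition, numbered 0..REDUCE_NUMBER-1
        for (int i = 0; i < REDUCE_NUMBER; i++) {
            result.get(TaskStatus.NOT_STARTED).add(i);
        }

        return result;
    }

}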
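The same initialization can also be written with java.nio.file, which reports a missing directory as an exception rather than a null array, and with an EnumMap for the status buckets. In this sketch the directory, the file-name prefix, and the reduce count are parameters instead of hard-coded values, map tasks are plain file names rather than MapTask objects to keep it self-contained, and NioStoreInitializer is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.EnumMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

enum TaskStatus { NOT_STARTED, IN_PROGRESS, COMPLETED }

class NioStoreInitializer {
    // One empty bucket per status, so callers never see a missing key.
    static <T> Map<TaskStatus, Set<T>> emptyBuckets() {
        Map<TaskStatus, Set<T>> buckets = new EnumMap<>(TaskStatus.class);
        for (TaskStatus s : TaskStatus.values()) buckets.put(s, new HashSet<>());
        return buckets;
    }

    // Every regular file in inputDir whose name starts with prefix becomes an unstarted map task.
    static Map<TaskStatus, Set<String>> initializeMapStore(Path inputDir, String prefix) {
        Map<TaskStatus, Set<String>> result = emptyBuckets();
        try (Stream<Path> files = Files.list(inputDir)) {
            result.get(TaskStatus.NOT_STARTED).addAll(files
                    .filter(Files::isRegularFile)
                    .map(p -> p.getFileName().toString())
                    .filter(name -> name.startsWith(prefix))
                    .collect(Collectors.toSet()));
        } catch (IOException e) {
            throw new UncheckedIOException("Cannot list input directory " + inputDir, e);
        }
        return result;
    }

    // Reduce partitions 0..reduceCount-1 start out unstarted.
    static Map<TaskStatus, Set<Integer>> initializeReduceStore(int reduceCount) {
        Map<TaskStatus, Set<Integer>> result = emptyBuckets();
        for (int i = 0; i < reduceCount; i++) result.get(TaskStatus.NOT_STARTED).add(i);
        return result;
    }
}
```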



I have not posted the code for the tests or the DTOs here, because I don't think they add much to explaining how the Master node works. This project is also a way for me to get more familiar with Java and Spring Boot, so please forgive any nits in the code (the null checks should have been handled with Optionals!).
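On that last point, a version of getUnstartedTask that returns an Optional instead of null (or the -1 sentinel the reduce store uses) might look like this. It is a sketch of the idea with a hypothetical class name, not code from the repository.

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

class OptionalReduceStore {
    private final Set<Integer> notStarted = new TreeSet<>();
    private final Set<Integer> inProgress = new TreeSet<>();

    OptionalReduceStore(int reduceCount) {
        for (int i = 0; i < reduceCount; i++) notStarted.add(i);
    }

    // Empty instead of -1 when every reduce task has already been handed out.
    Optional<Integer> getUnstartedTask() {
        return notStarted.stream().findFirst();
    }

    void start(int task) {
        if (notStarted.remove(task)) inProgress.add(task);
    }

    // The "nothing available" case is handled explicitly at the call site.
    String describeNextAssignment() {
        return getUnstartedTask()
                .map(task -> "reduce partition " + task)
                .orElse("no task available, retry later");
    }
}
```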


Next Part: Map Reduce - Implementing Worker Nodes (Part 3/3)

