
Writing a Hazelcast / CastMapR MapReduce Task in Java

Hazelcast is a distributed in-memory data grid written in Java. In addition to built-in features such as EntryProcessors and queries, you can write MapReduce tasks using the CastMapR project, which adds MapReduce capabilities on top of Hazelcast 3.x.

To make it comparable to other MapReduce frameworks, we will reimplement the business case from [1]. Instead of using the CSV files directly, we first load the data into our Hazelcast in-memory data grid, storing it in a com.hazelcast.core.MultiMap, which is very similar to Google Guava's Multimap [2].
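To picture the data layout: a multimap keeps several values under one key. The following is a minimal plain-Java sketch of that idea, built on a Map of Lists. It does not use Hazelcast; the class name MultiMapSketch and the sample words are made up for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a multimap; this is NOT com.hazelcast.core.MultiMap.
final class MultiMapSketch {

    private final Map<String, List<String>> entries = new HashMap<>();

    // Adds a value under the key without replacing values stored earlier.
    void put(String key, String value) {
        entries.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    // Returns all values stored for the key, or an empty list if there are none.
    List<String> get(String key) {
        return entries.getOrDefault(key, Collections.emptyList());
    }

    public static void main(String[] args) {
        MultiMapSketch dictionary = new MultiMapSketch();
        dictionary.put("house", "Haus");
        dictionary.put("house", "Heim");
        System.out.println(dictionary.get("house"));
    }
}
```

One English word can therefore carry any number of translations, which is the shape of data the MapReduce task searches through.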

We start by creating our project. Using Maven this is easy: you only have to include two dependencies in the Maven project.

<dependency>
    <groupId>com.hazelcast</groupId>
    <artifactId>hazelcast</artifactId>
    <version>3.0</version>
</dependency>
<dependency>
    <groupId>com.noctarius.castmapr</groupId>
    <artifactId>castmapr</artifactId>
    <version>1.0.0</version>
</dependency>

Now we have all the dependencies we need to start Hazelcast nodes and build our MapReduce tasks in Java.
The full sources can be found at [3], so we will leave out everything that is not needed for the MapReduce task itself.

So let's move on to the Mapper implementation. We want to search for all translations containing the searchTerm, which in this case is an English word. Here's the Mapper implementation, which will be distributed across the cluster.

public class DictionaryMapper extends
  Mapper<..> implements DataSerializable {
 
  private String searchTerm;
 
  public DictionaryMapper() {
  }
 
  public DictionaryMapper(String searchTerm) {
    if (searchTerm == null)
      throw new NullPointerException(
        "searchTerm must not be null");
    this.searchTerm = searchTerm.toLowerCase();
  }
 
  @Override
  public void map(String key, DictionaryEntry value,
      Collector<..> collector) {
 
    if (key == null)
      return;
    if (key.toLowerCase().contains(this.searchTerm)) {
      collector.emit(key, value.getValue());
    }
  }
 
  @Override
  public void writeData(ObjectDataOutput out)
      throws IOException {
 
    out.writeUTF(searchTerm);
  }
 
  @Override
  public void readData(ObjectDataInput in)
      throws IOException {
 
    searchTerm = in.readUTF();
  }
}

The Mapper is initialized with the term we are searching for. The map method checks whether searchTerm is contained in the current entry's key; if so, it emits the value under that key, which means one search term can be found under several different keys.
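The matching rule can be tried out locally without a cluster. The sketch below applies the same case-insensitive "key contains searchTerm" check to an ordinary in-memory Map. It is not the CastMapR API: the class MapPhaseSketch, the method mapPhase and the sample data are hypothetical, and using Locale.ROOT for lowercasing is a choice made here rather than something taken from the Mapper above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical local simulation of the map phase; no Hazelcast involved.
final class MapPhaseSketch {

    // Collects every (key, values) pair whose key contains the search term, ignoring case.
    static Map<String, List<String>> mapPhase(Map<String, List<String>> dictionary,
                                              String searchTerm) {
        String needle = searchTerm.toLowerCase(Locale.ROOT);
        Map<String, List<String>> emitted = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> entry : dictionary.entrySet()) {
            String key = entry.getKey();
            if (key == null) {
                continue; // mirrors the null check in the Mapper
            }
            if (key.toLowerCase(Locale.ROOT).contains(needle)) {
                emitted.computeIfAbsent(key, k -> new ArrayList<>())
                       .addAll(entry.getValue());
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        Map<String, List<String>> dictionary = new LinkedHashMap<>();
        dictionary.put("House", List.of("Haus"));
        dictionary.put("houseboat", List.of("Hausboot"));
        dictionary.put("tree", List.of("Baum"));
        System.out.println(mapPhase(dictionary, "house").keySet());
    }
}
```

Searching for "house" matches both "House" and "houseboat", which is why a single search term can produce results under several keys.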
Next we need a Reducer, which combines all translations found for a key into a single string (just for convenience).

public class DictionaryReducer implements Reducer<..> {
 
  @Override
  public String reduce(String key, Iterator<..> values) {
    StringBuilder sb = new StringBuilder();
    while (values.hasNext()) {
      String value = values.next();
      sb.append(value).append("|");
    }
    String result = sb.toString();
    return result.substring(0, result.length() - 1);
  }
}
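Note that the substring call above throws if the iterator happens to be empty, because there is no trailing separator to cut off. A possible alternative for the joining step, sketched here in plain Java outside the CastMapR Reducer interface, is java.util.StringJoiner, which only places the separator between elements. The class name JoinReducerSketch is hypothetical.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.StringJoiner;

// Hypothetical helper showing only the joining logic; it does not implement Reducer.
final class JoinReducerSketch {

    // Joins all values with "|" and returns an empty string for an empty iterator.
    static String reduce(Iterator<String> values) {
        StringJoiner joiner = new StringJoiner("|");
        while (values.hasNext()) {
            joiner.add(values.next());
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        System.out.println(reduce(Arrays.asList("Haus", "Heim").iterator()));
    }
}
```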

The last thing we need to do is to build a MapReduceTask and eventually retrieve the results.

MapReduceTaskFactory factory = MapReduceTaskFactory
  .newInstance(hz);
MapReduceTask<..> task = factory.build(dictionary);
 
Map<..> result = task
  .mapper(new DictionaryMapper(searchTerm))
  .reducer(new DictionaryReducer()).submit();
 
if (result.isEmpty()) {
  System.out.println("No translation found for '"
      + searchTerm + "'");
} else {
  System.out.println("Translations found for '"
      + searchTerm + "':");
  for (Entry<..> entry : result.entrySet()) {
    System.out.println(entry.getKey() + ": "
        + entry.getValue());
  }
}

So we start a MapReduce search for searchTerm by instantiating a Mapper and a Reducer and configuring the MapReduceTask. Finally, we call submit() to use the blocking version of the task execution and inspect the results once the call returns.
To start the example we need two command-line options: the first tells the program how many nodes to wait for (for a simple test this should be 1), and the second is the searchTerm to look up.
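How the two options might be read is sketched below. This is not taken from the example's sources: the class CliArgsSketch, its field names and the usage message are assumptions, and the real program may validate its arguments differently.

```java
// Hypothetical argument holder; the actual example may parse its options differently.
final class CliArgsSketch {

    final int nodeCount;      // number of nodes to wait for before searching
    final String searchTerm;  // word to look up

    private CliArgsSketch(int nodeCount, String searchTerm) {
        this.nodeCount = nodeCount;
        this.searchTerm = searchTerm;
    }

    // Expects exactly two arguments: <nodeCount> <searchTerm>.
    static CliArgsSketch parse(String[] args) {
        if (args.length != 2) {
            throw new IllegalArgumentException("usage: <nodeCount> <searchTerm>");
        }
        // Integer.parseInt throws NumberFormatException for non-numeric input.
        int nodeCount = Integer.parseInt(args[0]);
        if (nodeCount < 1) {
            throw new IllegalArgumentException("nodeCount must be at least 1");
        }
        return new CliArgsSketch(nodeCount, args[1]);
    }

    public static void main(String[] args) {
        CliArgsSketch parsed = parse(new String[] {"1", "house"});
        System.out.println(parsed.nodeCount + " node(s), searching for " + parsed.searchTerm);
    }
}
```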

As seen above, it's easy to create and execute MapReduce tasks using CastMapR on Hazelcast 3.x. Most of the full source code deals with starting up Hazelcast nodes, retrieving translation files from the Internet and filling the MultiMap. In a typical deployment the Hazelcast instances are already running and the data is already available in the data grid, so this code is not needed.

For further features and questions, just have a look at the GitHub project [4].

[1] http://www.javacodegeeks.com/2013/08/writing-a-hadoop-mapreduce-task-in-java.html
[2] https://code.google.com/p/guava-libraries/wiki/NewCollectionTypesExplained#Multimap
[3] https://www.sourceprojects.org/default/files/dictionary.zip
[4] https://github.com/noctarius/castmapr