Some time back a colleague and I were discussing our respective MapReduce frameworks. Although very different, they both aimed to simplify the mundane task of configuring Jobs through the standard Hadoop boilerplate. “I’ve always thought it would be great to use annotations for developing MapReduce jobs,” he said. I must admit the thought had never occurred to me - but it certainly piqued my interest.

Could it be done? And more importantly, would it actually make things easier, faster, and more fun?

Design Goals

The primary design goal is simple: eliminate as many of the boilerplate configuration method calls as possible, replacing them with a combination of explicit annotations and logical defaults. We should do as much by convention or through reflection as possible to keep things uncluttered and clear.

Secondary goals include lessening or eliminating the need to explicitly subclass framework components, and removing the main method from drivers through the use of a standard container. It would also be useful to simplify the distributed cache mechanism, if possible leveraging it to pass objects into MapReduce components.

Replacing the Boilerplate

The majority of the annotations are replacements for the job configuration methods used by nearly all MapReduce jobs to specify the various components, inputs, outputs, etc. The canonical “WordCount” job includes several of these boilerplate calls. We also call static methods on the FileInputFormat and FileOutputFormat classes for configuring our input and output paths.
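For reference, the driver portion of a standard WordCount typically looks something like the following (a sketch of the stock Hadoop API calls Mara aims to replace; WordCountMapper and WordCountReducer stand in for your own classes):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "wordcount");
        job.setJarByClass(WordCountDriver.class);

        // Boilerplate component wiring
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Static helper calls for configuring input and output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}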

Using Mara, however, the simplest MapReduce driver would look something like

@Driver
public class MyDriver {
    @JobInfo(numReducers="0")
    private Job job;
}

Admittedly this wouldn’t do much useful work - it would use the default Hadoop Mapper class to output your input into separate part files, one for each mapper. However, you would have a fully deployable, debuggable, production-ready job with a default context requiring input and output CLI arguments. Add in a simple Mapper and Reducer implementation, and you’ll have a production-ready wordcount job… (I’m not including the mapper/reducer code for clarity. See the examples for full source.)

@Driver
public class WordCount {
    @JobInfo
    @MapperInfo(WordCountMapper.class)
    @ReducerInfo(WordCountReducer.class)
    private Job job;
}

A Standard Driver Container

The annotations are enabled by a standard runtime container, ‘RunJob’, that bootstraps and submits jobs while providing base features common to all jobs. The container searches the packages on the classpath specified in the manifest for all drivers and, if no driver is specified on the command line, prints a list of all available jobs:

[pjaromin@hdp01 mara]$ yarn jar mara-examples-job.jar com.conversantmedia.mapreduce.tool.RunJob
15/05/18 11:02:02 INFO reflections.Reflections: Reflections took 106 ms to scan 2 urls, producing 13 keys and 66 values
==============================================================================================================================================
                                                            A V A I L A B L E    D R I V E R S
==============================================================================================================================================
Name                          Description                           Ver       Class
----------------------------  ------------------------------------- --------  ----------------------------------------------------------------
annotated-wordcount-listener                                        201501.0  com.conversantmedia.mapreduce.example.AnnotatedWordCountWithListener
annotated-wordcount-v1                                              201501.0  com.conversantmedia.mapreduce.example.AnnotatedWordCount
avro-input-output-example                                           201501.0  com.conversantmedia.mapreduce.example.avro.AvroInputOutputExample
avro-multiple-output-example                                        201501.0  com.conversantmedia.mapreduce.example.avro.AvroMultipleOutputExample
distributed-blacklist         Demonstrates distributing an object   201501.0  com.conversantmedia.mapreduce.example.distribute.DistributedObjectExample
namedOutputsExample                                                 201501.0  com.conversantmedia.mapreduce.example.NamedOutputsExample
prepare-inputs                                                      201501.0  com.conversantmedia.mapreduce.example.PrepareInputsExample
wordcount-blacklist                                                 201501.0  com.conversantmedia.mapreduce.example.WordCountWithBlacklist
wordcount-minimum                                                   201501.0  com.conversantmedia.mapreduce.example.WordCountWithMinimum

By default it supports the --dryRun flag, which initializes the driver but skips submission of the job to the scheduler. Use it with the --dump option to see the full list of configuration options and their values.
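For example, a dry run of one of the example drivers might be launched with something like the following (this assumes the driver name is passed as the first argument after RunJob - check the project documentation for the exact invocation; --input and --output are the default context options described below):

[pjaromin@hdp01 mara]$ yarn jar mara-examples-job.jar com.conversantmedia.mapreduce.tool.RunJob \
    wordcount-minimum --input /data/books --output /tmp/wordcount-out --dryRun --dump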

The Context Bean

Command line options are specified using a context bean. If none is specified in the driver, the container will create a default instance including --input, --output, and --archive options. When you require additional, or different, command line options, you can provide your own custom context. To do so, create a bean with the required properties and mark up the driver field containing this context with the @DriverContext annotation:

@Driver
public class MyDriver {
   @DriverContext
   MyContextBean context;
   
   // Simple context adding a command-line --myArg argument
   public static class MyContextBean { @Option String myArg; }
}

Any properties on the context bean annotated with the @Option annotation will be initialized by the framework as command line options. The container will populate these properties with the values provided on the command line, handling type conversion for primitives and wrappers where necessary. Options without arguments (argCount=0) are treated as boolean flags.
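For instance, a context bean with a typed option and a boolean flag might look something like this (a sketch; aside from argCount, no other @Option attributes are assumed here):

public static class MyContextBean {

    // Populated from '--limit 100'; the framework converts the value to an Integer
    @Option
    Integer limit;

    // argCount=0 turns this into a flag: passing '--verbose' sets it to true
    @Option(argCount=0)
    boolean verbose;
}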

Resource Distribution/Injection

Mara greatly simplifies the distribution of resources, hiding the complexities of the distributed cache behind the annotations @Distribute and @Resource. This pair of annotations is used to distribute values from the Tool class to MapReduce components. Both static and dynamically-built objects and values may be passed into your component classes.

In your driver, you simply mark the field or method you wish to distribute with the @Distribute annotation. Mara will then inject these resources into your component class fields marked with the matching @Resource annotation. This mechanism presently works for primitives, wrapper classes, and any object implementing java.io.Serializable.
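As a rough sketch of the pattern (illustrative only - it is loosely modeled on the wordcount-blacklist example above, not its actual source, and the precise matching rules between @Distribute and @Resource are covered in the project documentation):

@Driver
public class BlacklistDriver {

    @JobInfo
    @MapperInfo(BlacklistMapper.class)
    private Job job;

    // Serializable value shipped to the mappers via the distributed cache
    @Distribute
    private HashSet<String> blacklist = new HashSet<>(Arrays.asList("a", "an", "the"));
}

public class BlacklistMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    // Injected by Mara from the driver's @Distribute field
    @Resource
    private HashSet<String> blacklist;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        for (String word : value.toString().split("\\s+")) {
            if (!blacklist.contains(word)) {
                context.write(new Text(word), new LongWritable(1L));
            }
        }
    }
}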

This mechanism is especially helpful in unit testing your jobs.

Unit Testing

Mara supports unit testing and TDD for MapReduce through the Apache MRUnit project. Although MRUnit makes unit testing MapReduce feasible, there are still numerous headaches with writing and maintaining these tests. First, the driver class isn’t exercised in standard MRUnit. This often means copying code and manually porting to the MRUnit equivalent. It also leaves no coverage for your base configuration which can result in problems that are only exposed upon deployment to a full Hadoop environment.

Because you must essentially translate the driver configuration into MRUnit, future changes to your driver configuration must also be duplicated in the unit tests. This can become rather tedious and error-prone. Furthermore, effective testing of certain jobs - such as those employing Avro or MultipleOutputs - requires the use of mocks to work properly.

Also, for testing with MultipleOutputs you typically need to create a method in mappers or reducers for directly injecting a mock version. Not a huge problem, but it’s bad practice to add code to a production class solely for the purpose of unit testing.

Mara eliminates these complexities and removes the need to separately write bootstrap code for annotated drivers in the majority of cases. For MultipleOutputs this means using the @NamedOutput tags in your mapper or reducer classes and allowing Mara to manage them. In production these are properly configured using the standard classes while under MRUnit they’ll be mocked out. Methods exist in the BaseMRUnitTest class that allow you to easily verify the writes to any multiple outputs.

/** Sample of methods in BaseMRUnitTest for verifying interactions with MultipleOutputs */

// Verify calls to write(key, value, basePath), 1x
protected void verifyNamedOutput(Object key, Object value, String path) {...}

protected void verifyNamedOutput(String name, Object key, Object value, String path) {...}

// Verify calls to write(key, value, basePath), specified number of times
protected void verifyNamedOutput(String name, Object key, Object value, String path, int times) {...}

protected void verifyNamedOutput(String name, Object key, Object value) {...}

protected void verifyNamedOutput(String name, Object key, Object value, VerificationMode mode) {...}
...
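To give a rough idea of how these fit together, a reducer using a Mara-managed named output might look something like the following sketch (it assumes @NamedOutput is placed on a MultipleOutputs field and accepts a name value - see the project documentation for the actual attributes):

public class FilteringReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Managed by Mara: a real MultipleOutputs in production, a mock under BaseMRUnitTest
    @NamedOutput("rejected")
    private MultipleOutputs<Text, LongWritable> multiOut;

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        long sum = 0;
        for (LongWritable value : values) {
            sum += value.get();
        }
        if (sum < 2) {
            // Rare words go to the 'rejected' named output
            multiOut.write("rejected", key, new LongWritable(sum));
        } else {
            context.write(key, new LongWritable(sum));
        }
    }
}

A test extending BaseMRUnitTest could then call, for example, verifyNamedOutput("rejected", new Text("foo"), new LongWritable(1)) to assert that the reducer wrote that pair to the named output.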

Unit testing with distributed resources

Mara allows you to set your @Resource values in unit tests through a simple naming convention. Assume you have a resource in your driver - perhaps it’s a Map<String,String> named ‘myMap’ needed for a join in your mapper class. In production the @Distribute annotation is applied to the driver class member (or method). In your unit test, you would write a method Map<String,String> getMyMap() returning the map - or mock - you want to use for test purposes. If you need to use different resources for different test cases, simply append the test case method’s name to the resource method’s name. For example…

@Test public void myMapperTestCase1() { ... }

// Default resource
Map<String, String> getMyMap() { return map; }

// Test-specific resource
public Map<String, String> getMyMap_myMapperTestCase1() {
   return map1;
}

For more information

Conversant has released Mara to the open source community! There are many other options and a few ‘extras’ in the library for handling things like HBase or Avro. For the full documentation and source code, please visit the project page on GitHub: