
Patrick Jaromin

Patrick is the Software Engineering Director of the Ad Tech group at Conversant. His team of software engineers works with Java, Scala, Hadoop, HBase, Spark, Storm, Kafka and more to provide low-latency decisioning and persistence to the core ad server stack.


Recently I was tasked with developing a number of mapreduce jobs involving Avro files. The final step of the process was a join between Avro and tab-delimited text files. Because the final output consisted of the original Avro records updated with information from the flat files, I decided to convert the text files into Avro in the mapper and then send them on for joining in the reduce phase.

That’s where the trouble started.

AvroWrapper, AvroKey, AvroValue

Although Avro is well supported in Hadoop MapReduce, the design is rather limiting. A typical Writable bean is capable of performing serialization/deserialization (SerDe) operations on itself. This design is clean, compact, and makes it relatively easy to implement your own custom writables. However, Avro records require a schema to be deserialized. In order to eliminate the need to include the schema with every individual Avro record, the library includes an AvroWrapper along with its subclasses AvroKey and AvroValue. The AvroWrapper, as it sounds, merely wraps an individual record or datum. You don't use the wrapper directly, however. When specifying a key or value in your job, you must select AvroKey for keys and AvroValue for values (duh, right?).

If you look into these two subclasses you’ll discover something curious — they’re essentially empty.
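
Here, for example, is roughly all there is to AvroKey (lightly paraphrased from the Avro source); AvroValue looks much the same:

public class AvroKey<T> extends AvroWrapper<T> {
    /** Wraps no datum. */
    public AvroKey() { super(null); }
    /** Wraps the given datum. */
    public AvroKey(T datum) { super(datum); }
}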

The reason for this is that SerDe operations on these classes are actually handled by the AvroSerialization class, as configured through the AvroJob static helper methods. The reader and writer schemas may then be registered with the serialization class, and the AvroKey and AvroValue wrappers act merely to tag your records and identify which schema to use. This works quite well for most jobs; however, it leaves us limited to two Avro schemas: one for keys and another for values. But what if we want to perform a join operation on two or more Avro records with different schemas?
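
For reference, a standard single-schema job configuration looks something like the sketch below. MyRecord stands in for a generated specific record class, and the exact setup will vary by job:

// Standard setup: one schema for map output keys, one for map output values.
// AvroJob wires in AvroSerialization and the AvroKey/AvroValue classes for you.
Job job = Job.getInstance(new Configuration(), "avro-join");
AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
AvroJob.setMapOutputValueSchema(job, MyRecord.getClassSchema());

With only these two hooks there is nowhere to declare a third schema, which is exactly the limitation the rest of this post works around.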

AvroMultiWrapper and MultiSchemaAvroSerialization

To work around this limitation, a new serialization implementation is needed. One option, attaching the schema to every record sent through the mapreduce process, would explode the volume of data flowing through the job. An alternative is to build on the default mechanism and add new tagging interfaces for each schema, perhaps extensions of AvroKey and AvroValue. Doing this would mean creating a host of extra empty "tagging" classes and would need to be repeated for every future job joining new schemas.

For Mara, the choice was to map each Avro schema to a key and then tag each mapreduce record with the appropriate schema's key so it can be deserialized with the correct reader schema. By limiting this key to a single byte, overhead is kept very low while still supporting up to 128 different schemas. This is accomplished through Mara's MultiSchemaAvroSerialization class.

Usage mirrors that of the standard Avro mapreduce libraries. To configure schemas in the system, you call the static MultiSchemaAvroSerialization.registerSchemas method, which populates the set of schemas:

public static void registerSchemas(Job job, Schema...schemas) {
    String[] names = new String[schemas.length];
    int idx = 0;
    for (Schema schema : schemas) {
        names[idx++] = schema.getFullName();
    }
    job.getConfiguration().setStrings(CONF_KEY_MULTI_SCHEMAS, names);
    registerSerialization(job);
}
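
In the job driver this might look like the following; the record classes here are illustrative, and any generated specific record (or a schema parsed at runtime) would work:

// Register every schema the job will shuffle. The position of each schema
// in this list determines the index byte written ahead of its serialized datums.
MultiSchemaAvroSerialization.registerSchemas(job,
        ImpressionRecord.getClassSchema(),
        EnrichmentRecord.getClassSchema());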

For your map output key and/or value class you'd specify the AvroMultiWrapper, and that's it.
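
Concretely, that amounts to something like the sketch below, reusing the illustrative record classes from above and assuming the wrapper's no-arg constructor and datum(...) setter shown in the deserialization code further down:

// Driver: declare the wrapper as the map output value class.
job.setMapOutputValueClass(AvroMultiWrapper.class);

// Mapper: wrap whichever registered record type you need to emit.
AvroMultiWrapper<SpecificRecord> wrapped = new AvroMultiWrapper<SpecificRecord>();
wrapped.datum(impressionRecord);   // could just as easily be an EnrichmentRecord
context.write(new Text(joinKey), wrapped);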

When the key and/or value datum is serialized, the MultiSchemaAvroSerialization looks up the ordinal of the schema from the registered set and includes that as the first byte of the serialized datum:

public void serialize(AvroMultiWrapper<T> avroWrapper) throws IOException {
    DatumWriter<T> writer = datumWriterFor((Class<T>) avroWrapper.datum().getClass());
    // Write the schema's registered index as the first byte...
    int b = MultiSchemaAvroSerialization.getIndexForSchema(getConf(), avroWrapper.datum().getClass());
    outputStream.write(b);
    // ...then the Avro binary encoding of the datum itself
    writer.write(avroWrapper.datum(), encoder);
    this.encoder.flush();
}
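
The getIndexForSchema lookup isn't shown here, but given the registerSchemas code above it presumably resolves the datum's schema name against the registered list, roughly along these lines (a sketch only; Mara's actual implementation may differ):

// Hypothetical sketch: match the datum class's schema full name against
// the names stored under CONF_KEY_MULTI_SCHEMAS by registerSchemas.
static int getIndexForSchema(Configuration conf, Class<?> datumClass) {
    String fullName = SpecificData.get().getSchema(datumClass).getFullName();
    String[] names = conf.getStrings(CONF_KEY_MULTI_SCHEMAS);
    for (int i = 0; i < names.length; i++) {
        if (names[i].equals(fullName)) {
            return i;
        }
    }
    throw new IllegalStateException("Schema not registered: " + fullName);
}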

The same thing happens in reverse for deserializing our wrapper:

public AvroMultiWrapper<T> deserialize(AvroMultiWrapper<T> wrapper)
        throws IOException {
    if (wrapper == null) {
        wrapper = new AvroMultiWrapper<T>();
    }

    // Read in the first byte - the schema index
    int schemaIndex = decoder.inputStream().read();

    // Now hand off the rest to the datum reader for normal deser.
    DatumReader<T> reader = datumReaderFor(schemaIndex);
    wrapper.datum(reader.read(wrapper.datum(), decoder));
    return wrapper;
}

I’ve used this successfully in conjunction with the standard AvroSerialization and AvroKey/AvroValue classes when reading input from Avro files and then emitting multiple schemas from the Mapper class. The process is efficient, with the only overhead being the single byte attached to each record. Schema lookups should cost roughly the same in the multi-schema serialization as they do in the standard one.
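
On the reduce side, the join itself then becomes a matter of checking the concrete type of each wrapped datum. A sketch, again with illustrative record classes, copying each datum because Hadoop may reuse the deserialized objects:

// Inside a Reducer<Text, AvroMultiWrapper<SpecificRecord>, ...> subclass.
@Override
protected void reduce(Text key, Iterable<AvroMultiWrapper<SpecificRecord>> values, Context context)
        throws IOException, InterruptedException {
    ImpressionRecord original = null;
    List<EnrichmentRecord> updates = new ArrayList<EnrichmentRecord>();
    for (AvroMultiWrapper<SpecificRecord> value : values) {
        SpecificRecord datum = value.datum();
        if (datum instanceof ImpressionRecord) {
            original = ImpressionRecord.newBuilder((ImpressionRecord) datum).build();
        } else if (datum instanceof EnrichmentRecord) {
            updates.add(EnrichmentRecord.newBuilder((EnrichmentRecord) datum).build());
        }
    }
    // ...apply the updates to the original Avro record and emit it...
}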

For more information on the Mara Annotations for MapReduce, see this blog post or clone the project on GitHub.