Recently I was tasked with developing a number of MapReduce jobs involving Avro files. The final step of the process was a join between Avro and tab-delimited text files. Because the final output consisted of the original Avro records updated with information from the flat files, I decided to convert the text files into Avro in the mapper and then send them on for joining in the reduce phase.
That’s where the trouble started.
AvroWrapper, AvroKey, AvroValue
Although Avro is well supported in Hadoop MapReduce, the design is rather limiting. A typical Writable bean can perform serialization/deserialization (SerDe) operations on itself. This design is clean and compact, and it makes implementing your own custom writables relatively easy. Avro records, however, require a schema to be deserialized. To eliminate the need to ship the schema with every individual Avro record, the library includes an AvroWrapper, which, as its name suggests, merely wraps an individual record or datum. You don't use the wrapper directly, though. When specifying a key or value in your job, you select one of its subclasses: AvroKey for keys, AvroValue for values (duh, right?).
If you look into these two subclasses you’ll discover something curious — they’re essentially empty.
The reason is that SerDe operations on these classes are actually handled by the AvroSerialization class, configured through the AvroJob static helper methods. The reader and writer schemas are registered with the serialization class, and the AvroKey and AvroValue wrappers act merely as tags identifying which schema applies to a record. This works quite well for most jobs, but it limits you to two Avro schemas: one for keys and another for values. What if we want to perform a join operation on two or more Avro records with different schemas?
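For reference, the standard single-schema setup looks something like the following sketch using the new-API avro-mapred helpers; the Job name and the schema arguments are placeholders, and this is the configuration whose two-schema limit is discussed above:

```java
import org.apache.avro.Schema;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.hadoop.mapreduce.Job;

public class StandardAvroSetup {
    // One schema for all map output keys and one for all map output values:
    // AvroSerialization will use exactly these two for SerDe.
    static void configure(Job job, Schema keySchema, Schema valueSchema) {
        AvroJob.setMapOutputKeySchema(job, keySchema);
        AvroJob.setMapOutputValueSchema(job, valueSchema);
    }
}
```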
AvroMultiWrapper and MultiSchemaAvroSerialization
To work around this limitation, a new serialization implementation is needed. One option, attaching the schema to every record sent through the mapreduce process, would explode the volume of data flowing through the job. An alternative is to build on the default mechanism and add a new tagging class for each schema, perhaps extensions of AvroValue. But that would mean creating a host of extra empty “tagging” classes, and the exercise would have to be repeated for every future job joining new schemas.
For Mara, the choice was to map each Avro schema to a key and then tag each mapreduce record with that key, identifying the appropriate reader schema for deserialization. By limiting the tag to a single byte, overhead is kept very low while still supporting up to 128 different schemas. This is accomplished through Mara’s AvroMultiWrapper and MultiSchemaAvroSerialization classes.
Usage mirrors that of the standard Avro mapreduce libraries. To configure schemas in the system, you call the static MultiSchemaAvroSerialization.registerSchemas method, which populates the set of schemas:
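A configuration sketch follows; the exact signature of registerSchemas is Mara's, so this assumes it accepts the Job plus the schemas to register, and UserRecord/OrderRecord are hypothetical Avro-generated classes used only for illustration:

```java
// Hypothetical driver setup: register every schema the job will move
// through the shuffle, in a stable order (the ordinal becomes the tag byte).
Job job = Job.getInstance(new Configuration(), "avro-multi-join");
MultiSchemaAvroSerialization.registerSchemas(job,
    UserRecord.getClassSchema(),
    OrderRecord.getClassSchema());
```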
For your map output key and/or value, you’d specify the AvroMultiWrapper…and that’s it.
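In the driver, that amounts to something like the following (standard Hadoop Job setters; using the wrapper for both key and value is just one possible arrangement):

```java
// AvroMultiWrapper serves as the map output key and value type; the
// per-record tag byte tells the serialization which schema applies.
job.setMapOutputKeyClass(AvroMultiWrapper.class);
job.setMapOutputValueClass(AvroMultiWrapper.class);
```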
When the key and/or value datum is serialized, the MultiSchemaAvroSerialization looks up the ordinal of the schema from the registered set and includes that as the first byte of the serialized datum:
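The mechanism can be sketched in plain Java. This is a simplified illustration rather than Mara's actual code: a list of schema names stands in for the registered schema set, and a byte array stands in for the Avro-encoded datum.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;
import java.util.List;

public class MultiSchemaFraming {
    // Stand-in for the registered schema set; ordinals are list positions.
    static final List<String> SCHEMAS = Arrays.asList("user.avsc", "order.avsc");

    // Look up the schema's ordinal and write it as the first byte,
    // followed by the (already Avro-encoded) datum bytes.
    static byte[] serialize(String schemaName, byte[] datum) {
        int ordinal = SCHEMAS.indexOf(schemaName);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write((byte) ordinal);          // single-byte schema tag
        out.write(datum, 0, datum.length);  // payload unchanged
        return out.toByteArray();
    }
}
```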
The same thing happens in reverse for deserializing our wrapper:
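Continuing the simplified sketch above (again, an illustration of the framing, not Mara's code): the first byte selects the reader schema from the registered set, and the remainder is the Avro-encoded datum.

```java
import java.util.Arrays;
import java.util.List;

public class MultiSchemaUnframing {
    // Must match the order used when serializing.
    static final List<String> SCHEMAS = Arrays.asList("user.avsc", "order.avsc");

    // The leading tag byte is the ordinal of the reader schema.
    static String schemaFor(byte[] framed) {
        return SCHEMAS.get(framed[0]);
    }

    // Everything after the tag byte is the Avro-encoded datum.
    static byte[] datumOf(byte[] framed) {
        return Arrays.copyOfRange(framed, 1, framed.length);
    }
}
```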
I’ve used this successfully in conjunction with the standard AvroSerialization and AvroKey/AvroValue classes, reading from Avro files and then emitting multiple schemas from the Mapper class. The process is efficient: the only overhead is the single tag byte attached to each record, and schema lookups should cost roughly the same in the multi-schema serialization as in the standard one.