kite-morphlines-avro
This maven module contains morphline commands for reading, extracting, and transforming Avro files and Avro objects.
readAvroContainer
The readAvroContainer
command (source code) parses an InputStream or byte array
that contains Apache Avro binary container file data. For each Avro datum, the command emits
a morphline record containing the datum as an attachment in the field _attachment_body.
The Avro schema that was used to write the Avro data is retrieved from the Avro container. Optionally, the Avro schema that shall be used for reading can be supplied with a configuration option; otherwise it is assumed to be the same as the writer schema.
The input stream or byte array is read from the first attachment of the input record.
The command provides the following configuration options:
Property Name | Default | Description |
---|---|---|
supportedMimeTypes | null | Optionally, require the input record to match one of the MIME types in this list. |
readerSchemaFile | null | An optional Avro schema file in JSON format on the local file system to use for reading. |
readerSchemaString | null | An optional Avro schema in JSON format given inline to use for reading. |
Example usage:
# Parse Avro container file and emit a record for each avro object
readAvroContainer {
# Optionally, require the input to match one of these MIME types:
# supportedMimeTypes : [avro/binary]
# Optionally, use this Avro schema in JSON format inline for reading:
# readerSchemaString : """<json can go here>"""
# Optionally, use this Avro schema file in JSON format for reading:
# readerSchemaFile : /path/to/syslog.avsc
}
readAvro
The readAvro
command (source code) parses containerless Avro. This command
is the same as the readAvroContainer
command except that the Avro schema that was used to write the Avro data must be explicitly
supplied to the readAvro
command because it expects raw Avro data without
an Avro container and hence without a built-in writer schema.
Optionally, the Avro schema that shall be used for reading can be supplied with a configuration option; otherwise it is assumed to be the same as the writer schema.
The command provides the following configuration options:
Property Name | Default | Description |
---|---|---|
supportedMimeTypes | null | Optionally, require the input record to match one of the MIME types in this list. |
readerSchemaFile | null | An optional Avro schema file in JSON format on the local file system to use for reading. |
readerSchemaString | null | An optional Avro schema in JSON format given inline to use for reading. |
writerSchemaFile | null | The Avro schema file in JSON format that was used to write the Avro data. |
writerSchemaString | null | The Avro schema file in JSON format that was used to write the Avro data, given inline. |
isJson | false | Whether the Avro input data is encoded as JSON or binary. |
Example usage:
# Parse Avro and emit a record for each avro object
readAvro {
# supportedMimeTypes : [avro/binary]
# readerSchemaString : """<json can go here>"""
# readerSchemaFile : test-documents/sample-statuses-20120906-141433-subschema.avsc
# writerSchemaString : """<json can go here>"""
writerSchemaFile : test-documents/sample-statuses-20120906-141433.avsc
}
extractAvroTree
The extractAvroTree
command (source code) converts an attached Avro datum to a
morphline record by recursively walking the Avro tree and extracting all data into a single
morphline record, with fields named by their path in the Avro tree.
The Avro input object is expected to be contained in the field _attachment_body, and typically placed there by an upstream readAvroContainer or readAvro command.
This kind of mapping is useful for simple Avro schemas, but for more complex schemas, this approach may be overly simplistic and expensive.
The command provides the following configuration options:
Property Name | Default | Description |
---|---|---|
outputFieldPrefix | "" | A string to be prepended to each output field name. |
Example usage:
extractAvroTree {
outputFieldPrefix : ""
}
extractAvroPaths
The extractAvroPaths
command (source code) extracts specific values from an Avro
object, akin to a simple form of XPath. The command uses zero or more Avro path expressions
to extract values from an Avro object.
The Avro input object is expected to be contained in the field _attachment_body, and typically placed there by an upstream readAvroContainer or readAvro command.
Each path expression consists of a record output field name (on the left side of the colon ':') as well as zero or more path steps (on the right hand side), each path step separated by a '/' slash, akin to a simple form of XPath. Avro arrays are traversed with the '[]' notation.
The result of a path expression is a list of objects, each of which is added to the given record output field.
The path language supports all Avro concepts, including such concepts as nested structures, records, arrays, maps, and unions. The path language supports a flatten option that collects the primitives in a subtree into a flat output list.
The command provides the following configuration options:
Property Name | Default | Description |
---|---|---|
flatten | true | Whether to collect the primitives in a subtree into a flat output list. |
paths | [] | Zero or more Avro path expressions. |
Example usage:
extractAvroPaths {
flatten : true
paths : {
my_price : /price
my_docId : /docId
my_links : /links
my_links_backward : "/links/backward"
my_links_forward : "/links/forward"
my_name_language_code : "/name[]/language[]/code"
my_name_language_country : "/name[]/language[]/country"
my_name : /name
/mymapField/foo/label : /mapField/foo/label/
}
}
Alternatively, if the extractAvroPaths and extractAvroTree commands don't fit your needs you can instead implement your own custom morphline command or script a java command config that uses the Generic Avro Java API to arbitrarily traverse and process the Avro tree that is emitted by the readAvroContainer and readAvro commands. For example, along the following lines:
{
readAvroContainer { }
}
{
java {
imports : """
import org.apache.avro.generic.GenericRecord;
import org.kitesdk.morphline.base.Fields;
// import com.cloudera.cdk.morphline.base.Fields; // use this for CDK
"""
code : """
GenericRecord root = (GenericRecord) record.getFirstValue(Fields.ATTACHMENT_BODY);
GenericRecord links = (GenericRecord) root.get("links"); // traverse via Avro Tree API
String forwardLinks = links.get("forward").toString(); // traverse via Avro Tree API
record.put("forwardLinks", forwardLinks);
logger.debug("My output record: {}", record);
return child.process(record);
"""
}
}
}
toAvro
The toAvro
command (source code) converts a morphline record to an Avro
record of Java class org.apache.avro.generic.IndexedRecord
.
The conversion supports all Avro concepts, including such concepts as nested structures, records, arrays, maps, and unions.
The Avro output record object is added to the morphline field _attachment_body.
The command provides the following configuration options:
Property Name | Default | Description |
---|---|---|
schemaFile | null | An optional Avro schema file in JSON format on the local file system to use for writing. |
schemaString | null | An optional Avro schema in JSON format given inline to use for writing. |
schemaField | null | An optional org.apache.avro.Schema object fetched from the given record input field. One of schemaFile or schemaString or schemaField must be present, but not more than one. |
mappings | [] | An optional JSON object containing zero or more mappings from morphline record field names to Avro record field names. Each mapping consists of an Avro output field name (on the left side of the colon ':') as well as a Morphline field name (on the right hand side). Example mapping: avroPrice : morphlinePrice. Any such mappings are optional - by default data is extracted from the morphline fields that carry the same name as the Avro fields defined in the Avro schema. |
Example usage:
toAvro {
#schemaFile : /path/to/interop.avsc
#schemaField : _dataset_descriptor_schema
schemaString : """
{
"type" : "record",
"name" : "Rating",
"fields" : [
{
"name" : "userId",
"type" : "int"
},
{
"name" : "rating",
"type" : ["int","null"]
},
{
"name" : "reviews",
"type" : {"type": "array", "items": "string"}
},
{
"name" : "history",
"type" : ["null", {"type": "map", "values":
{"type": "record", "name": "Foo",
"fields": [{"name": "timestamp", "type": "long"}]}}]
}
]
}
"""
mappings : {
userId : morphlineUserId
}
}
writeAvroToByteArray
The writeAvroToByteArray
command (source code) serializes the Avro records contained
in the _attachment_body field into a byte array and replaces the _attachment_body field with
that byte array. The records must share an identical Avro schema. Often, the records were
originally generated by the toAvro command.
The command provides the following configuration options:
Property Name | Default | Description |
---|---|---|
format | container | Indicates the type of Avro output format that shall be written. Must be one of container (serialize into a byte array that contains Apache Avro binary container file data) or containerlessBinary (serialize into a byte array that contains Apache Avro without an Avro container and hence without a built-in writer schema) or containerlessJSON (same as containerlessBinary except that Avro output is encoded as JSON). |
codec | null | Optional parameter that specifies the compression algorithm to use. Must be one of null or snappy or deflate or bzip2. This parameter only applies if format = container. |
Example usage:
writeAvroToByteArray {
format : container
codec : snappy
}