kite-morphlines-avro

This Maven module contains morphline commands for reading, extracting, and transforming Avro files and Avro objects.

readAvroContainer

The readAvroContainer command (source code) parses an InputStream or byte array that contains Apache Avro binary container file data. For each Avro datum, the command emits a morphline record containing the datum as an attachment in the field _attachment_body.

The Avro schema that was used to write the Avro data is retrieved from the Avro container. Optionally, the Avro schema that shall be used for reading can be supplied with a configuration option; otherwise it is assumed to be the same as the writer schema.

The input stream or byte array is read from the first attachment of the input record.

The command provides the following configuration options:

Property Name | Default | Description
supportedMimeTypes | null | Optionally, require the input record to match one of the MIME types in this list.
readerSchemaFile | null | An optional Avro schema file in JSON format on the local file system to use for reading.
readerSchemaString | null | An optional Avro schema in JSON format given inline to use for reading.

Example usage:

# Parse Avro container file and emit a record for each Avro object
readAvroContainer {
  # Optionally, require the input to match one of these MIME types:
  # supportedMimeTypes : [avro/binary]

  # Optionally, use this Avro schema in JSON format inline for reading:
  # readerSchemaString : """<json can go here>"""

  # Optionally, use this Avro schema file in JSON format for reading:
  # readerSchemaFile : /path/to/syslog.avsc
}
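
To make the reader schema option more concrete, here is a minimal sketch of a complete morphline file that reads a container and projects each datum onto a smaller reader schema. The morphline id, the record name SyslogEvent, and its two fields are hypothetical; they are assumed to match the record name and a subset of the fields of the writer schema stored in the container.

```
morphlines : [
  {
    id : avroProjection                      # hypothetical morphline id
    importCommands : ["org.kitesdk.**"]
    commands : [
      {
        readAvroContainer {
          # Reader schema listing only the fields of interest (assumed schema)
          readerSchemaString : """
            {
              "type" : "record",
              "name" : "SyslogEvent",
              "fields" : [
                { "name" : "host", "type" : "string" },
                { "name" : "message", "type" : "string" }
              ]
            }
          """
        }
      }
      # Log each emitted record to inspect its _attachment_body field
      { logDebug { format : "output record: {}", args : ["@{}"] } }
    ]
  }
]
```

Under Avro schema resolution, writer fields that are absent from the reader schema are ignored, so each emitted datum should carry only the two listed fields.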

readAvro

The readAvro command (source code) parses containerless Avro. It behaves like the readAvroContainer command, except that the Avro schema that was used to write the data must be supplied explicitly: readAvro expects raw Avro data without an Avro container, and hence without a built-in writer schema.

Optionally, the Avro schema that shall be used for reading can be supplied with a configuration option; otherwise it is assumed to be the same as the writer schema.

The command provides the following configuration options:

Property Name | Default | Description
supportedMimeTypes | null | Optionally, require the input record to match one of the MIME types in this list.
readerSchemaFile | null | An optional Avro schema file in JSON format on the local file system to use for reading.
readerSchemaString | null | An optional Avro schema in JSON format given inline to use for reading.
writerSchemaFile | null | The Avro schema file in JSON format that was used to write the Avro data.
writerSchemaString | null | The Avro schema in JSON format that was used to write the Avro data, given inline.
isJson | false | Whether the Avro input data is encoded as JSON or binary.

Example usage:

# Parse Avro and emit a record for each Avro object
readAvro {
  # supportedMimeTypes : [avro/binary]
  # readerSchemaString : """<json can go here>"""
  # readerSchemaFile : test-documents/sample-statuses-20120906-141433-subschema.avsc
  # writerSchemaString : """<json can go here>"""
  writerSchemaFile : test-documents/sample-statuses-20120906-141433.avsc
}
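
As a further illustration of the options above, the following sketch configures readAvro for JSON-encoded input with an inline writer schema. The Rating schema shown here is a hypothetical example, not something the command requires.

```
readAvro {
  # Input is containerless Avro encoded as JSON rather than binary
  isJson : true

  # Writer schema given inline (hypothetical schema for illustration)
  writerSchemaString : """
    {
      "type" : "record",
      "name" : "Rating",
      "fields" : [
        { "name" : "userId", "type" : "int" },
        { "name" : "rating", "type" : ["int", "null"] }
      ]
    }
  """
}
```

Note that Avro's JSON encoding wraps non-null union values in an object keyed by the type name, so a matching input datum would look like {"userId": 1, "rating": {"int": 5}}.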

extractAvroTree

The extractAvroTree command (source code) converts an attached Avro datum to a morphline record by recursively walking the Avro tree and extracting all data into a single morphline record, with fields named by their path in the Avro tree.

The Avro input object is expected in the field _attachment_body, where it is typically placed by an upstream readAvroContainer or readAvro command.

This kind of mapping is useful for simple Avro schemas, but for more complex schemas, this approach may be overly simplistic and expensive.

The command provides the following configuration options:

Property Name | Default | Description
outputFieldPrefix | "" | A string to be prepended to each output field name.

Example usage:

extractAvroTree {
  outputFieldPrefix : ""
}
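
Because extractAvroTree depends on an upstream command to supply the Avro datum, it may help to see the commands chained. The following is a sketch of the commands list of a morphline, assuming the input is an Avro container file; the prefix value avro_ is a hypothetical choice.

```
commands : [
  # Emit one record per Avro datum, with the datum in _attachment_body
  { readAvroContainer { } }

  # Flatten each datum into fields named by their path in the Avro tree
  {
    extractAvroTree {
      outputFieldPrefix : "avro_"   # hypothetical prefix
    }
  }

  # Log the resulting record to inspect the generated field names
  { logDebug { format : "output record: {}", args : ["@{}"] } }
]
```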

extractAvroPaths

The extractAvroPaths command (source code) extracts specific values from an Avro object, akin to a simple form of XPath. The command uses zero or more Avro path expressions to extract values from an Avro object.

The Avro input object is expected in the field _attachment_body, where it is typically placed by an upstream readAvroContainer or readAvro command.

Each path expression consists of a record output field name (on the left side of the colon ':') and zero or more path steps (on the right-hand side), with the path steps separated by a '/' slash. Avro arrays are traversed with the '[]' notation.

The result of a path expression is a list of objects, each of which is added to the given record output field.

The path language supports all Avro concepts, including such concepts as nested structures, records, arrays, maps, and unions. The path language supports a flatten option that collects the primitives in a subtree into a flat output list.

The command provides the following configuration options:

Property Name | Default | Description
flatten | true | Whether to collect the primitives in a subtree into a flat output list.
paths | [] | Zero or more Avro path expressions.

Example usage:

extractAvroPaths {
  flatten : true
  paths : {
    my_price : /price

    my_docId : /docId
    my_links : /links
    my_links_backward : "/links/backward"
    my_links_forward : "/links/forward"
    my_name_language_code : "/name[]/language[]/code"
    my_name_language_country : "/name[]/language[]/country"
    my_name : /name

    /mymapField/foo/label : /mapField/foo/label/
  }
}

Alternatively, if the extractAvroPaths and extractAvroTree commands don't fit your needs, you can implement your own custom morphline command, or script a java command config that uses the Generic Avro Java API to traverse and process the Avro tree emitted by the readAvroContainer and readAvro commands. For example, along the following lines:

{ 
  readAvroContainer { }
}

{
  java { 
    imports : """
      import org.apache.avro.generic.GenericRecord;
      import org.kitesdk.morphline.base.Fields;
      // import com.cloudera.cdk.morphline.base.Fields; // use this for CDK
    """
    code : """
      GenericRecord root = (GenericRecord) record.getFirstValue(Fields.ATTACHMENT_BODY);
      GenericRecord links = (GenericRecord) root.get("links"); // traverse via Avro Tree API
      String forwardLinks = links.get("forward").toString(); // traverse via Avro Tree API
      record.put("forwardLinks", forwardLinks);
      logger.debug("My output record: {}", record);
      return child.process(record);
    """
  }
}
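
A variation on the same idea is sketched below: instead of converting the whole field to a string, it adds each element as a separate value of the output field and skips data that does not have the expected shape. It assumes, as in the example above, that the datum has a links record, and further assumes that its forward field is an Avro array; both are properties of the sample data, not of the command.

```
{
  java {
    imports : """
      import java.util.List;
      import org.apache.avro.generic.GenericRecord;
      import org.kitesdk.morphline.base.Fields;
    """
    code : """
      GenericRecord root = (GenericRecord) record.getFirstValue(Fields.ATTACHMENT_BODY);
      Object links = root.get("links"); // assumed to be a nested record
      if (links instanceof GenericRecord) {
        Object forward = ((GenericRecord) links).get("forward"); // assumed to be an array
        if (forward instanceof List) {
          // Add each array element as a separate value of the output field
          for (Object link : (List<?>) forward) {
            record.put("forwardLinks", link);
          }
        }
      }
      return child.process(record);
    """
  }
}
```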

toAvro

The toAvro command (source code) converts a morphline record to an Avro record of Java class org.apache.avro.generic.IndexedRecord.

The conversion supports all Avro concepts, including such concepts as nested structures, records, arrays, maps, and unions.

The Avro output record object is added to the morphline field _attachment_body.

The command provides the following configuration options:

Property Name | Default | Description
schemaFile | null | An optional Avro schema file in JSON format on the local file system to use for writing.
schemaString | null | An optional Avro schema in JSON format given inline to use for writing.
schemaField | null | An optional org.apache.avro.Schema object fetched from the given record input field. Exactly one of schemaFile, schemaString, or schemaField must be present.
mappings | [] | An optional JSON object containing zero or more mappings from morphline record field names to Avro record field names. Each mapping consists of an Avro output field name (on the left side of the colon ':') and a morphline field name (on the right-hand side). Example mapping: avroPrice : morphlinePrice. Mappings are optional; by default, data is extracted from the morphline fields that carry the same name as the Avro fields defined in the Avro schema.

Example usage:

toAvro {
  #schemaFile : /path/to/interop.avsc 
  #schemaField : _dataset_descriptor_schema 
  schemaString : """
    {
      "type" : "record",
      "name" : "Rating",
      "fields" : [ 
        {
          "name" : "userId",
          "type" : "int"
        }, 
        {
          "name" : "rating",
          "type" : ["int","null"]
        }, 
        {
          "name" : "reviews",
          "type" : {"type": "array", "items": "string"}
        }, 
        {
          "name" : "history",
          "type" : ["null", {"type": "map", "values":     
                              {"type": "record", "name": "Foo",
                                   "fields": [{"name": "timestamp", "type": "long"}]}}]
        } 
      ]
    }
  """
  mappings : { 
    userId : morphlineUserId
  }          
} 

writeAvroToByteArray

The writeAvroToByteArray command (source code) serializes the Avro records contained in the _attachment_body field into a byte array and replaces the _attachment_body field with that byte array. The records must share an identical Avro schema. Often, the records were originally generated by the toAvro command.

The command provides the following configuration options:

Property Name | Default | Description
format | container | Indicates the type of Avro output format that shall be written. Must be one of container (serialize into a byte array that contains Apache Avro binary container file data), containerlessBinary (serialize into a byte array that contains Apache Avro without an Avro container and hence without a built-in writer schema), or containerlessJSON (same as containerlessBinary except that Avro output is encoded as JSON).
codec | null | Optional parameter that specifies the compression algorithm to use. Must be one of null, snappy, deflate, or bzip2. This parameter only applies if format = container.

Example usage:

writeAvroToByteArray {
  format : container 
  codec : snappy
}
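
To show how the commands in this module fit together, here is a sketch of a round trip: a morphline record is converted to Avro, serialized without a container, and parsed back. The schema path is hypothetical, and the sketch assumes that the same schema file is used for writing and reading, since containerless output carries no built-in writer schema.

```
commands : [
  # Convert the morphline record to an Avro record in _attachment_body
  { toAvro { schemaFile : /path/to/rating.avsc } }    # hypothetical path

  # Replace _attachment_body with raw Avro bytes; codec is omitted
  # because it only applies if format = container
  { writeAvroToByteArray { format : containerlessBinary } }

  # Parse the bytes again, supplying the writer schema explicitly
  { readAvro { writerSchemaFile : /path/to/rating.avsc } }
]
```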