Example Morphline Usage
The following examples show how you can use morphlines.
Using Morphlines to Index Avro
This example illustrates using a morphline to index an Avro file with a schema.
- View the content of the Avro file to understand the data:
$ wget http://archive.apache.org/dist/avro/avro-1.7.4/java/avro-tools-1.7.4.jar $ java -jar avro-tools-1.7.4.jar tojson \ /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433.avro
- Inspect the schema of the Avro file:
$ java -jar avro-tools-1.7.4.jar getschema /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433.avro { "type" : "record", "name" : "Doc", "doc" : "adoc", "fields" : [ { "name" : "id", "type" : "string" }, { "name" : "user_statuses_count", "type" : [ "int", "null" ] }, { "name" : "user_screen_name", "type" : [ "string", "null" ] }, { "name" : "created_at", "type" : [ "string", "null" ] }, { "name" : "text", "type" : [ "string", "null" ] } ... ] }
- Extract the
id
,user_screen_name
,created_at
, andtext
fields from the Avro records, and then store and index them in Solr, using the following Solr schema definition inschema.xml
:<fields> <field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> <field name="username" type="text_en" indexed="true" stored="true" /> <field name="created_at" type="tdate" indexed="true" stored="true" /> <field name="text" type="text_en" indexed="true" stored="true" /> <field name="_version_" type="long" indexed="true" stored="true"/> <dynamicField name="ignored_*" type="ignored"/> </fields>
The Solr output schema omits some Avro input fields, such as
user_statuses_count
. If your data includes Avro input fields that are not included in the Solr output schema, you may want to make changes to data as it is ingested. For example, suppose you need to rename the input fielduser_screen_name
to the output fieldusername
. Also suppose that the time format for thecreated_at
field isyyyy-MM-dd'T'HH:mm:ss'Z'
. Finally, suppose any unknown fields present are to be removed. Recall that Solr throws an exception on any attempt to load a document that contains a field that is not specified inschema.xml
. - These transformation rules that make it possible
to modify data so it fits your particular schema can be expressed with
morphline commands called readAvroContainer, extractAvroPaths, convertTimestamp, sanitizeUnknownSolrFields and loadSolr,
by editing a
morphline.conf
file.# Specify server locations in a SOLR_LOCATOR variable; used later in # variable substitutions: SOLR_LOCATOR : { # Name of solr collection collection : collection1 # ZooKeeper ensemble zkHost : "127.0.0.1:2181/solr" } # Specify an array of one or more morphlines, each of which defines an ETL # transformation chain. A morphline consists of one or more potentially # nested commands. A morphline is a way to consume records such as Flume events, # HDFS files or blocks, turn them into a stream of records, and pipe the stream # of records through a set of easily configurable transformations on its way to # Solr. morphlines : [ { # Name used to identify a morphline. For example, used if there are multiple # morphlines in a morphline config file. id : morphline1 # Import all morphline commands in these java packages and their subpackages. # Other commands that may be present on the classpath are not visible to this # morphline. importCommands : ["org.kitesdk.**", "org.apache.solr.**"] commands : [ { # Parse Avro container file and emit a record for each Avro object readAvroContainer { # Optionally, require the input to match one of these MIME types: # supportedMimeTypes : [avro/binary] # Optionally, use a custom Avro schema in JSON format inline: # readerSchemaString : """<json can go here>""" # Optionally, use a custom Avro schema file in JSON format: # readerSchemaFile : /path/to/syslog.avsc } } { # Consume the output record of the previous command and pipe another # record downstream. # # extractAvroPaths is a command that uses zero or more Avro path # excodeblockssions to extract values from an Avro object. Each excodeblockssion # consists of a record output field name, which appears to the left of the # colon ':' and zero or more path steps, which appear to the right. # Each path step is separated by a '/' slash. Avro arrays are # traversed with the '[]' notation. # # The result of a path excodeblockssion is a list of objects, each of which # is added to the given record output field. # # The path language supports all Avro concepts, including nested # structures, records, arrays, maps, unions, and others, as well as a flatten # option that collects the primitives in a subtree into a flat list. In the # paths specification, entries on the left of the colon are the target Solr # field and entries on the right specify the Avro source paths. Paths are read # from the source that is named to the right of the colon and written to the # field that is named on the left. extractAvroPaths { flatten : false paths : { id : /id username : /user_screen_name created_at : /created_at text : /text } } } # Consume the output record of the previous command and pipe another # record downstream. # # convert timestamp field to native Solr timestamp format # such as 2012-09-06T07:14:34Z to 2012-09-06T07:14:34.000Z { convertTimestamp { field : created_at inputFormats : ["yyyy-MM-dd'T'HH:mm:ss'Z'", "yyyy-MM-dd"] inputTimezone : America/Los_Angeles outputFormat : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" outputTimezone : UTC } } # Consume the output record of the previous command and pipe another # record downstream. # # This command deletes record fields that are unknown to Solr # schema.xml. # # Recall that Solr throws an exception on any attempt to load a document # that contains a field that is not specified in schema.xml. { sanitizeUnknownSolrFields { # Location from which to fetch Solr schema solrLocator : ${SOLR_LOCATOR} } } # log the record at DEBUG level to SLF4J { logDebug { format : "output record: {}", args : ["@{}"] } } # load the record into a Solr server or MapReduce Reducer { loadSolr { solrLocator : ${SOLR_LOCATOR} } } ] } ]
Using Morphlines with Syslog
The following example illustrates using a morphline to extract information from
a syslog
file. A syslog file contains semi-structured
lines of the following form:
<164>Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22.
The program extracts the following record from the log line and loads it into Solr:
syslog_pri:164
syslog_timestamp:Feb 4 10:46:14
syslog_hostname:syslog
syslog_program:sshd
syslog_pid:607
syslog_message:listening on 0.0.0.0 port 22.
Use the following rules to create a chain of transformation commands, which are
expressed with the
readLine
,
grok
,
and
logDebug
morphline commands, by editing a morphline.conf
file.
# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
# Name of solr collection
collection : collection1
# ZooKeeper ensemble
zkHost : "127.0.0.1:2181/solr"
}
# Specify an array of one or more morphlines, each of which defines an ETL
# transformation chain. A morphline consists of one or more potentially
# nested commands. A morphline is a way to consume records such as Flume events,
# HDFS files or blocks, turn them into a stream of records, and pipe the stream
# of records through a set of easily configurable transformations on the way to
# a target application such as Solr.
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
{
readLine {
charset : UTF-8
}
}
{
grok {
# a grok-dictionary is a config file that contains prefabricated regular expressions
# that can be referred to by name.
# grok patterns specify such a regex name, plus an optional output field name.
# The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
# The input line is expected in the "message" input field.
dictionaryFiles : [target/test-classes/grok-dictionaries]
expressions : {
message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
}
}
}
# Consume the output record of the previous command and pipe another
# record downstream.
#
# This command deletes record fields that are unknown to Solr
# schema.xml.
#
# Recall that Solr throws an exception on any attempt to load a document
# that contains a field that is not specified in schema.xml.
{
sanitizeUnknownSolrFields {
# Location from which to fetch Solr schema
solrLocator : ${SOLR_LOCATOR}
}
}
# log the record at DEBUG level to SLF4J
{ logDebug { format : "output record: {}", args : ["@{}"] } }
# load the record into a Solr server or MapReduce Reducer
{
loadSolr {
solrLocator : ${SOLR_LOCATOR}
}
}
]
}
]
Next Steps
Learn more about morphlines. For more information, see: