Using Morphlines with Syslog
This example illustrates using a morphline to extract information from a
syslog
file.
A syslog file contains semi-structured lines of the following form:
<164>Feb 4 10:46:14 syslog sshd[607]: listening on 0.0.0.0 port 22.
The program extracts the following record from the log line and loads it into Solr:
syslog_pri:164
syslog_timestamp:Feb 4 10:46:14
syslog_hostname:syslog
syslog_program:sshd
syslog_pid:607
syslog_message:listening on 0.0.0.0 port 22.
Use the following rules to create a chain of transformation commands, which are expressed with
the readLine
, grok
, and logDebug
morphline commands, by editing a
morphline.conf
file.
# Specify server locations in a SOLR_LOCATOR variable; used later in
# variable substitutions:
SOLR_LOCATOR : {
# Name of solr collection
collection : collection1
# ZooKeeper ensemble
zkHost : "127.0.0.1:2181/solr"
}
# Specify an array of one or more morphlines, each of which defines an ETL
# transformation chain. A morphline consists of one or more potentially
# nested commands. A morphline is a way to consume records such as Flume events,
# HDFS files or blocks, turn them into a stream of records, and pipe the stream
# of records through a set of easily configurable transformations on the way to
# a target application such as Solr.
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
{
readLine {
charset : UTF-8
}
}
{
grok {
# a grok-dictionary is a config file that contains prefabricated regular expressions
# that can be referred to by name.
# grok patterns specify such a regex name, plus an optional output field name.
# The syntax is %{REGEX_NAME:OUTPUT_FIELD_NAME}
# The input line is expected in the "message" input field.
dictionaryFiles : [target/test-classes/grok-dictionaries]
expressions : {
message : """<%{POSINT:syslog_pri}>%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}"""
}
}
}
# Consume the output record of the previous command and pipe another
# record downstream.
#
# This command deletes record fields that are unknown to Solr
# schema.xml.
#
# Recall that Solr throws an exception on any attempt to load a document
# that contains a field that is not specified in schema.xml.
{
sanitizeUnknownSolrFields {
# Location from which to fetch Solr schema
solrLocator : ${SOLR_LOCATOR}
}
}
# log the record at DEBUG level to SLF4J
{ logDebug { format : "output record: {}", args : ["@{}"] } }
# load the record into a Solr server or MapReduce Reducer
{
loadSolr {
solrLocator : ${SOLR_LOCATOR}
}
}
]
}
]