Chain Parsers
Many sensors contain metadata that should be ingested along with the data or contain different sensor types that need to be parsed separately. You can chain multiple parsers for a sensor to individually address the different types of information in the sensor. For example, you can parse multiple components in a Syslog log file such as timestamp, message type, and message payload, to differentiate the information contained in the log file. To chain parsers, you need an enveloping parser and sub-parsers for one or more sensor types.
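For example, a single Cisco PIX syslog line carries an envelope (a timestamp and a message-type tag) around the payload. The following composite line is hypothetical, assembled from the worked example later in this topic:
Jan 05 2017 12:02:34: %PIX-6-302020: Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53
The enveloping parser strips off the timestamp and the 6-302020 tag, and the tag determines which sub-parser receives the payload.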
- Before editing configurations, pull the configurations from ZooKeeper locally:
$METRON_HOME/bin/zk_load_configs.sh --mode PULL -z $ZOOKEEPER -o $METRON_HOME/config/zookeeper/ -f
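To verify the pull, you can list the parser configurations that now exist locally (the parsers subdirectory is where the steps below create new configuration files):
ls $METRON_HOME/config/zookeeper/parsers/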
For ease of explanation, steps in this topic use the Grok parser format example provided in Step 2c.
- Determine the format of the new data source's log entries, so you can parse them.
- Create a statement that defines the pattern of the parser expression for the log type for your enveloping parser.
For ease of explanation, we assume that we are using a Grok topology. Refer to the Grok documentation for additional details.
- Save the Grok statement and load it into Hadoop Distributed File System (HDFS) in a named location:
- Create a local file for the new data source:
touch /tmp/$ENVELOPE_DATASOURCE
- Open $ENVELOPE_DATASOURCE and add the Grok statement defined in Step 3:
vi /tmp/$ENVELOPE_DATASOURCE
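For example, an enveloping Grok statement for a Cisco PIX syslog source (this is the same CISCO_PIX pattern used in the worked example later in this topic) captures the envelope timestamp, the message-type tag, and the payload:
CISCO_PIX %{GREEDYDATA:timestamp}: %PIX-%{NOTSPACE:pix_type}: %{GREEDYDATA:data}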
- Put the $ENVELOPE_DATASOURCE file into the HDFS directory where Metron stores its Grok parsers. Existing Grok parsers that ship with CCP are staged under /apps/metron/patterns:
su - hdfs
hadoop fs -rmr /apps/metron/patterns/$ENVELOPE_DATASOURCE
hdfs dfs -put /tmp/$ENVELOPE_DATASOURCE /apps/metron/patterns/
- Define the enveloping parser configuration.
- As root, log into the host with CCP installed:
ssh $CCP_HOST
- Create a $DATASOURCE envelope parser configuration file at $METRON_HOME/config/zookeeper/parsers/$ENVELOPE_DATASOURCE.json.
- Create a Kafka topic for the new data source:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic $ENVELOPE_DATASOURCE --partitions 1 --replication-factor 1
- Verify your new topic by listing the Kafka topics:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --list
- Populate the $ENVELOPE_DATASOURCE.json file with the following:
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "$ENVELOPE_DATASOURCE",
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/$ENVELOPE_DATASOURCE",
    "batchSize" : 1,
    "patternLabel": "$DATASOURCE_DELIMITED",
    "timestampField": "timestamp",
    "timeFields" : [ "timestamp" ],
    "dateFormat" : "MMM dd yyyy HH:mm:ss",
    "kafka.topicField" : "logical_source_type"
  }
}
The important parameters to set for this parser are the following:
- parserClassName - The name of the parser's class in the .jar file.
- sensorTopic - The Kafka topic on which the telemetry is being streamed. If the topic is prefixed and suffixed by /, it is treated as a regex and matches any topic matching the pattern (for example, /bro.*/ matches bro_cust0, bro_cust1, and bro_cust2).
- parserConfig - A JSON map defining the parser implementation configuration. For an envelope parser, this parameter specifies that the parser sends messages to the topic named in the logical_source_type field. If the field does not exist, the message is not sent.
EXAMPLE for Envelope Parser
The following is an example of an envelope parser called pix_syslog_router configured to:
- Parse the timestamp field.
- Parse the payload into a field called data ("messageField" : "data").
- Parse the tag into a field called pix_type ("input" : "pix_type").
- Route the enveloped message to the appropriate Kafka topic based on the tag. In this case, the field is called logical_source_type.
The envelope parser will send output to two sub-parsers:
- cisco-6-302 - Connection creation and teardown messages, for example, Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53
- cisco-5-304 - URL access events, for example, 192.168.0.2 Accessed URL 66.102.9.99:/
In order for this parser configuration to work, you must create a file called cisco_patterns and populate it with the following Grok expressions:
CISCO_ACTION Built|Teardown|Deny|Denied|denied|requested|permitted|denied by ACL|discarded|est-allowed|Dropping|created|deleted
CISCO_REASON Duplicate TCP SYN|Failed to locate egress interface|Invalid transport field|No matching connection|DNS Response|DNS Query|(?:%{WORD}\s*)*
CISCO_DIRECTION Inbound|inbound|Outbound|outbound
CISCOFW302020_302021 %{CISCO_ACTION:action}(?:%{CISCO_DIRECTION:direction})? %{WORD:protocol} connection %{GREEDYDATA:ignore} faddr %{IP:ip_dst_addr}/%{INT:icmp_seq_num}(?:\(%{DATA:fwuser}\))? gaddr %{IP:ip_src_xlated}/%{INT:icmp_code_xlated} laddr %{IP:ip_src_addr}/%{INT:icmp_code}( \(%{DATA:user}\))?
ACCESSED %{URIHOST:ip_src_addr} Accessed URL %{IP:ip_dst_addr}:%{URIPATHPARAM:uri_path}
CISCO_PIX %{GREEDYDATA:timestamp}: %PIX-%{NOTSPACE:pix_type}: %{GREEDYDATA:data}
Place the file at /tmp/cisco_patterns in HDFS by using:
hadoop fs -put ~/cisco_patterns /tmp
Parser Configuration
{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "pix_syslog_router",
  "parserConfig": {
    "grokPath": "/tmp/cisco_patterns",
    "batchSize" : 1,
    "patternLabel": "CISCO_PIX",
    "timestampField": "timestamp",
    "timeFields" : [ "timestamp" ],
    "dateFormat" : "MMM dd yyyy HH:mm:ss",
    "kafka.topicField" : "logical_source_type"
  },
  "fieldTransformations" : [
    {
      "transformation" : "REGEX_SELECT",
      "input" : "pix_type",
      "output" : "logical_source_type",
      "config" : {
        "cisco-6-302" : "^6-302.*",
        "cisco-5-304" : "^5-304.*"
      }
    }
  ]
}
- fieldTransformations - An array of complex objects representing the transformations to be performed on the message generated from the parser before it is written to the Kafka topic. For this example, this parameter includes the following options:
  - transformation - The REGEX_SELECT field transformation sets the logical_source_type field based on the value of the input field.
  - input - The field whose value determines the sub-parser type.
  - output - The field into which the result of the transformation is written.
  - config - The names of the sub-parsers and the regexes that match them.
A quick end-to-end check of this routing is sketched below.
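To check the routing, you can feed a sample line into the router's input topic and watch the routed topic. This is a sketch: it assumes $KAFKA holds your Kafka broker list, and the input line is a hypothetical composite of the example messages above.
# Send a composite PIX line into the enveloping parser's input topic
echo 'Jan 05 2017 12:02:34: %PIX-6-302020: Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53' | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $KAFKA --topic pix_syslog_router
# pix_type parses as 6-302020, which matches ^6-302.*, so the enveloped
# message should appear on the cisco-6-302 topic
/usr/hdp/current/kafka-broker/bin/kafka-console-consumer.sh --bootstrap-server $KAFKA --topic cisco-6-302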
- Define one or more sub-parser configurations.
- As root, log into the host with CCP installed:
ssh $CCP_HOST
- Create a $DATASOURCE sub-parser configuration file at $METRON_HOME/config/zookeeper/parsers/$SUBPARSER_DATASOURCE.json.
- Create a Kafka topic for the sub-parser:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic $SUBPARSER_DATASOURCE --partitions 1 --replication-factor 1
- Populate the $SUBPARSER_DATASOURCE.json file with the following:
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "sensorTopic": "$SUBPARSER_DATASOURCE",
  "rawMessageStrategy" : "ENVELOPE",
  "rawMessageStrategyConfig" : {
    "messageField" : "data",
    "metadataPrefix" : ""
  },
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/$SUBPARSER_DATASOURCE",
    "batchSize" : 1,
    "patternLabel": "$DATASOURCE_DELIMITED",
    "timestampField": "timestamp",
    "timeFields" : [ "timestamp" ],
    "dateFormat" : "MMM dd yyyy HH:mm:ss",
    "kafka.topicField" : "logical_source_type"
  }
}
The important parameters to set for this parser are the following:
- parserClassName - The name of the parser's class in the .jar file.
- sensorTopic - The Kafka topic on which the telemetry is being streamed. If the topic is prefixed and suffixed by /, it is treated as a regex and matches any topic matching the pattern (for example, /bro.*/ matches bro_cust0, bro_cust1, and bro_cust2).
- rawMessageStrategy - The strategy that indicates how to read data and metadata. The strategies supported are:
  - DEFAULT - Data is read directly from the Kafka record value, and metadata, if any, is read from the Kafka record key. This strategy defaults to not reading metadata and not merging metadata.
  - ENVELOPE - Data from the Kafka record value is presumed to be a JSON blob. One of its fields must contain the raw data to pass to the parser; all other fields are considered metadata. The field containing the raw data is specified in rawMessageStrategyConfig. Data held in the Kafka key, as well as the non-data fields in the JSON blob passed into the Kafka value, are considered metadata. The exception is that any original_string field is inherited from the envelope data, so that the original string contains the envelope data; if you do not want this behavior, remove this field from the envelope data. (An illustrative enveloped message follows this list.)
- rawMessageStrategyConfig - The configuration (a map) for the rawMessageStrategy. Available configurations are strategy dependent:
  - DEFAULT - metadataPrefix defines the key prefix for metadata (default is metron.metadata).
  - ENVELOPE - metadataPrefix defines the key prefix for metadata (default is metron.metadata). messageField defines the field from the envelope to use as the data; all other fields are considered metadata.
- parserConfig - A JSON map defining the parser implementation configuration. For a chained parser, this parameter specifies that the parser sends messages to the topic named in the logical_source_type field; if the field does not exist, the message is not sent. This parameter also includes batch sizing and timeout settings for the writer configuration; if you do not define these properties, the system uses their default values.
  - grokPath - The path in HDFS (or in the .jar file) to the Grok statement. By default, the parser attempts to load the pattern from HDFS, then falls back to the classpath, and finally throws an exception if it is unable to load a pattern.
  - batchSize - The number of records to batch together before sending to the writer. The default is 15.
  - patternLabel - The name of the Grok statement that defines the pattern of the Grok expression.
  - kafka.topicField - Specifies the output topic as the value of a particular field. This field enables the routing capabilities necessary for handling enveloped data. If this value is unpopulated, the message is dropped.
  - timestampField - Use the timestamp value to ensure that the system uses the event time rather than the system time.
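As an illustration (hypothetical values, composed from the pix_syslog_router example in this topic), the JSON blob that a sub-parser receives on its input topic might look like the following. With "messageField" : "data", only the data field is re-parsed; the remaining fields are treated as metadata, and original_string is inherited from the envelope:
{
  "timestamp": 1483617754000,
  "pix_type": "6-302020",
  "logical_source_type": "cisco-6-302",
  "original_string": "Jan 05 2017 12:02:34: %PIX-6-302020: Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53",
  "data": "Built UDP connection for faddr 198.207.223.240/53337 gaddr 10.0.0.187/53 laddr 192.168.0.2/53"
}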
EXAMPLE for Sub-Parser
The following is an example of a parser called cisco-6-302 configured to append the sensor-specific fields, based on the tag type, to the fields it inherits from pix_syslog_router:
{
  "parserClassName" : "org.apache.metron.parsers.GrokParser",
  "sensorTopic" : "cisco-6-302",
  "rawMessageStrategy" : "ENVELOPE",
  "rawMessageStrategyConfig" : {
    "messageField" : "data",
    "metadataPrefix" : ""
  },
  "parserConfig": {
    "grokPath": "/tmp/cisco_patterns",
    "batchSize" : 1,
    "patternLabel": "CISCOFW302020_302021"
  }
}
- As root, log into the host with CCP installed.
- Use the following script to upload configurations to Apache ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -i $METRON_HOME/config/zookeeper -z $ZOOKEEPER
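To confirm the push, you can dump the configurations back out of ZooKeeper with the same script's DUMP mode and check that your new parser configurations appear:
$METRON_HOME/bin/zk_load_configs.sh --mode DUMP -z $ZOOKEEPER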
- Deploy the new parser topology to the cluster:
- As root, log in to the host that has Metron installed.
- Deploy the new parser topology:
$METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s $DATASOURCE
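For the chained example in this topic, the router and each sub-parser are separate parser configurations, so one approach (a sketch; it assumes each parser runs as its own topology, named after its sensorTopic) is to start each of them:
$METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s pix_syslog_router
$METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s cisco-6-302
$METRON_HOME/bin/start_parser_topology.sh -k $KAFKA -z $ZOOKEEPER -s cisco-5-304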
- Use the Apache Storm UI to verify that the new topology is listed and that it has no errors.
This new data source processor topology ingests from the $DATASOURCE Kafka topic that you created earlier and then parses the event with the CCP Grok framework using the Grok pattern defined earlier.