Create a Parser for Your New Data Source by Using the CLI
As an alternative to using the HCP Management module to parse your new data source, you can use the CLI.
- Determine the format of the new data source’s log entries, so you can parse them:
- Use ssh to access the host for the new data source.
-
Look at the different log files and determine which to parse:
sudo su -
cd /var/log/$NEW_DATASOURCE
ls
The file you want is typically the access.log, but your data source might use a different name.
-
Generate entries for the log that needs to be parsed so that you can see the
format of the entries:
timestamp | time elapsed | remotehost | code/status | bytes | method | URL rfc931 peerstatus/peerhost | type
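To make the field layout above concrete, here is a minimal Python sketch that splits one entry into named fields. The sample line is a hypothetical Squid-style access.log entry, and the regular expression and field names are illustrative assumptions. It only stands in for the Grok expression you will write later and is not the parser that HCP runs.

```python
import re

# Hypothetical sample entry in the layout listed above (Squid-style access.log).
SAMPLE = ("1461576382.642    161 127.0.0.1 TCP_MISS/200 103701 GET "
          "http://www.example.com/ - DIRECT/192.0.2.10 text/html")

# One named group per field; whitespace separates the fields.
LINE_RE = re.compile(
    r"(?P<timestamp>\d+\.\d+)\s+"
    r"(?P<elapsed>\d+)\s+"
    r"(?P<remotehost>\S+)\s+"
    r"(?P<code_status>\S+)\s+"
    r"(?P<bytes>\d+)\s+"
    r"(?P<method>\S+)\s+"
    r"(?P<url>\S+)\s+"
    r"(?P<rfc931>\S+)\s+"
    r"(?P<peer>\S+)\s+"
    r"(?P<type>\S+)"
)

def parse_entry(line):
    """Return a dict of named fields, or None if the line does not match."""
    match = LINE_RE.match(line)
    return match.groupdict() if match else None

fields = parse_entry(SAMPLE)
```

Running a few real entries from your own log through a check like this is a quick way to confirm which fields are always present before you commit to a Grok expression.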
- Create a Kafka topic for the new data source:
- Log in to $KAFKA_HOST as root.
-
Create a Kafka topic with the same name as the new data source:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST:2181 --create --topic $NEW_DATASOURCE --partitions 1 --replication-factor 1
-
Verify your new topic by listing the Kafka topics:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER_HOST:2181 --list
- Create a Grok statement file that defines the Grok expression for the log type you identified
in Step 1.
Note: You must include timestamp to ensure that the system uses the event time rather than the system time. For information about setting the Grok parser to use the current year, see step 5c. Refer to the Grok documentation for additional details.
- Save the Grok pattern and load it into Hadoop Distributed File System (HDFS) in a named location:
-
Create a local file for the new data source:
touch /tmp/$DATASOURCE
-
Open $DATASOURCE and add the Grok pattern defined in Step 3b:
vi /tmp/$DATASOURCE
-
Put the $DATASOURCE file into the HDFS directory where Metron stores its Grok
parsers.
Existing Grok parsers that ship with HCP are staged under /apps/metron/patterns:
su - hdfs
hadoop fs -rmr /apps/metron/patterns/$DATASOURCE
hdfs dfs -put /tmp/$DATASOURCE /apps/metron/patterns/
-
Define a parser configuration for the Metron Parsing Topology.
-
As root, log into the host with HCP installed:
ssh $HCP_HOST
-
Create a $DATASOURCE parser configuration file at $METRON_HOME/config/zookeeper/parsers/$DATASOURCE.json:
{
  "parserClassName": "org.apache.metron.parsers.GrokParser",
  "filterClassName": null,
  "sensorTopic": "$DATASOURCE",
  "outputTopic": null,
  "errorTopic": null,
  "readMetadata": true,
  "mergeMetadata": true,
  "rawMessageStrategy": "ENVELOPE",
  "rawMessageStrategyConfig": {
    "messageField": "data",
    "metadataPrefix": ""
  },
  "numWorkers": null,
  "numAckers": null,
  "spoutParallelism": 1,
  "spoutNumTasks": 1,
  "parserParallelism": 1,
  "parserNumTasks": 1,
  "errorWriterParallelism": 1,
  "errorWriterNumTasks": 1,
  "spoutConfig": {},
  "securityProtocol": null,
  "stormConfig": {},
  "parserConfig": {
    "grokPath": "/apps/metron/patterns/$DATASOURCE",
    "patternLabel": "$DATASOURCE_DELIMITED",
    "timestampField": "timestamp"
  },
  "fieldTransformations": [
    {
      "transformation": "STELLAR",
      "output": [ "full_hostname", "domain_without_subdomains" ],
      "config": {
        "full_hostname": "URL_TO_HOST(url)",
        "domain_without_subdomains": "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
      }
    }
  ]
}
- parserClassName
-
The name of the parser's class in the .jar file.
- filterClassName
-
The filter to use. This can be the fully qualified name of a class that implements the org.apache.metron.parsers.interfaces.MessageFilter<JSONObject> interface. Message filters enable you to ignore a set of messages by using custom logic. The existing implementation is STELLAR. The Stellar implementation enables you to apply a Stellar statement that returns a Boolean, which passes every message for which the statement returns true. The Stellar statement is specified by the filter.query property in the parserConfig. For example, the following Stellar filter includes messages that contain a field1 field:
-
{ "filterClassName" : "STELLAR", "parserConfig" : { "filter.query" : "exists(field1)" } }
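The effect of such a filter can be pictured with a small Python sketch. This is not Metron code: passes_filter is a hypothetical stand-in for a message filter, and field1_exists mimics only what exists(field1) expresses, namely that the field is present and not null.

```python
def passes_filter(message, predicate):
    """Keep a message only when the predicate evaluates to True."""
    return predicate(message) is True

# Stand-in for the Stellar statement exists(field1).
def field1_exists(message):
    return message.get("field1") is not None

messages = [
    {"field1": "a", "ip_src_addr": "10.0.0.1"},
    {"ip_src_addr": "10.0.0.2"},
]

# Only the first message carries field1, so only it is passed on.
kept = [m for m in messages if passes_filter(m, field1_exists)]
```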
- sensorTopic
-
The Kafka topic on which the telemetry is being streamed. If the topic is prefixed and suffixed by /, then it is assumed to be a regex and will match any topic matching the pattern (for example, /bro.*/ matches bro_cust0, bro_cust1, and bro_cust2).
- readMetadata
-
A Boolean indicating whether to read metadata and make it available to field transformations (false by default).
There are two types of metadata supported in HCP:
- Environmental metadata about the whole system
For example, if you have multiple Kafka topics being processed by one parser, you might want to tag the messages with the Kafka topic.
- Custom metadata from an individual telemetry source that you might want to use within Metron
- mergeMetadata
-
A Boolean indicating whether to merge metadata with the message (false by default).
If this property is set to true, then every metadata field becomes part of the messages and, consequently, is also available for field transformations.
- rawMessageStrategy
-
The strategy to use when reading the raw data and metadata. The following strategies are supported:
- DEFAULT: Data is read directly from the Kafka record value and metadata, if any, is read from the Kafka record key. This strategy defaults to not reading metadata and not merging metadata.
- ENVELOPE: Data from the Kafka record value is presumed to be a JSON blob. One of these fields must contain the raw data to pass to the parser. All other fields should be considered metadata. The field containing the raw data is specified in the rawMessageStrategyConfig. Data held in the Kafka key as well as the non-data fields in the JSON blob passed into the Kafka value are considered metadata. Note that the exception to this is that any original_string field is inherited from the envelope data so that the original string contains the envelope data. If you do not prefer this behavior, remove this field from the envelope data.
- rawMessageStrategyConfig
-
The raw message strategy configuration map. The following strategies are supported:
- DEFAULT: metadataPrefix defines the key prefix for metadata (default is metron.metadata).
- ENVELOPE: metadataPrefix defines the key prefix for metadata (default is metron.metadata). messageField defines the field from the envelope to use as the data. All other fields are considered metadata.
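As a rough illustration of the ENVELOPE strategy, the Python sketch below separates an envelope into raw data and metadata. The function name split_envelope, the sample envelope, and the way the prefix is joined to each key (with a dot, and skipped when the prefix is empty) are assumptions made for the example, not a description of Metron's implementation.

```python
import json

def split_envelope(record_value, message_field="data",
                   metadata_prefix="metron.metadata"):
    """Return (raw_data, metadata) from a JSON envelope string."""
    envelope = json.loads(record_value)
    raw_data = envelope[message_field]
    metadata = {}
    for key, value in envelope.items():
        if key == message_field:
            continue  # the data field itself is not metadata
        # Assumption: prefix and key are joined with a dot;
        # an empty prefix leaves the key unchanged.
        name = f"{metadata_prefix}.{key}" if metadata_prefix else key
        metadata[name] = value
    return raw_data, metadata

# Hypothetical envelope: one data field plus one custom metadata field.
record = json.dumps({"data": "raw log line", "customer_id": "cust0"})

raw, meta = split_envelope(record)
raw_no_prefix, meta_no_prefix = split_envelope(record, metadata_prefix="")
```

With mergeMetadata set to true, metadata keys such as these would then become part of the parsed message.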
- numWorkers
-
The number of workers to use in the topology (default is the Storm default of 1).
- numAckers
-
The number of acker executors to use in the topology (default is the Storm default of 1).
- spoutParallelism
-
The Kafka spout parallelism (default is 1). You can override the default on the command line. If there are multiple sensors, specify the values as a comma-separated list in the same order as the sensors.
- spoutNumTasks
-
The number of tasks for the spout (default is 1). You can override the default on the command line. If there are multiple sensors, specify the values as a comma-separated list in the same order as the sensors.
- parserParallelism
-
The parser bolt parallelism (default is 1). You can override the default on the command line. If there are multiple sensors, specify the values as a comma-separated list in the same order as the sensors.
- parserNumTasks
-
The number of tasks for the parser bolt (default is 1). If there are multiple sensors, the last one's configuration will be used. You can override the default on the command line.
- errorWriterParallelism
-
The error writer bolt parallelism (default is 1). You can override the default on the command line.
- errorWriterNumTasks
-
The number of tasks for the error writer bolt (default is 1). You can override the default on the command line.
- spoutConfig
-
A map representing a custom spout configuration. If there are multiple sensors, the configurations are merged, with the last one specified taking precedence. You can override this on the command line.
- securityProtocol
-
The security protocol to use for reading from Kafka, given as a string. You can override this on the command line and also specify it in the spout configuration by using the security.protocol key. If both are specified, they are merged and the CLI takes precedence. If multiple sensors are used, any non-"PLAINTEXT" value will be used.
- stormConfig
-
The Storm configuration to use, given as a map. You can override this on the command line. If both are specified, they are merged with CLI properties taking precedence.
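The merge behaviour described for stormConfig can be pictured as a dictionary merge in which command-line values win. The Python sketch below is only an illustration: merge_config is a hypothetical helper, and the property names and values are sample data chosen for the example.

```python
def merge_config(file_config, cli_config):
    """Merge two maps; entries from the command line take precedence."""
    merged = dict(file_config)   # copy, so the file settings stay untouched
    merged.update(cli_config)    # CLI values overwrite matching keys
    return merged

# Sample values only: one key appears in both places, one only in the file.
from_file = {"topology.workers": 1, "topology.debug": False}
from_cli = {"topology.workers": 4}

effective = merge_config(from_file, from_cli)
```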
- cacheConfig
-
Cache config for Stellar field transformations. This configures a least frequently used cache. This is a map with the following keys. If not explicitly configured (the default), then no cache will be used.
- stellar.cache.maxSize - The maximum number of elements in the cache. Default is to not use a cache.
- stellar.cache.maxTimeRetain - The maximum amount of time an element is kept in the cache (in minutes). Default is to not use a cache.
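To show how a size limit and a retention time interact, here is a deliberately small Python sketch of a least-frequently-used cache with an age limit. The class SmallCache and its injectable clock are hypothetical, it assumes a maximum size of at least 1, and it does not reproduce the cache implementation that Metron actually uses.

```python
import time

class SmallCache:
    """Toy LFU cache with a maximum size and a retention time in minutes."""

    def __init__(self, max_size, max_time_retain_minutes, clock=time.monotonic):
        self.max_size = max_size                      # assumed to be >= 1
        self.ttl_seconds = max_time_retain_minutes * 60
        self.clock = clock                            # injectable for testing
        self.entries = {}                             # key -> [value, stored_at, hits]

    def get(self, key):
        entry = self.entries.get(key)
        if entry is None:
            return None
        if self.clock() - entry[1] > self.ttl_seconds:
            del self.entries[key]                     # retained too long: drop it
            return None
        entry[2] += 1                                 # count the hit
        return entry[0]

    def put(self, key, value):
        if key not in self.entries and len(self.entries) >= self.max_size:
            # Evict the entry with the fewest hits.
            victim = min(self.entries, key=lambda k: self.entries[k][2])
            del self.entries[victim]
        self.entries[key] = [value, self.clock(), 0]

# Demonstration with a fake clock so the outcome does not depend on real time.
now = [0.0]
cache = SmallCache(max_size=2, max_time_retain_minutes=1, clock=lambda: now[0])
cache.put("a", 1)
cache.put("b", 2)
hit_a = cache.get("a")       # "a" now has one hit, "b" has none
cache.put("c", 3)            # cache is full, so the least used key ("b") goes
missing_b = cache.get("b")
now[0] = 61.0                # more than one minute later
expired_a = cache.get("a")
```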
- grokPath
-
The path for the Grok statement.
- patternLabel
-
The top-level pattern of the Grok file.
- parserConfig
-
A JSON map defining the parser implementation configuration.
This configuration file also includes batch sizing and timeout settings for writer configuration. If you do not define these properties, the system uses their default values.
- batchSize - Number of records to batch together before sending to the writer. Default is 15.
- batchTimeout - Optional. The timeout after which a batch is flushed even if the batchSize is not met.
-
"parserConfig" : { "batchSize": 15, "batchTimeout" : 0 },
- In addition, you can override settings for the Kafka writer within the parserConfig.
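The interaction between batchSize and batchTimeout can be illustrated with a short Python sketch. The Batcher class is hypothetical, and time is passed in explicitly so the behaviour is easy to follow. Treating a timeout of 0 as "no time-based flush" is a simplification of this sketch and says nothing about how HCP interprets that value.

```python
class Batcher:
    """Collect records and flush on size or on the age of the oldest record."""

    def __init__(self, batch_size=15, batch_timeout=0):
        self.batch_size = batch_size
        self.batch_timeout = batch_timeout  # 0: no time-based flush in this sketch
        self.pending = []
        self.first_added_at = None
        self.flushed = []                   # stands in for the writer

    def add(self, record, now):
        if not self.pending:
            self.first_added_at = now       # remember when this batch started
        self.pending.append(record)
        if len(self.pending) >= self.batch_size:
            self.flush()

    def tick(self, now):
        """Called periodically; flushes a partial batch that has waited too long."""
        if (self.pending and self.batch_timeout > 0
                and now - self.first_added_at >= self.batch_timeout):
            self.flush()

    def flush(self):
        self.flushed.append(list(self.pending))
        self.pending = []
        self.first_added_at = None

batcher = Batcher(batch_size=3, batch_timeout=5)
for i in range(4):
    batcher.add(i, now=0)   # the first three records fill a batch
batcher.tick(now=2)         # too early: record 3 stays pending
batcher.tick(now=5)         # timeout reached: the partial batch is flushed
```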
- fieldTransformations
-
An array of complex objects representing the transformations to be performed on the message generated from the parser before writing to the Kafka topic. You can use three values:
- STELLAR
- This transformation executes a set of transformations expressed as Stellar Language statements.
In the previous example, the Grok parser is designed to extract the URL, but the only information that you need is the domain (or even the domain without subdomains). To obtain this, you can use the STELLAR field transformation, which enables you to use the Stellar DSL (Domain Specific Language) to define extra transformations to be performed on the messages flowing through the topology.
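For readers unfamiliar with these two functions, the Python sketch below approximates what the sample transformation produces for one message. Both helpers are simplified stand-ins written for this example: in particular, keeping the last two labels of the host name is a naive assumption that gives wrong answers for suffixes such as co.uk, so it should not be read as how DOMAIN_REMOVE_SUBDOMAINS is implemented.

```python
from urllib.parse import urlparse

def url_to_host(url):
    """Approximate URL_TO_HOST: return the host portion of a URL."""
    return urlparse(url).hostname

def domain_remove_subdomains(hostname):
    """Naive stand-in for DOMAIN_REMOVE_SUBDOMAINS: keep the last two labels."""
    return ".".join(hostname.split(".")[-2:])

# Hypothetical parsed message containing only the url field.
message = {"url": "http://www.example.com/index.html"}

# The second field is derived from the first, as in the sample configuration.
message["full_hostname"] = url_to_host(message["url"])
message["domain_without_subdomains"] = domain_remove_subdomains(
    message["full_hostname"])
```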
- RENAME
- This transformation allows users to rename a set of fields. Specifically, the configuration is presumed to be the mapping. The keys of the configuration are the existing field names and the values are the associated new field names.
For example, the following configuration renames the fields old_field and different_old_field to new_field and different_new_field, respectively:
-
{ ... "fieldTransformations" : [ { "transformation" : "RENAME", "config" : { "old_field" : "new_field", "different_old_field" : "different_new_field" } } ] }
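The effect of this mapping on a single message can be sketched in a few lines of Python. rename_fields is a hypothetical helper, and the extra untouched field is sample data added to show that unmapped fields pass through unchanged.

```python
def rename_fields(message, mapping):
    """Return a copy of message with keys renamed according to mapping."""
    return {mapping.get(key, key): value for key, value in message.items()}

# Same mapping as in the configuration above.
config = {
    "old_field": "new_field",
    "different_old_field": "different_new_field",
}
message = {"old_field": 1, "different_old_field": 2, "untouched": 3}

renamed = rename_fields(message, config)
```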
- REGEX_SELECT
- This transformation lets users set an output field to one of a set of possibilities based on matching regexes. This transformation is useful when the number of conditions is large enough to make a Stellar match statement unwieldy.
-
For example, the following configuration sets the field logical_source_type to one of the following, depending on the value of the pix_type field:
- cisco-6-302 if pix_type starts with either 6-302 or 06-302
- cisco-5-304 if pix_type starts with 5-304
-
{ ... "fieldTransformations" : [ { "transformation" : "REGEX_SELECT", "input" : "pix_type", "output" : "logical_source_type", "config" : { "cisco-6-302" : [ "^6-302.*", "^06-302.*" ], "cisco-5-304" : "^5-304.*" } } ] ... }
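The selection logic can be sketched in Python as follows. regex_select is a hypothetical helper that accepts either a single pattern or a list of patterns per output value, as the configuration above does. Returning None when nothing matches is an assumption of this sketch.

```python
import re

def regex_select(value, config):
    """Return the first output value whose pattern(s) match, else None."""
    for output_value, patterns in config.items():
        if isinstance(patterns, str):
            patterns = [patterns]        # allow a single pattern or a list
        if any(re.match(p, value) for p in patterns):
            return output_value
    return None                          # assumption: no match yields no value

# Same patterns as in the configuration above.
config = {
    "cisco-6-302": ["^6-302.*", "^06-302.*"],
    "cisco-5-304": "^5-304.*",
}

message = {"pix_type": "06-302013"}
message["logical_source_type"] = regex_select(message["pix_type"], config)
```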
-
If you want to set the Grok parser to use the current year in its timestamp, add the following information to the fieldTransformations section of the datasource JSON file:
"fieldTransformations" : [ { "transformation" : "STELLAR", "output" : [ "timestamp" ], "config" : { "timestamp" : "TO_EPOCH_TIMESTAMP(FORMAT('%s %d', timestamp_str, YEAR()), 'MMM dd HH:mm:ss yyyy')" } } ]
For example, the datasource JSON file would change to:
"fieldTransformations" : [ { "transformation" : "STELLAR", "output" : [ "full_hostname", "domain_without_subdomains", "timestamp" ], "config" : { "full_hostname" : "URL_TO_HOST(url)", "domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)", "timestamp" : "TO_EPOCH_TIMESTAMP(FORMAT('%s %d', timestamp_str, YEAR()), 'MMM dd HH:mm:ss yyyy')" } } ]
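The idea behind this transformation is to join a timestamp string that has no year with the current year and then convert the result to epoch milliseconds. The Python sketch below shows that idea under stated assumptions: to_epoch_with_year is a hypothetical helper, the string and the year are joined with a space, and the time is interpreted as UTC, which may not match your cluster's time zone settings.

```python
from datetime import datetime, timezone

def to_epoch_with_year(timestamp_str, year):
    """Append a year to a year-less timestamp; return epoch milliseconds (UTC assumed)."""
    combined = f"{timestamp_str} {year}"              # e.g. "Jan 05 12:30:45 2020"
    parsed = datetime.strptime(combined, "%b %d %H:%M:%S %Y")
    return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)

# Use the current year, as the transformation does with YEAR().
current_year = datetime.now(timezone.utc).year
epoch_ms = to_epoch_with_year("Jan 05 12:30:45", current_year)
```

One consequence worth keeping in mind is that entries logged late in December but processed in January would be stamped with the new year.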
-
Use the following script to upload configurations to Apache ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -i $METRON_HOME/config/zookeeper -z $ZOOKEEPER_HOST:2181
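Because a hand-edited configuration file can easily end up with a missing comma or a misspelled key, it can help to check the file before running the push. The Python sketch below is one possible check and not an HCP tool: check_parser_config is a hypothetical helper, and the list of required keys is an assumption chosen for the example.

```python
import json

# Assumption: the keys this sketch insists on; adjust to your own needs.
REQUIRED_KEYS = ("parserClassName", "sensorTopic")

def check_parser_config(text):
    """Return a list of problems found in a parser configuration string."""
    try:
        config = json.loads(text)
    except json.JSONDecodeError as err:
        return [f"invalid JSON: {err.msg} at line {err.lineno}"]
    return [f"missing key: {key}" for key in REQUIRED_KEYS if key not in config]

# Two sample inputs: the second is missing the comma between its entries.
good = ('{"parserClassName": "org.apache.metron.parsers.GrokParser", '
        '"sensorTopic": "squid"}')
bad = ('{"parserClassName": "org.apache.metron.parsers.GrokParser" '
       '"sensorTopic": "squid"}')

good_problems = check_parser_config(good)
bad_problems = check_parser_config(bad)
```

In practice you would read the text from the $DATASOURCE.json file created earlier and only push once the list of problems is empty.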
- Deploy the new parser topology to the cluster:
If you want to deploy multiple parsers on one topology, refer to Creating Multiple Parsers on One Topology.
- Log in to the host that has Metron installed as root user.
-
Deploy the new parser topology:
$METRON_HOME/bin/start_parser_topology.sh -k $KAFKA_HOST:6667 -z $ZOOKEEPER_HOST:2181 -s $DATASOURCE
- Use the Apache Storm UI to verify that the new topology is listed and that it has no errors.
This new data source processor topology ingests from the $DATASOURCE Kafka topic that you created earlier and then parses the event with the HCP Grok framework using the Grok pattern defined earlier.