Create a Streaming Threat Intel Feed Source
Streaming intelligence feeds are incorporated slightly differently than data from a flat CSV file. Because you are defining a streaming source, you need to define a parser topology to handle the streaming data. To do that, create the parser configuration file $METRON_HOME/config/zookeeper/parsers/user.json.
- Define a parser topology to handle the streaming data:
touch $METRON_HOME/config/zookeeper/parsers/user.json
- Populate the file with the parser topology definition.
For example:
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
  "writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
  "sensorTopic" : "user",
  "parserConfig" : {
    "shew.table" : "threatintel",
    "shew.cf" : "t",
    "shew.keyColumns" : "ip",
    "shew.enrichmentType" : "user",
    "columns" : {
      "user" : 0,
      "ip" : 1
    }
  }
}
where:
  - parserClassName: The parser name.
  - writerClassName: The writer destination. For streaming parsers, the destination is SimpleHbaseEnrichmentWriter.
  - sensorTopic: Name of the sensor topic.
  - shew.table: The simple HBase enrichment writer (shew) table to which we want to write.
  - shew.cf: The simple HBase enrichment writer (shew) column family.
  - shew.keyColumns: The simple HBase enrichment writer (shew) key.
  - shew.enrichmentType: The simple HBase enrichment writer (shew) enrichment type.
  - columns: The CSV parser information. For our example, this information is the user name and IP address.
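To make the role of each setting concrete, the following Python sketch walks one CSV line through the example configuration. It is an illustration under stated assumptions, not Metron's implementation: the helper names `parse_line` and `to_enrichment_record` and the sample record are hypothetical, and the real writer's HBase row-key encoding and value-column selection are not modeled.

```python
import csv
import io
import json

# Parser configuration copied from the example above.
PARSER_CONFIG = json.loads("""
{
  "parserClassName": "org.apache.metron.parsers.csv.CSVParser",
  "writerClassName": "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter",
  "sensorTopic": "user",
  "parserConfig": {
    "shew.table": "threatintel",
    "shew.cf": "t",
    "shew.keyColumns": "ip",
    "shew.enrichmentType": "user",
    "columns": {"user": 0, "ip": 1}
  }
}
""")


def parse_line(line, config):
    """Map one CSV line to named fields using the `columns` index map."""
    values = next(csv.reader(io.StringIO(line)))
    columns = config["parserConfig"]["columns"]
    return {name: values[index].strip() for name, index in columns.items()}


def to_enrichment_record(fields, config):
    """Group parsed fields by the shew.* settings (simplified, hypothetical)."""
    parser_config = config["parserConfig"]
    key_column = parser_config["shew.keyColumns"]
    return {
        "table": parser_config["shew.table"],
        "column_family": parser_config["shew.cf"],
        "enrichment_type": parser_config["shew.enrichmentType"],
        "indicator": fields[key_column],  # value looked up during enrichment
        "values": dict(fields),           # simplification: keep every parsed field
    }


# Hypothetical sample record: a user name followed by an IP address.
fields = parse_line("jdoe,192.168.138.158", PARSER_CONFIG)
record = to_enrichment_record(fields, PARSER_CONFIG)
print(json.dumps(record, indent=2))
```

In this sketch the `ip` column becomes the lookup indicator because `shew.keyColumns` names it, and `shew.enrichmentType` supplies the type label (`user`) that the enrichment configuration refers to later.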
This file fully defines the input structure and how that data can be used in enrichment.
- Push the configuration file to ZooKeeper:
  - Create a Kafka topic:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic user
When you create the Kafka topic, consider how much data will be flowing into this topic.
  - Push the configuration file to ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/config/zookeeper
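The advice to consider how much data will flow into the topic can be turned into a rough partition estimate. The Python sketch below uses a common rule of thumb (expected throughput, plus headroom, divided by the throughput one partition sustains); the function name `estimate_partitions`, the headroom factor, and the sample numbers are all assumptions for illustration and should be replaced with measurements from your own cluster.

```python
import math


def estimate_partitions(expected_mb_per_sec, per_partition_mb_per_sec, headroom=1.5):
    """Return a partition count that covers the expected load plus headroom."""
    if expected_mb_per_sec < 0:
        raise ValueError("expected throughput must not be negative")
    if per_partition_mb_per_sec <= 0:
        raise ValueError("per-partition throughput must be positive")
    if headroom < 1:
        raise ValueError("headroom must be at least 1")
    needed = expected_mb_per_sec * headroom / per_partition_mb_per_sec
    # A topic always has at least one partition.
    return max(1, math.ceil(needed))


# Hypothetical figures: 12 MB/s expected, 5 MB/s sustained per partition.
partitions = estimate_partitions(12, 5)
print(f"--partitions {partitions}")
```

The result would replace the value passed to `--partitions` in the `kafka-topics.sh` command; a low-volume feed such as the user example can stay at a single partition.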
- Edit the new data source enrichment configuration at $METRON_HOME/config/zookeeper/enrichments/$DATASOURCE to associate the ip_src_addr field with the user enrichment.
For example:
{
  "enrichment" : {
    "fieldMap" : {
      "hbaseEnrichment" : [ "ip_src_addr" ]
    },
    "fieldToTypeMap" : {
      "ip_src_addr" : [ "user" ]
    },
    "config" : { }
  },
  "threatIntel" : {
    "fieldMap" : { },
    "fieldToTypeMap" : { },
    "config" : { },
    "triageConfig" : {
      "riskLevelRules" : { },
      "aggregator" : "MAX",
      "aggregationConfig" : { }
    }
  },
  "configuration" : { }
}
- Push this configuration to ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/config/zookeeper
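A consistency check that may be worth running on the enrichment configuration before it is pushed: every field listed under `hbaseEnrichment` should have at least one enrichment type in `fieldToTypeMap`, and that type should match the `shew.enrichmentType` set in the parser configuration. The Python sketch below is a hypothetical helper, not part of Metron; the function name `unmapped_fields` is an assumption, and the sketch only inspects the JSON structure shown in the example.

```python
import json

# Enrichment section copied from the example configuration above.
ENRICHMENT_CONFIG = json.loads("""
{
  "enrichment": {
    "fieldMap": {"hbaseEnrichment": ["ip_src_addr"]},
    "fieldToTypeMap": {"ip_src_addr": ["user"]},
    "config": {}
  },
  "threatIntel": {
    "fieldMap": {},
    "fieldToTypeMap": {},
    "config": {},
    "triageConfig": {
      "riskLevelRules": {},
      "aggregator": "MAX",
      "aggregationConfig": {}
    }
  },
  "configuration": {}
}
""")


def unmapped_fields(section, adapter="hbaseEnrichment"):
    """Return fields listed for the adapter that have no enrichment type."""
    type_map = section.get("fieldToTypeMap", {})
    fields = section.get("fieldMap", {}).get(adapter, [])
    return [field for field in fields if not type_map.get(field)]


missing = unmapped_fields(ENRICHMENT_CONFIG["enrichment"])
types = ENRICHMENT_CONFIG["enrichment"]["fieldToTypeMap"]["ip_src_addr"]

if missing:
    print("Fields without an enrichment type:", missing)
else:
    print("ip_src_addr is mapped to enrichment types:", types)
```

If the check reports a field without a type, the lookup for that field has nothing to match against the `user` entries written by the streaming parser.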