Creating a Streaming Threat Intel Feed Source
Streaming intelligence feeds and are incorporated slightly differently than data from a flat CSV file. This section describes how to define a streaming source.
Because we are defining a streaming source, we need to define a parser topology to
handle the streaming data. In order to do that, we will need to create a file in
$METRONHOME/zookeeper/parsers/user.json
.
Define a parser topology to handle the streaming data:
touch $METRONHOME/zookeeper/parsers/user.json
Populate the file the parser topology definition. For example:
{ "parserClassName" : "org.apache.metron.parsers.csv.CSVParser" ,"writerClassName" : "org.apache.metron.enrichment.writer.SimpleHbaseEnrichmentWriter" ,"sensorTopic":"user" ,"parserConfig": { "shew.table" : "enrichment" ,"shew.cf" : "t" ,"shew.keyColumns" : "ip" ,"shew.enrichmentType" : "user" ,"columns" : { "user" : 0 ,"ip" : 1 } } }
This file fully defines the input structure and how that data can be used in enrichment.
Push this configuration file to ZooKeeper and start the parser topology by running the following:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper $ZOOKEEPER_HOST:2181 --replication-factor 1 --partitions 1 --topic user $METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/zookeeper $METRON_HOME/bin/start_parser_topology.sh -s user -z $ZOOKEEPER_HOST:2181 -k $KAFKA_HOST:6667
Now you have data flowing into the HBase table, but you need to ensure that the enrichment topology can be used to enrich the data flowing past.
Edit the new data source enrichment configuration at
$METRON_HOME/config/zookeeper/enrichments/$DATASOURCE
to associate theip_src_addr
with the user enrichment.For example:
{ "index" : "squid", "batchSize" : 1, "enrichment" : { "fieldMap" : { "hbaseEnrichment" : [ "ip_src_addr" ] }, "fieldToTypeMap" : { "ip_src_addr" : [ "user" ] }, "config" : { } }, "threatIntel" : { "fieldMap" : { }, "fieldToTypeMap" : { }, "config" : { }, "triageConfig" : { "riskLevelRules" : { }, "aggregator" : "MAX", "aggregationConfig" : { } } }, "configuration" : { } }
Push this configuration to ZooKeeper:
$METRON_HOME/bin/zk_load_configs.sh -m PUSH -z $ZOOKEEPER_HOST:2181 -i $METRON_HOME/zookeeper