Writing Data to Kafka using a Kafka Bolt
Storm provides a Kafka Bolt for both the core-storm and Trident APIs that writes data to a Kafka cluster, an operation known in Kafka terminology as publishing to a topic. Use the following procedure to add a Storm component to your topology that writes data to a Kafka cluster:
1. Instantiate a Kafka Bolt.
2. Configure the Kafka Bolt with a Tuple-to-Message mapper.
3. Configure the Kafka Bolt with a Kafka Topic Selector.
4. Configure the Kafka Bolt with the Kafka Producer properties.
The following code samples for each API illustrate the construction of a simple Kafka Bolt. The rest of this topic breaks the samples down to better describe each step.
Core-storm API
TopologyBuilder builder = new TopologyBuilder();

Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
    new Values("storm", "1"),
    new Values("trident", "1"),
    new Values("needs", "1"),
    new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);

KafkaBolt bolt = new KafkaBolt()
    .withTopicSelector(new DefaultTopicSelector("test"))
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

Config conf = new Config();

// Set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);

StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
Trident API
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
    new Values("storm", "1"),
    new Values("trident", "1"),
    new Values("needs", "1"),
    new Values("javadoc", "1")
);
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withKafkaTopicSelector(new DefaultTopicSelector("test"))
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

Config conf = new Config();

// Set producer properties.
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("request.required.acks", "1");
props.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);

StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
Instantiate a KafkaBolt
The core-storm API uses the storm.kafka.bolt.KafkaBolt class to instantiate a Kafka Bolt. The Trident API uses a combination of the storm.kafka.trident.TridentKafkaStateFactory and storm.kafka.trident.TridentKafkaUpdater classes.
Core-storm API
KafkaBolt bolt = new KafkaBolt();
Trident API
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory();
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Configure the KafkaBolt with a Tuple-to-Message Mapper
The Kafka Bolt must map Storm tuples to Kafka messages. By default, the bolt looks for tuple fields named "key" and "message". Storm provides the FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. Parallel versions of the class exist for the core-storm API (storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper) and the Trident API (storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper).
Core-storm API
KafkaBolt bolt = new KafkaBolt()
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
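The default mapper only works if the upstream component emits tuples with those field names. As a minimal sketch, assuming a custom upstream spout or bolt (imports: backtype.storm.topology.OutputFieldsDeclarer, backtype.storm.tuple.Fields), its declareOutputFields method would declare them as follows:

// In the upstream spout or bolt, declare the field names
// that the default FieldNameBasedTupleToKafkaMapper expects.
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("key", "message"));
}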
Trident API
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
In the Trident API, Storm developers must specify the field names for the Storm tuple key and the Kafka message when constructing the FieldNameBasedTupleToKafkaMapper, because the Trident version of the class does not provide a default constructor.
However, some Kafka bolts may require more than two fields. Storm developers may write their own implementations of the TupleToKafkaMapper and TridentTupleToKafkaMapper interfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define two methods:

// TupleToKafkaMapper (core-storm API)
K getKeyFromTuple(Tuple tuple);
V getMessageFromTuple(Tuple tuple);

// TridentTupleToKafkaMapper (Trident API)
K getKeyFromTuple(TridentTuple tuple);
V getMessageFromTuple(TridentTuple tuple);
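For example, a custom core-storm mapper that builds the Kafka message from several tuple fields might look like the following sketch. The field names userId, action, and timestamp, and the class name LogEntryTupleToKafkaMapper, are illustrative assumptions, not part of the API:

import backtype.storm.tuple.Tuple;
import storm.kafka.bolt.mapper.TupleToKafkaMapper;

// A sketch of a custom mapper that combines three hypothetical tuple
// fields -- userId, action, and timestamp -- into one Kafka message.
public class LogEntryTupleToKafkaMapper implements TupleToKafkaMapper<String, String> {

    // Use the userId field as the Kafka message key.
    @Override
    public String getKeyFromTuple(Tuple tuple) {
        return tuple.getStringByField("userId");
    }

    // Concatenate the fields into a comma-separated Kafka message payload.
    @Override
    public String getMessageFromTuple(Tuple tuple) {
        return tuple.getStringByField("userId") + ","
            + tuple.getStringByField("action") + ","
            + tuple.getLongByField("timestamp");
    }
}

Pass the custom mapper to the bolt in place of the default, for example with new KafkaBolt().withTupleToKafkaMapper(new LogEntryTupleToKafkaMapper()).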
Configure the Kafka Bolt with a Kafka Topic Selector
Storm provides the DefaultTopicSelector class for both APIs; it publishes every message to the single topic named in its constructor.

Note: To ignore a message, return NULL from the getTopics() method.
Core-storm API
KafkaBolt bolt = new KafkaBolt()
    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
    .withTopicSelector(new DefaultTopicSelector("test"));
Trident API
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
    .withKafkaTopicSelector(new DefaultTopicSelector("test"))
    .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
Storm developers can write their own implementation of the KafkaTopicSelector
interface if they need to write to multiple Kafka topics:
// Core-storm API
public interface KafkaTopicSelector {
    String getTopics(Tuple tuple);
}

// Trident API
public interface KafkaTopicSelector {
    String getTopics(TridentTuple tuple);
}
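A minimal sketch of such a selector for the core-storm API follows. The "type" field and the "errors" and "metrics" topic names are illustrative assumptions; returning NULL causes a message to be ignored, as described in the note above:

import backtype.storm.tuple.Tuple;
import storm.kafka.bolt.selector.KafkaTopicSelector;

// A sketch of a selector that routes tuples to different Kafka topics
// based on a hypothetical "type" field.
public class TypeBasedTopicSelector implements KafkaTopicSelector {

    @Override
    public String getTopics(Tuple tuple) {
        String type = tuple.getStringByField("type");
        if ("error".equals(type)) {
            return "errors";   // route error events to the "errors" topic
        }
        if ("metric".equals(type)) {
            return "metrics";  // route metric events to the "metrics" topic
        }
        return null;           // ignore all other messages
    }
}

Wire the selector into the bolt with withTopicSelector(new TypeBasedTopicSelector()) in place of the DefaultTopicSelector shown earlier.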