7.3. Writing Data to Kafka using a Kafka Bolt

Storm provides a Kafka Bolt for both the core-storm and Trident APIs that writes data to a Kafka cluster, also known as publishing to a topic using Kafka's terminology. Use the following procedure to add a Storm component to your topology that writes data to a Kafka cluster:

  1. Instantiate a Kafka Bolt.

  2. Configure the Kafka Bolt with a Tuple-to-Message mapper.

  3. Configure the Kafka Bolt with a Kafka Topic Selector.

  4. Configure the Kafka Bolt with the Kafka Producer properties.

The following code samples for each API illustrate the construction of a simple Kafka Bolt. The rest of this topic breaks the samples down to better describe each step.

Core-storm API

 TopologyBuilder builder = new TopologyBuilder();
 
 Fields fields = new Fields("key", "message");
 FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
 new Values("storm", "1"),
 new Values("trident", "1"),
 new Values("needs", "1"),
 new Values("javadoc", "1")
 );
 
 spout.setCycle(true);
 builder.setSpout("spout", spout, 5);
 KafkaBolt bolt = new KafkaBolt()
 .withKafkaTopicSelector(new DefaultTopicSelector("test"))
 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
 builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
 
 Config conf = new Config();
 //set producer properties.
 Properties props = new Properties();
 props.put("metadata.broker.list", "localhost:9092");
 props.put("request.required.acks", "1");
 props.put("serializer.class", "kafka.serializer.StringEncoder");
 conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
 
 StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

Trident API

 Fields fields = new Fields("word", "count");
 FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
 new Values("storm", "1"),
 new Values("trident", "1"),
 new Values("needs", "1"),
 new Values("javadoc", "1")
 );
 
 spout.setCycle(true);
 
 TridentTopology topology = new TridentTopology();
 Stream stream = topology.newStream("spout1", spout);
 
 TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
 .withKafkaTopicSelector(new DefaultTopicSelector("test"))
 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
 stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
 
 Config conf = new Config();
 //set producer properties.
 Properties props = new Properties();
 props.put("metadata.broker.list", "localhost:9092");
 props.put("request.required.acks", "1");
 props.put("serializer.class", "kafka.serializer.StringEncoder");
 conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
 StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

Instantiate a KafkaBolt

The core-storm API uses the storm.kafka.bolt.KafkaBolt class to instantiate a Kafka Bolt. The Trident API uses a combination of the storm.kafka.trident.TridentStateFactory and storm.kafka.trident.TridentKafkaStateFactory classes.

Core-storm API

KafkaBolt bolt = new KafkaBolt();

Trident API

TridentTopology topology = new TridentTopology();
 Stream stream = topology.newStream("spout");
 TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory();
 stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields()); 

Configure the KafkaBolt with a Tuple-to-Message Mapper

The KafkaBolt must map Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides the storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.

Core-storm API

KafkaBolt bolt = new KafkaBolt()
 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); 

Trident API

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); 

Storm developers must specify the field names for the Storm tuple key and the Kafka message for any implementation of the TridentKafkaState in the Trident API. This interface does not provide a default constructor.

However, some Kafka bolts may require more than two fields. Storm developers may write their own implementation of the TupleToKafkaMapper and TridentTupleToKafkaMapper interfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define 2 methods:

K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);

Configure the Kafka Bolt with a Kafka Topic Selector

[Note]Note

To ignore a message, return NULL from the getTopics() method.

Core-storm API

KafkaBolt bolt = new KafkaBolt().withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper())
 .withTopicSelector(new DefaultTopicSelector()); 

Trident API

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
 .withKafkaTopicSelector(new DefaultTopicSelector("test"))
 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); 

Storm developers can write their own implementation of the KafkaTopicSelector interface if they need to write to multiple Kafka topics:

public interface KafkaTopicSelector {
 String getTopics(Tuple/TridentTuple tuple);
 }