KafkaBolt Integration: Core Storm APIs
To use KafkaBolt
, create an instance of
org.apache.storm.kafka.bolt.KafkaBolt
and attach it as a component to
your topology. The following example shows construction of a Kafka bolt using core Storm
APIs, followed by details about the code:
TopologyBuilder builder = new TopologyBuilder(); Fields fields = new Fields("key", "message"); FixedBatchSpout spout = new FixedBatchSpout(fields, 4, new Values("storm", "1"), new Values("trident", "1"), new Values("needs", "1"), new Values("javadoc", "1") ); spout.setCycle(true); builder.setSpout("spout", spout, 5); //set producer properties. Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "1"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaBolt bolt = new KafkaBolt() .withProducerProperties(props) .withTopicSelector(new DefaultTopicSelector("test")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout"); Config conf = new Config(); StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
Instantiate a
KafkaBolt
.The core-storm API uses the
storm.kafka.bolt.KafkaBolt
class to instantiate a Kafka Bolt:KafkaBolt bolt = new KafkaBolt();
Configure the KafkaBolt with a Tuple-to-Message Mapper.
The
KafkaBolt
maps Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides thestorm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper
class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.KafkaBolt bolt = new KafkaBolt() .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
Configure the Kafka Bolt with a Kafka Topic Selector.
Note To ignore a message, return NULL from the
getTopics()
method.KafkaBolt bolt = new KafkaBolt().withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()) .withTopicSelector(new DefaultTopicSelector());
If you need to write to multiple Kafka topics, you can write your own implementation of the
KafkaTopicSelector
interface .Configure the Kafka Bolt with Kafka Producer properties.
You can specify producer properties in your Storm topology by calling
KafkaBolt.withProducerProperties()
. See the Apache Producer Configs documentation for more information.