KafkaBolt Integration: Core Storm APIs
To use KafkaBolt
, create an instance of
org.apache.storm.kafka.bolt.KafkaBolt
and attach it as a component to your
topology.
The following example shows construction of a Kafka bolt using core Storm APIs, followed by details about the code:
TopologyBuilder builder = new TopologyBuilder(); Fields fields = new Fields("key", "message"); FixedBatchSpout spout = new FixedBatchSpout(fields, 4, new Values("storm", "1"), new Values("trident", "1"), new Values("needs", "1"), new Values("javadoc", "1") ); spout.setCycle(true); builder.setSpout("spout", spout, 5); //set producer properties. Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "1"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaBolt bolt = new KafkaBolt() .withProducerProperties(props) .withTopicSelector(new DefaultTopicSelector("test")) .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout"); Config conf = new Config(); StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
- Instantiate a
KafkaBolt
.The core-storm API uses the
storm.kafka.bolt.KafkaBolt
class to instantiate a Kafka Bolt:KafkaBolt bolt = new KafkaBolt();
- Configure the KafkaBolt with a Tuple-to-Message Mapper.
The
KafkaBolt
maps Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message." Storm provides thestorm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper
class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.KafkaBolt bolt = new KafkaBolt() .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
- Configure the Kafka Bolt with a Kafka Topic Selector.Note
To ignore a message, return NULL from the
getTopics()
method.KafkaBolt bolt = new KafkaBolt().withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()) .withTopicSelector(new DefaultTopicSelector());
If you need to write to multiple Kafka topics, you can write your own implementation of the
KafkaTopicSelector
interface . - Configure the Kafka Bolt with Kafka Producer properties.
You can specify producer properties in your Storm topology by calling
KafkaBolt.withProducerProperties()
. See the Apache Producer Configs documentation for more information.