KafkaBolt Integration: Trident APIs
To use KafkaBolt with Trident, create instances of org.apache.storm.kafka.trident.TridentKafkaState and org.apache.storm.kafka.trident.TridentKafkaStateFactory, and attach them to your topology. The following example shows construction of a Kafka bolt using Trident APIs, followed by details about the code:

Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
        new Values("storm", "1"),
        new Values("trident", "1"),
        new Values("needs", "1"),
        new Values("javadoc", "1")
);
spout.setCycle(true);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);

//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

Instantiate a KafkaBolt.
The Trident API uses a combination of the org.apache.storm.kafka.trident.TridentKafkaState and org.apache.storm.kafka.trident.TridentKafkaStateFactory classes. The spout and fields variables are the ones defined in the full example above:

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory();
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

Configure the KafkaBolt with a Tuple-to-Message Mapper.
The KafkaBolt must map Storm tuples to Kafka messages. By default, KafkaBolt looks for fields named "key" and "message". Storm provides the org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper class to support this default behavior and provide backward compatibility. The class is used by both the core-storm and Trident APIs.

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

In the Trident API, you must specify the field names for the Storm tuple key and the Kafka message when you configure the mapper for TridentKafkaState, because the Trident mapper does not provide a default constructor.

Some Kafka bolts may require more than two fields. In that case, you can write your own implementation of the TupleToKafkaMapper and TridentTupleToKafkaMapper interfaces to customize the mapping of Storm tuples to Kafka messages. Both interfaces define two methods:

K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
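
As a rough illustration of a mapper that uses more than two fields, the following sketch builds the Kafka key from one field and the message from several fields joined with commas. So that it compiles without Storm on the classpath, SimpleTuple and SimpleTupleToKafkaMapper are hypothetical stand-ins for TridentTuple and TridentTupleToKafkaMapper; MultiFieldMapper, the "source" field, and the comma-separated message format are also assumptions made for this example, not part of Storm.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Storm's TridentTuple: only lookup by field name is modeled.
interface SimpleTuple {
    String getStringByField(String field);
}

// Hypothetical stand-in that mirrors the two mapper methods listed above.
interface SimpleTupleToKafkaMapper<K, V> {
    K getKeyFromTuple(SimpleTuple tuple);
    V getMessageFromTuple(SimpleTuple tuple);
}

// Example mapper (an assumption, not a Storm class): one key field, several message fields.
class MultiFieldMapper implements SimpleTupleToKafkaMapper<String, String> {
    private final String keyField;
    private final String[] messageFields;

    MultiFieldMapper(String keyField, String... messageFields) {
        this.keyField = keyField;
        this.messageFields = messageFields;
    }

    @Override
    public String getKeyFromTuple(SimpleTuple tuple) {
        return tuple.getStringByField(keyField);
    }

    @Override
    public String getMessageFromTuple(SimpleTuple tuple) {
        // Join the configured message fields with commas, in the order given.
        StringBuilder message = new StringBuilder();
        for (int i = 0; i < messageFields.length; i++) {
            if (i > 0) {
                message.append(',');
            }
            message.append(tuple.getStringByField(messageFields[i]));
        }
        return message.toString();
    }
}

class MapperDemo {
    // Wraps a map as a SimpleTuple so the mapper can be exercised without Storm.
    static SimpleTuple tupleOf(Map<String, String> values) {
        return values::get;
    }

    public static void main(String[] args) {
        Map<String, String> values = new HashMap<>();
        values.put("word", "storm");
        values.put("count", "1");
        values.put("source", "spout1");

        MultiFieldMapper mapper = new MultiFieldMapper("word", "count", "source");
        SimpleTuple tuple = tupleOf(values);
        System.out.println("key=" + mapper.getKeyFromTuple(tuple));
        System.out.println("message=" + mapper.getMessageFromTuple(tuple));
    }
}
```

In a real topology, the same logic would live in a class that implements Storm's TridentTupleToKafkaMapper and would be passed to withTridentTupleToKafkaMapper() in place of FieldNameBasedTupleToKafkaMapper.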
Configure the KafkaBolt with a Kafka Topic Selector.
Note: To ignore a message, return null from the getTopics() method.

TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
        .withKafkaTopicSelector(new DefaultTopicSelector("test"))
        .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));

If you need to write to multiple Kafka topics, you can write your own implementation of the KafkaTopicSelector interface; for example:

public interface KafkaTopicSelector {
    String getTopics(Tuple/TridentTuple tuple);
}
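
One possible selector that writes to multiple topics is sketched below: it derives the topic name from a tuple field and returns null to skip tuples that carry no value for that field. RoutedTuple and SimpleTopicSelector are hypothetical stand-ins for TridentTuple and KafkaTopicSelector, used so that the sketch compiles without Storm. The method name follows the interface as shown above; FieldBasedTopicSelector, the "category" field, and the "events-" prefix are assumptions made for this example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Storm's TridentTuple: only lookup by field name is modeled.
interface RoutedTuple {
    String getStringByField(String field);
}

// Hypothetical stand-in that mirrors the selector interface shown above.
interface SimpleTopicSelector {
    String getTopics(RoutedTuple tuple);
}

// Example selector (an assumption, not a Storm class): topic = prefix + value of a field.
class FieldBasedTopicSelector implements SimpleTopicSelector {
    private final String topicField;
    private final String topicPrefix;

    FieldBasedTopicSelector(String topicField, String topicPrefix) {
        this.topicField = topicField;
        this.topicPrefix = topicPrefix;
    }

    @Override
    public String getTopics(RoutedTuple tuple) {
        String value = tuple.getStringByField(topicField);
        if (value == null || value.isEmpty()) {
            // Returning null tells the caller to ignore this message.
            return null;
        }
        return topicPrefix + value;
    }
}

class TopicSelectorDemo {
    // Wraps a map as a RoutedTuple so the selector can be exercised without Storm.
    static RoutedTuple tupleOf(Map<String, String> values) {
        return values::get;
    }

    public static void main(String[] args) {
        FieldBasedTopicSelector selector = new FieldBasedTopicSelector("category", "events-");

        Map<String, String> routed = new HashMap<>();
        routed.put("category", "clicks");
        System.out.println(selector.getTopics(tupleOf(routed)));

        // No "category" field, so the selector returns null and the message is skipped.
        Map<String, String> skipped = new HashMap<>();
        System.out.println(selector.getTopics(tupleOf(skipped)));
    }
}
```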
Configure the KafkaBolt with Kafka Producer properties.
You can specify producer properties in your Storm topology by calling TridentKafkaStateFactory.withProducerProperties(). See the Apache Kafka Producer Configs documentation for more information.
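
A minimal sketch of collecting the producer settings in one place is shown below. It uses only java.util.Properties and the same property names and values as the full example at the top of this section. The ProducerPropsSketch class and the producerProps helper are hypothetical names introduced for illustration.

```java
import java.util.Properties;

class ProducerPropsSketch {
    // Builds the producer properties used in the full example for a given broker list.
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Comma-separated host:port list of Kafka brokers to bootstrap from.
        props.put("bootstrap.servers", bootstrapServers);
        // Wait for the partition leader to acknowledge each write.
        props.put("acks", "1");
        // Keys and values are sent as strings.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can then be passed to withProducerProperties(), as in the full example.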