Storm Guide
Storm-Kafka API Reference

Javadoc for the storm-kafka component is installed at <$STORM_HOME>/contrib/storm-kafka/storm-kafka-<buildnumber>-javadoc.jar. This section provides additional reference documentation for the primary classes and interfaces of the storm-kafka component.

BrokerHosts Interface

The storm-kafka component provides two implementations of the BrokerHosts interface: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping and the StaticHosts implementation when broker-to-partition mapping is static. The constructor for StaticHosts requires an instance of GlobalPartitionInformation:

import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.trident.GlobalPartitionInformation;

Broker brokerForPartition0 = new Broker("localhost"); // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, with the port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092"); // localhost:9092, specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
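
The comments in the listing above imply an addressing convention: a broker given without a port resolves to port 9092. The following self-contained sketch illustrates only that convention and the partition-to-broker mapping idea. BrokerAddressSketch and its parse method are hypothetical names introduced for illustration; they are not part of storm-kafka, and the real Broker class may behave differently.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for a broker address; not the storm-kafka Broker class. */
public class BrokerAddressSketch {
    // Default port, taken from the "localhost:9092" comments in the listing above.
    static final int DEFAULT_PORT = 9092;

    final String host;
    final int port;

    BrokerAddressSketch(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Parses "host" or "host:port", falling back to DEFAULT_PORT when no port is given. */
    static BrokerAddressSketch parse(String address) {
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            return new BrokerAddressSketch(address, DEFAULT_PORT);
        }
        return new BrokerAddressSketch(address.substring(0, colon),
                Integer.parseInt(address.substring(colon + 1)));
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Static partition-to-broker mapping, analogous in spirit to GlobalPartitionInformation.
        Map<Integer, BrokerAddressSketch> partitionToBroker = new LinkedHashMap<>();
        partitionToBroker.put(0, parse("localhost"));
        partitionToBroker.put(1, new BrokerAddressSketch("localhost", 9092));
        partitionToBroker.put(2, parse("localhost:9092"));
        partitionToBroker.forEach((partition, broker) ->
                System.out.println("partition " + partition + " -> " + broker));
    }
}
```

All three entries resolve to the same address, which mirrors the three equivalent ways of naming a broker in the listing.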

KafkaConfig Class

Create an instance of KafkaConfig with one of the following constructors, each of which requires an implementation of the BrokerHosts interface:

public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

Table 5.1. KafkaConfig Parameters

KafkaConfig Parameter: Description

hosts: Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.

topic: Name of the Kafka topic.

clientId: Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored.

Both SpoutConfig from the core-storm API and TridentKafkaConfig from the Trident API extend KafkaConfig. Instantiate these classes with the following constructors:

Core-Storm API

Constructor:

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)

Table 5.2. SpoutConfig Parameters

SpoutConfig Parameter: Description

hosts: Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.

topic: Name of the Kafka topic.

zkRoot: Root directory in ZooKeeper where all topic and partition information is stored. By default, this is /brokers.

id: Unique identifier for this spout.

Trident API

Constructors:

public TridentKafkaConfig(BrokerHosts hosts, String topic)
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)

Table 5.3. TridentKafkaConfig Parameters

TridentKafkaConfig Parameter: Description

hosts: Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.

topic: Name of the Kafka topic.

id: Unique identifier for this spout.

KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology:

Table 5.4. KafkaConfig Fields

KafkaConfig Field: Description

fetchSizeBytes: Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1 MB.

socketTimeoutMs: Specifies the number of milliseconds to wait before a socket fails an operation with a timeout. The default is 10 seconds.

bufferSizeBytes: Specifies the buffer size in bytes for network requests. The default is 1 MB.

scheme: The MultiScheme implementation that specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default, RawMultiScheme, returns a tuple containing the byte[] with no additional processing. The API provides the following implementations:
* storm.kafka.StringScheme
* storm.kafka.KeyValueSchemeAsMultiScheme
* storm.kafka.StringKeyValueScheme

ignoreZkOffsets: To force the spout to ignore any consumer state information stored in ZooKeeper, set ignoreZkOffsets to true. If true, the spout always begins reading from the offset defined by startOffsetTime. The default is false. For more information, see "How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures."

startOffsetTime: Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values:
* kafka.api.OffsetRequest.EarliestTime() - starts streaming from the beginning of the topic
* kafka.api.OffsetRequest.LatestTime() - streams only new messages

maxOffsetBehind: Specifies how far back, measured in offsets, the spout retries a failed tuple. If a failing tuple's offset is more than maxOffsetBehind behind the offset the spout has emitted up to, the spout stops retrying the tuple. The default is Long.MAX_VALUE.

useStartOffsetTimeIfOffsetOutOfRange: Controls whether a spout streams messages from the beginning of a topic when the spout throws an exception for an out-of-range offset. The default is true.

metricsTimeBucketSizeInSecs: Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds.
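
The interaction between ignoreZkOffsets and startOffsetTime can be easier to follow as code. The following is a minimal, self-contained sketch of the decision described in the table, not storm-kafka's implementation. The class StartOffsetSketch, the method resolveStartOffset, and the earliestOffset and latestOffset parameters are hypothetical. The sketch also assumes that a spout with no stored offset falls back to startOffsetTime, which the table does not state explicitly. The values -2 and -1 are the sentinels that kafka.api.OffsetRequest.EarliestTime() and LatestTime() return.

```java
import java.util.OptionalLong;

/** Hypothetical model of choosing a spout's first offset; not storm-kafka code. */
public class StartOffsetSketch {
    // Sentinels returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    /**
     * Returns the offset to start reading from.
     *
     * @param committedOffset offset previously stored in ZooKeeper, if any
     * @param earliestOffset  first offset available in the partition (illustrative input)
     * @param latestOffset    offset of the next message to be written (illustrative input)
     */
    static long resolveStartOffset(boolean ignoreZkOffsets, long startOffsetTime,
                                   OptionalLong committedOffset,
                                   long earliestOffset, long latestOffset) {
        long fromStartOffsetTime;
        if (startOffsetTime == EARLIEST_TIME) {
            fromStartOffsetTime = earliestOffset;
        } else if (startOffsetTime == LATEST_TIME) {
            fromStartOffsetTime = latestOffset;
        } else {
            throw new IllegalArgumentException("This sketch handles only the two sentinel values");
        }
        // ignoreZkOffsets = true: always use startOffsetTime, even if an offset is stored.
        // Assumption: with no stored offset, startOffsetTime is used as well.
        if (ignoreZkOffsets || !committedOffset.isPresent()) {
            return fromStartOffsetTime;
        }
        return committedOffset.getAsLong();
    }

    public static void main(String[] args) {
        OptionalLong stored = OptionalLong.of(500L);
        // Stored offset is ignored, so reading starts at the earliest offset: prints 100
        System.out.println(resolveStartOffset(true, EARLIEST_TIME, stored, 100L, 900L));
        // Stored offset is honored: prints 500
        System.out.println(resolveStartOffset(false, EARLIEST_TIME, stored, 100L, 900L));
        // Nothing stored, only new messages requested: prints 900
        System.out.println(resolveStartOffset(false, LATEST_TIME, OptionalLong.empty(), 100L, 900L));
    }
}
```

In a real topology these settings are public fields on the configuration object, so the equivalent choice is made by assigning ignoreZkOffsets and startOffsetTime before the spout is created.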