Using Apache Storm

Storm-Kafka API Reference

Javadoc for the storm-kafka component is installed at $STORM_HOME/contrib/storm-kafka/storm-kafka-0.9.3.2.2.6.0-<buildnumber>-javadoc.jar. This section provides additional reference documentation for the primary classes and interfaces of the storm-kafka component.

BrokerHosts Interface

The storm-kafka component provides two implementations of the BrokerHosts interface: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping and the StaticHosts implementation when broker-to-partition mapping is static. The constructor for StaticHosts requires an instance of GlobalPartitionInformation:

Broker brokerForPartition0 = new Broker("localhost");       // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, with the port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");  // localhost:9092, specified as a single string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
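For dynamic broker-to-partition mapping, ZkHosts needs only a ZooKeeper connection string. As a sketch (the host name, port, and path below are placeholders, not values from this document):

```java
// Track broker-to-partition mapping dynamically through ZooKeeper.
// "localhost:2181" is a placeholder ZooKeeper connection string.
BrokerHosts zkHosts = new ZkHosts("localhost:2181");

// Pass the broker path explicitly if your cluster stores broker
// metadata under a ZooKeeper root other than the default /brokers.
BrokerHosts zkHostsCustomRoot = new ZkHosts("localhost:2181", "/brokers");
```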

KafkaConfig Class

Instantiate KafkaConfig with one of the following constructors, each of which requires an implementation of the BrokerHosts interface:

public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)

Table 5.1. KafkaConfig Parameters

hosts
    Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.

topic
    Name of the Kafka topic.

clientId
    Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored.
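Putting these parameters together, a minimal KafkaConfig might look like the following sketch; the ZooKeeper address, topic name, and client ID are placeholders:

```java
// Placeholder ZooKeeper connection string and topic name.
BrokerHosts hosts = new ZkHosts("localhost:2181");
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "test-topic");

// Or with an explicit client ID, used as part of the ZooKeeper
// path where the spout's current offset is stored:
KafkaConfig kafkaConfigWithClient = new KafkaConfig(hosts, "test-topic", "my-client");
```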


Both SpoutConfig from the core-storm API and TridentKafkaConfig from the Trident API extend KafkaConfig. Instantiate these classes with the following constructors:

Core-Storm API

Constructor:

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)

Table 5.2. SpoutConfig Parameters

hosts
    Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.

topic
    Name of the Kafka topic.

zkRoot
    Root directory in ZooKeeper where all topics and partition information is stored. By default, this is /brokers.

id
    Unique identifier for this spout.
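As a sketch, a SpoutConfig is typically wired into a KafkaSpout and added to a topology; the ZooKeeper address, topic, offset root, and component names below are placeholders:

```java
BrokerHosts hosts = new ZkHosts("localhost:2181"); // placeholder ZooKeeper address
SpoutConfig spoutConfig =
        new SpoutConfig(hosts, "test-topic", "/kafka-offsets", "my-spout-id");

// Deserialize each Kafka message's byte[] payload as a string.
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
```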


Trident API

Constructors:

public TridentKafkaConfig(BrokerHosts hosts, String topic)
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)

Table 5.3. TridentKafkaConfig Parameters

hosts
    Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.

topic
    Name of the Kafka topic.

id
    Unique identifier for this spout.
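As a sketch, a TridentKafkaConfig is passed to one of the Trident Kafka spouts, for example the opaque transactional spout; the ZooKeeper address, topic, and stream name are placeholders:

```java
BrokerHosts hosts = new ZkHosts("localhost:2181"); // placeholder ZooKeeper address
TridentKafkaConfig tridentConfig = new TridentKafkaConfig(hosts, "test-topic");
tridentConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

// Opaque transactional spout: tolerates failed batches without losing data.
OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentConfig);

TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("kafka-stream", kafkaSpout);
```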


KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology:

Table 5.4. KafkaConfig Fields

fetchSizeBytes
    Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1 MB.

socketTimeoutMs
    Specifies the number of milliseconds to wait before a socket operation fails with a timeout. The default value is 10 seconds.

bufferSizeBytes
    Specifies the buffer size in bytes for network requests. The default is 1 MB.

scheme
    The MultiScheme interface specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default implementation returns a tuple containing the raw byte[] with no additional processing. The API provides the following implementations:
    * storm.kafka.StringScheme
    * storm.kafka.KeyValueSchemeAsMultiScheme
    * storm.kafka.StringKeyValueScheme

ignoreZkOffsets
    To force the spout to ignore any consumer state information stored in ZooKeeper, set ignoreZkOffsets to true. If true, the spout always begins reading from the offset defined by startOffsetTime. For more information, see "How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures."

startOffsetTime
    Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values:
    * kafka.api.OffsetRequest.EarliestTime() starts streaming from the beginning of the topic
    * kafka.api.OffsetRequest.LatestTime() streams only new messages

maxOffsetBehind
    Specifies how long a spout attempts to retry the processing of a failed tuple. If a failing tuple's offset is less than maxOffsetBehind, the spout stops retrying the tuple. The default is Long.MAX_VALUE.

useStartOffsetTimeIfOffsetOutOfRange
    Controls whether a spout streams messages from the beginning of a topic when the spout throws an exception for an out-of-range offset. The default value is true.

metricsTimeBucketSizeInSecs
    Controls the time interval, in seconds, at which Storm reports spout-related metrics. The default is 60 seconds.
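These fields are public members of KafkaConfig, so they can be set directly on the config object before the spout is constructed. An illustrative sketch (the connection string, topic, and chosen values are placeholders, not recommendations):

```java
SpoutConfig spoutConfig = new SpoutConfig(
        new ZkHosts("localhost:2181"),   // placeholder ZooKeeper address
        "test-topic", "/kafka-offsets", "my-spout-id");

// Read from the beginning of the topic, ignoring any offsets in ZooKeeper.
spoutConfig.ignoreZkOffsets = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

// Fetch up to 2 MB per request and report spout metrics every 30 seconds.
spoutConfig.fetchSizeBytes = 2 * 1024 * 1024;
spoutConfig.metricsTimeBucketSizeInSecs = 30;
```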