Storm-Kafka API Reference
Javadoc for the storm-kafka component is installed at <$STORM_HOME>/contrib/storm-kafka/storm-kafka-0.9.3.2.2.6.0-<buildnumber>-javadoc.jar.
This section provides additional reference documentation for the primary classes and interfaces of the storm-kafka component.
BrokerHosts Interface
The storm-kafka component provides two implementations of the BrokerHosts interface: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping, and the StaticHosts implementation when broker-to-partition mapping is static. The constructor for StaticHosts requires an instance of GlobalPartitionInformation:
```java
import storm.kafka.Broker;
import storm.kafka.StaticHosts;
import storm.kafka.trident.GlobalPartitionInformation;

// Three equivalent ways to describe a broker at localhost:9092.
Broker brokerForPartition0 = new Broker("localhost");             // localhost:9092 (default port)
Broker brokerForPartition1 = new Broker("localhost", 9092);       // localhost:9092, port specified explicitly
Broker brokerForPartition2 = Broker.fromString("localhost:9092"); // localhost:9092, specified as one string

// Map each partition ID to the broker that hosts it.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2

StaticHosts hosts = new StaticHosts(partitionInfo);
```
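To make the idea of a static mapping concrete, the following stdlib-only sketch models what such a map holds: each partition ID points to a host and port, and a bare host falls back to port 9092, as the comments in the example above indicate. HostPort and StaticPartitionMap are hypothetical names used only for illustration; they are not storm-kafka classes and do not reproduce its implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical broker address type (not storm-kafka's Broker class).
final class HostPort {
    static final int DEFAULT_PORT = 9092; // default Kafka port, as in the example above

    final String host;
    final int port;

    HostPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Parses "host" or "host:port"; a bare host gets the default port.
    static HostPort parse(String spec) {
        String[] parts = spec.split(":");
        if (parts.length == 1) {
            return new HostPort(parts[0], DEFAULT_PORT);
        }
        if (parts.length == 2) {
            return new HostPort(parts[0], Integer.parseInt(parts[1]));
        }
        throw new IllegalArgumentException("Invalid host specification: " + spec);
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}

// Hypothetical fixed partition-to-broker mapping, ordered by partition ID.
final class StaticPartitionMap {
    private final Map<Integer, HostPort> partitions = new TreeMap<>();

    void addPartition(int partitionId, HostPort broker) {
        partitions.put(partitionId, broker);
    }

    // Fails fast instead of returning null for an unmapped partition.
    HostPort brokerFor(int partitionId) {
        HostPort broker = partitions.get(partitionId);
        if (broker == null) {
            throw new IllegalArgumentException("No broker mapped for partition " + partitionId);
        }
        return broker;
    }

    int size() {
        return partitions.size();
    }
}
```

Because nothing in a static map changes at runtime, a partition that moves to another broker requires rebuilding the map, which is the situation the ZkHosts implementation is meant to handle.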
KafkaConfig Class
Create a KafkaConfig instance with one of the following constructors, each of which requires an implementation of the BrokerHosts interface:
```java
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
```
Table 5.1. KafkaConfig Parameters
KafkaConfig Parameter | Description |
---|---|
hosts | Any implementation of the BrokerHosts interface. |
topic | Name of the Kafka topic. |
clientId | Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored. |
Both SpoutConfig from the core-storm API and TridentKafkaConfig from the Trident API extend KafkaConfig. Instantiate these classes with the following constructors:
Core-Storm API
Constructor:
```java
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
```
Table 5.2. SpoutConfig Parameters
SpoutConfig Parameter | Description |
---|---|
hosts | Any implementation of the BrokerHosts interface. |
topic | Name of the Kafka topic. |
zkRoot | Root directory in ZooKeeper where all topic and partition information is stored. By default, this is |
id | Unique identifier for this spout. |
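The zkRoot and id parameters together determine where a spout keeps its committed offsets in ZooKeeper. The sketch below assumes a layout of zkRoot/id/partition_N, with one node per Kafka partition; that layout is an assumption modeled on storm-kafka's behavior, not something stated in this section, so check it against the version you run. OffsetPaths and committedPath are hypothetical names.

```java
// Hypothetical helper; the zkRoot/id/partition_N layout is an assumption.
final class OffsetPaths {
    private OffsetPaths() {}

    // Builds the ZooKeeper node path for one partition's committed offset.
    static String committedPath(String zkRoot, String spoutId, int partition) {
        if (spoutId == null || spoutId.isEmpty()) {
            throw new IllegalArgumentException("spout id must be non-empty");
        }
        // Drop a trailing slash so "/kafka-spouts/" and "/kafka-spouts" give the same path.
        String root = zkRoot.endsWith("/") ? zkRoot.substring(0, zkRoot.length() - 1) : zkRoot;
        return root + "/" + spoutId + "/partition_" + partition;
    }
}
```

Under this layout, two spouts that share a zkRoot but use different IDs track their progress independently, while two spouts that reuse the same ID read and overwrite the same offset nodes. That is why the id should be unique per spout.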
Trident API
Constructors:
```java
public TridentKafkaConfig(BrokerHosts hosts, String topic)
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)
```
Table 5.3. TridentKafkaConfig Parameters
TridentKafkaConfig Parameter | Description |
---|---|
hosts | Any implementation of the BrokerHosts interface. |
topic | Name of the Kafka topic. |
id | Unique identifier for this spout. |
KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology:
Table 5.4. KafkaConfig Fields
KafkaConfig Field | Description |
---|---|
fetchSizeBytes | Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1 MB. |
socketTimeoutMs | Specifies the number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds. |
bufferSizeBytes | Specifies the buffer size in bytes for network requests. The default is 1 MB. |
scheme | The interface that specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default, |
forceFromStart | To force the spout to ignore any consumer state information stored in ZooKeeper, set |
startOffsetTime | Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values: |
maxOffsetBehind | Limits how far back, in offsets, a spout retries the processing of a failed tuple. If a failing tuple's offset is less than |
useStartOffsetTimeIfOffsetOutOfRange | Controls whether a spout streams messages from the beginning of a topic when the spout encounters an out-of-range offset error. The default value is true. |
metricsTimeBucketSizeInSecs | Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds. |
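The scheme setting decides how the raw bytes of each Kafka message become tuple values. As a minimal sketch, the hypothetical BytesToTuple interface below captures that contract (one message's bytes in, one tuple's list of values out), and Utf8StringConversion decodes each message as a single UTF-8 string field. Neither type is part of the storm-kafka API, and the field name "str" is chosen here only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical conversion contract: one Kafka message in, one tuple's values out.
interface BytesToTuple {
    List<Object> toTuple(byte[] message);

    // Names of the tuple fields produced by toTuple, in order.
    List<String> fieldNames();
}

// Hypothetical implementation: decodes the whole message as one UTF-8 string.
final class Utf8StringConversion implements BytesToTuple {
    @Override
    public List<Object> toTuple(byte[] message) {
        return Collections.<Object>singletonList(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public List<String> fieldNames() {
        return Collections.singletonList("str");
    }
}
```

Keeping the conversion behind an interface lets a topology swap in a different decoding (for example, a key/value split) without changing the spout configuration code around it.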
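Three of the settings above interact when a spout decides where to begin reading a partition: ignoring stored consumer state (forceFromStart in storm-kafka), choosing the beginning of the topic versus only new messages (startOffsetTime), and recovering from an out-of-range offset (useStartOffsetTimeIfOffsetOutOfRange). The following is a simplified decision model, not the spout's actual code. It assumes that a committed offset from ZooKeeper is used unless stored state is ignored or no offset was committed, and that an out-of-range committed offset falls back to the start-mode choice (the beginning of the topic when that choice is EARLIEST) only when the recovery flag is set. StartOffsetModel, StartMode, and startingOffset are hypothetical names.

```java
import java.util.OptionalLong;

// Hypothetical model of where a spout starts reading one partition.
final class StartOffsetModel {
    // Stands in for the "beginning of topic" vs. "only new messages" choice.
    enum StartMode { EARLIEST, LATEST }

    private StartOffsetModel() {}

    // committed: offset previously stored in ZooKeeper, if any.
    // earliest/latest: range of offsets currently available in the partition.
    static long startingOffset(OptionalLong committed, boolean ignoreStoredState, StartMode mode,
                               boolean recoverIfOutOfRange, long earliest, long latest) {
        long fromMode = (mode == StartMode.EARLIEST) ? earliest : latest;
        if (ignoreStoredState || !committed.isPresent()) {
            return fromMode; // no usable consumer state: use the start-mode choice
        }
        long offset = committed.getAsLong();
        if (offset >= earliest && offset <= latest) {
            return offset; // resume where the spout left off
        }
        if (recoverIfOutOfRange) {
            return fromMode; // committed offset is gone or invalid: fall back
        }
        throw new IllegalStateException(
            "Offset " + offset + " is outside [" + earliest + ", " + latest + "]");
    }
}
```

In this model, ignoring stored state always wins over a valid committed offset, which matches the described purpose of forcing the spout to disregard what ZooKeeper holds.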
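The retry limit for failed tuples (maxOffsetBehind in storm-kafka) is expressed as an offset distance rather than a time. A minimal sketch, assuming the rule is "replay a failed offset only if it lies no more than the limit behind the latest offset the spout has emitted" (an assumption about storm-kafka's behavior, not a statement from this section). RetryPolicy and shouldRetry are hypothetical names.

```java
// Hypothetical retry rule for failed tuples, keyed on offset distance.
final class RetryPolicy {
    private RetryPolicy() {}

    // Returns true if a failed tuple at failedOffset should be replayed.
    // Assumes non-negative offsets and a non-negative maxOffsetBehind, so
    // emittedToOffset - maxOffsetBehind cannot overflow, even for Long.MAX_VALUE.
    static boolean shouldRetry(long failedOffset, long emittedToOffset, long maxOffsetBehind) {
        return failedOffset >= emittedToOffset - maxOffsetBehind;
    }
}
```

Under this rule, a very large limit such as Long.MAX_VALUE retries every failed tuple, while a small limit drops failures that fall far behind the spout's read position, trading completeness for steady progress.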