Apache Kafka is a high-throughput, distributed messaging system. Apache Storm provides a Kafka spout to facilitate ingesting data from Kafka 0.8x brokers. Storm developers should include downstream bolts in their topologies to process data ingested with the Kafka spout. The storm-kafka
components include a core-storm spout, as well as a fully transactional Trident spout. The storm-kafka spouts provide the following key features:
'Exactly once' tuple processing with the Trident API
Dynamic discovery of Kafka brokers and partitions
Hortonworks recommends that Storm developers use the Trident API. However, use the core-storm API if sub-second latency is critical for your Storm topology.
The core-storm API represents a Kafka spout with the KafkaSpout
class, and the Trident API provides a OpaqueTridentKafkaSpout
class to represent the spout. To initialize KafkaSpout
and OpaqueTridentKafkaSpout
, Storm developers need an instance of a subclass of the KafkaConfig
class, which represents configuration information needed to ingest data from a Kafka cluster. The KafkaSpout
constructor requires the SpoutConfig
subclass, and the OpaqueTridentKafkaSpout
requires the TridentKafkaConfig
subclass. In turn, the constructors for both KafkaSpout
and OpaqueTridentKafkaSpout
require an implementation of the BrokerHosts
interface, which is used to map Kafka brokers to topic partitions. The storm-kafka component provides two implementations of BrokerHosts
: ZkHosts
and StaticHosts
. Use the ZkHosts
implementation to dynamically track broker-to-partition mapping and the StaticHosts
implementation when broker-to-partition mapping is static.
The following code samples demonstrate the use of these classes and interfaces.
Core-storm API
BrokerHosts hosts = new ZkHosts(zkConnString); SpoutConfig spoutConfig = new SpoutConfig(hosts,topicName,"/" + topicName, UUID.randomUUID().toString()); spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
Trident API
TridentTopology topology = new TridentTopology(); BrokerHosts zk = new ZkHosts("localhost"); TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic"); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
zookeeper.connect=host1:2181,host2:2181,host3:2181
Storm-Kafka API Reference
Javadoc for the storm-kafka
component is installed at <$STORM_HOME>/contrib/storm-kafka/storm-kafka-0.9.3.2.2.6.0-<buildnumber>-javadoc.jar
. This section provides additional reference documentation for the primary classes and interfaces of the storm-kafka
component.
BrokerHosts Interface
The storm-kafka
component provides two implementations of the BrokerHosts
interface: ZkHosts
and StaticHosts
. Use the ZkHosts
implementation to dynamically track broker-to-partition mapping and the StaticHosts
implementation when broker-to-partition mapping is static. The constructor for
StaticHosts
requires an instance of GlobalPartitionInformation
:
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092 Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string. GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation(); partitionInfo.add(0, brokerForPartition0)//mapping form partition 0 to brokerForPartition0 partitionInfo.add(1, brokerForPartition1)//mapping form partition 1 to brokerForPartition1 partitionInfo.add(2, brokerForPartition2)//mapping form partition 2 to brokerForPartition2 StaticHosts hosts = new StaticHosts(partitionInfo);
KafkaConfig Class
Instantiate an instance of KafkaConfig
with one of the following constructors, each of which requires an implementation of the BrokerHosts
interface:
public KafkaConfig(BrokerHosts hosts, String topic) public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
Table 1.8. KafkaConfig Parameters
KafkaConfig Parameter | Description |
---|---|
| Any implementation of the |
| Name of the Kafka topic. |
| Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored. |
Both SpoutConfig
from the core-storm API and TridentKafkaConfig
from the Trident API extend KafkaConfig
. Instantiate these classes with the following constructors:
Core-Storm API
Constructor public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
Table 1.9. SpoutConfig Parameters
SpoutConfig Parameter | Description |
---|---|
| Any implementation of the |
| Name of the Kafka topic. |
| Root directory in Zookeeper where all topics and partition information is stored. By default, this is |
| Unique identifier for this spout. |
Trident API
Constructors public TridentKafkaConfig(BrokerHosts hosts, String topic) public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)
Table 1.10. TridentKafkaConfig Parameters
TridentKafkaConfig | Description |
---|---|
| Any implementation of the |
| Name of the Kafka topic. |
| Unique identifier for this spout. |
KafkaConfig
contains several fields used to configure the behavior of a Kafka spout in a Storm topology:
Table 1.11. KafkaConfig Fields
KafkaConfig Field | Description |
---|---|
| Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1MB. |
| Specifies the number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds. |
| Specifies the buffer size in bytes for network requests. The default is 1MB. |
| The interface that specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default, |
| Controls whether a Kafka spout fetches data from the beginning of a Kafka topic. The default is false. |
| Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values: * |
| Specifies how long a spout attempts to retry the processing of a failed tuple. If a failing tuple's offset is less then |
| Controls whether a spout streams messages from the beginning of a topic when the spout throws an exception for an out-of-range offset. The default value is true. |
| Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds. |
Limitations
The current version of the Kafka spout contains the following limitations:
Does not support Kafka 0.7x brokers.
Storm developers must include
${STORM_HOME}/lib/*
in the CLASSPATH environment variable from the command line when runningkafka-topology
in local mode. Otherwise, developers will likely receive ajava.lang.NoClassDefFoundError
exception:java -cp "/usr/lib/storm/contrib/storm-kafka-example-0.9.1.2.1.1.0-320-jar-with-dependencies.jar: /usr/lib/storm/lib/*" org.apache.storm.kafka.TestKafkaTopology <zookeeper_host>
Secure Hadoop clusters must comment out the following statement from
${STORM_HOME}/bin/kafka-server-start.sh
:EXTRA_ARGS="-name kafkaServer -loggc"
Core-storm API Constructor
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)
Table 1.12. SpoutConfig Parameters
SpoutConfig Parameter
Description
hosts
Any implementation of the
BrokerHosts
interface, currently eitherZkHosts
orStaticHosts
.topic
Name of the Kafka topic.
zkroot
Root directory in Zookeeper where all topics and partition information is stored. By default, this is
/brokers
.id
Unique identifier for this spout.
Trident API Constructors
public TridentKafkaConfig(BrokerHosts hosts, String topic) public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)
Table 1.13. TridentKafkaConfig Parameters
TridentKafkaConfig
Description
hosts
Any implementation of the
BrokerHosts
interface, currently eitherZkHosts
orStaticHosts
.topic
Name of the Kafka topic.
clientid
Unique identifier for this spout.
Kafka Cluster Configuration
The storm-kafka connector requires some configuration of the Apache Kafka installation. Kafka administrators must add a zookeeper.connect
property with the hostnames and port numbers of the HDP Zookeeper nodes to Kafka's server.properties file.