6. Ingesting Data with the Apache Kafka Spout

Apache Kafka is a high-throughput, distributed messaging system. Apache Storm provides a Kafka spout to facilitate ingesting data from Kafka 0.8.x brokers. Storm developers should include downstream bolts in their topologies to process data ingested with the Kafka spout. The storm-kafka components include a core-storm spout, as well as a fully transactional Trident spout. The storm-kafka spouts provide the following key features:

  • 'Exactly once' tuple processing with the Trident API

  • Dynamic discovery of Kafka brokers and partitions

Hortonworks recommends that Storm developers use the Trident API. However, use the core-storm API if sub-second latency is critical for your Storm topology.

The core-storm API represents a Kafka spout with the KafkaSpout class, and the Trident API provides an OpaqueTridentKafkaSpout class to represent the spout. To initialize KafkaSpout and OpaqueTridentKafkaSpout, Storm developers need an instance of a subclass of the KafkaConfig class, which represents the configuration information needed to ingest data from a Kafka cluster. The KafkaSpout constructor requires the SpoutConfig subclass, and the OpaqueTridentKafkaSpout constructor requires the TridentKafkaConfig subclass.

In turn, the constructors for both SpoutConfig and TridentKafkaConfig require an implementation of the BrokerHosts interface, which is used to map Kafka brokers to topic partitions. The storm-kafka component provides two implementations of BrokerHosts: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping and the StaticHosts implementation when broker-to-partition mapping is static.

The following code samples demonstrate the use of these classes and interfaces.

Core-storm API

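// zkConnString and topicName are String variables supplied by the caller
BrokerHosts hosts = new ZkHosts(zkConnString);
// Arguments: broker hosts, topic name, zkRoot, and a unique spout id
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
// Emit each Kafka message as a string tuple
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);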

Trident API

TridentTopology topology = new TridentTopology();
// "localhost" is a placeholder ZooKeeper connection string
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
// Emit each Kafka message as a string tuple
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);

Storm-Kafka API Reference

Javadoc for the storm-kafka component is installed at <$STORM_HOME>/contrib/storm-kafka/storm-kafka-0.9.3.2.2.6.0-<buildnumber>-javadoc.jar. This section provides additional reference documentation for the primary classes and interfaces of the storm-kafka component.

BrokerHosts Interface

The storm-kafka component provides two implementations of the BrokerHosts interface: ZkHosts and StaticHosts. Use the ZkHosts implementation to dynamically track broker-to-partition mapping and the StaticHosts implementation when broker-to-partition mapping is static. The constructor for StaticHosts requires an instance of GlobalPartitionInformation:

Broker brokerForPartition0 = new Broker("localhost"); // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092); // localhost:9092, with the port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092"); // localhost:9092, specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.add(0, brokerForPartition0); // mapping from partition 0 to brokerForPartition0
partitionInfo.add(1, brokerForPartition1); // mapping from partition 1 to brokerForPartition1
partitionInfo.add(2, brokerForPartition2); // mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
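
According to the comments in the sample above, all three Broker forms resolve to the same host and port. The following standalone sketch mimics that normalization and the static partition-to-broker mapping using only the Java standard library. The BrokerAddress class and its parse method are hypothetical stand-ins for illustration, not storm-kafka classes, and the default port of 9092 is an assumption taken from the sample comments.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a broker address; not a storm-kafka class. */
final class BrokerAddress {
    // Default port assumed from the comments in the sample above.
    static final int DEFAULT_PORT = 9092;

    final String host;
    final int port;

    BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** Accepts "host" or "host:port" and fills in the default port when it is missing. */
    static BrokerAddress parse(String address) {
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            return new BrokerAddress(address, DEFAULT_PORT);
        }
        return new BrokerAddress(address.substring(0, colon),
                Integer.parseInt(address.substring(colon + 1)));
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }

    public static void main(String[] args) {
        // Static mapping from partition number to broker, sorted by partition.
        Map<Integer, BrokerAddress> partitions = new TreeMap<>();
        partitions.put(0, parse("localhost"));
        partitions.put(1, new BrokerAddress("localhost", 9092));
        partitions.put(2, parse("localhost:9092"));
        partitions.forEach((id, broker) ->
                System.out.println("partition " + id + " -> " + broker));
    }
}
```

Because a static mapping like this is fixed when the topology is built, it does not follow brokers that move or partitions that are reassigned, which is the case ZkHosts is meant to cover.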

KafkaConfig Class

Create an instance of KafkaConfig with one of the following constructors, each of which requires an implementation of the BrokerHosts interface:

public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
 

Table 1.8. KafkaConfig Parameters

KafkaConfig Parameter | Description
--------------------- | -----------
hosts | Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.
topic | Name of the Kafka topic.
clientId | Optional parameter used as part of the ZooKeeper path where the spout's current offset is stored.


Both SpoutConfig from the core-storm API and TridentKafkaConfig from the Trident API extend KafkaConfig. Instantiate these classes with the following constructors:

Core-Storm API

Constructor:

public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id)

Table 1.9. SpoutConfig Parameters

SpoutConfig Parameter | Description
--------------------- | -----------
hosts | Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.
topic | Name of the Kafka topic.
zkRoot | Root path in ZooKeeper under which the spout stores its consumer offsets. (The /brokers path, where Kafka stores topic and partition information, is the default broker path used by ZkHosts, not a default for zkRoot.)
id | Unique identifier for this spout.
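
To make the role of zkRoot and id more concrete, the sketch below builds the kind of per-partition ZooKeeper path under which a spout could record its committed offset. The zkRoot/id/partition_N layout is an assumption about how storm-kafka composes the path and should be verified against your version. OffsetPathSketch and offsetPath are hypothetical names, not part of the storm-kafka API.

```java
/** Hypothetical helper illustrating an assumed offset path layout; not a storm-kafka class. */
final class OffsetPathSketch {

    /** Builds zkRoot/id/partition_N, tolerating a trailing slash on zkRoot. */
    static String offsetPath(String zkRoot, String id, int partition) {
        String root = zkRoot.endsWith("/")
                ? zkRoot.substring(0, zkRoot.length() - 1)
                : zkRoot;
        return root + "/" + id + "/partition_" + partition;
    }

    public static void main(String[] args) {
        // A stable id lets a redeployed spout find its previous offsets again.
        System.out.println(offsetPath("/test-topic", "orders-spout", 0));
        // A different id yields a different path, so offsets are not shared.
        System.out.println(offsetPath("/test-topic", "audit-spout", 0));
    }
}
```

Under this assumption, two spouts that share both zkRoot and id would read and overwrite the same offsets, which is why the id must be unique. It also suggests that a random id, such as the UUID in the core-storm sample earlier in this section, would not let a redeployed topology resume from previously stored offsets.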


Trident API

Constructors:

public TridentKafkaConfig(BrokerHosts hosts, String topic)
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)

Table 1.10. TridentKafkaConfig Parameters

TridentKafkaConfig Parameter | Description
---------------------------- | -----------
hosts | Any implementation of the BrokerHosts interface, currently either ZkHosts or StaticHosts.
topic | Name of the Kafka topic.
id | Unique identifier for this spout.


KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology:

 

Table 1.11. KafkaConfig Fields

KafkaConfig Field | Description
----------------- | -----------
fetchSizeBytes | Specifies the number of bytes to attempt to fetch in one request to a Kafka server. The default is 1 MB.
socketTimeoutMs | Specifies the number of milliseconds to wait before a socket fails an operation with a timeout. The default is 10000 (10 seconds).
bufferSizeBytes | Specifies the buffer size in bytes for network requests. The default is 1 MB.
scheme | The interface that specifies how a byte[] from a Kafka topic is transformed into a Storm tuple. The default, RawMultiScheme (an implementation of the MultiScheme interface), returns a tuple containing the byte[] with no additional processing. The API provides the following implementations: storm.kafka.StringScheme, storm.kafka.KeyValueSchemeAsMultiScheme, and storm.kafka.StringKeyValueScheme.
forceFromStart | Controls whether a Kafka spout fetches data from the beginning of a Kafka topic. The default is false.
startOffsetTime | Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. Valid values are kafka.api.OffsetRequest.EarliestTime(), which starts streaming from the beginning of the topic, and kafka.api.OffsetRequest.LatestTime(), which streams only new messages.
maxOffsetBehind | Specifies how far behind the spout's current offset a failed tuple can be and still be retried. If a failing tuple's offset lags the current offset by more than maxOffsetBehind, the spout stops retrying the tuple. The default is Long.MAX_VALUE.
useStartOffsetTimeIfOffsetOutOfRange | Controls whether a spout restarts streaming from the offset given by startOffsetTime when the spout encounters an out-of-range offset error. The default is true.
metricsTimeBucketSizeInSecs | Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds.
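
The retry rule described for maxOffsetBehind can be illustrated with a small standalone sketch. The comparison below (current offset minus failed offset, checked against maxOffsetBehind) is an assumed reading of that field's description rather than code copied from storm-kafka. RetryWindowSketch and shouldRetry are hypothetical names.

```java
/** Hypothetical illustration of a maxOffsetBehind-style retry window; not a storm-kafka class. */
final class RetryWindowSketch {

    /**
     * Returns true if a failed tuple should be retried.
     * Assumes offsets are non-negative and failedOffset <= currentOffset.
     */
    static boolean shouldRetry(long failedOffset, long currentOffset, long maxOffsetBehind) {
        long lag = currentOffset - failedOffset;
        return lag <= maxOffsetBehind;
    }

    public static void main(String[] args) {
        long window = 100L;
        // 50 messages behind: still inside the window, so retry.
        System.out.println(shouldRetry(100L, 150L, window));
        // 400 messages behind: outside the window, so give up on the tuple.
        System.out.println(shouldRetry(100L, 500L, window));
        // With the default of Long.MAX_VALUE, every failed tuple is retried.
        System.out.println(shouldRetry(0L, 1_000_000L, Long.MAX_VALUE));
    }
}
```

A smaller window trades completeness for freshness: the spout skips tuples that have fallen too far behind instead of repeatedly replaying old data.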


Limitations

The current version of the Kafka spout has the following limitations:

  • Does not support Kafka 0.7.x brokers.

  • Storm developers must include ${STORM_HOME}/lib/* in the classpath on the command line when running kafka-topology in local mode. Otherwise, developers will likely receive a java.lang.NoClassDefFoundError:

    java -cp "/usr/lib/storm/contrib/storm-kafka-example-0.9.1.2.1.1.0-320-jar-with-dependencies.jar:/usr/lib/storm/lib/*" org.apache.storm.kafka.TestKafkaTopology <zookeeper_host>

  • On secure Hadoop clusters, administrators must comment out the following statement in ${STORM_HOME}/bin/kafka-server-start.sh:

    EXTRA_ARGS="-name kafkaServer -loggc"

Kafka Cluster Configuration

The storm-kafka connector requires some configuration of the Apache Kafka installation. Kafka administrators must add a zookeeper.connect property to Kafka's server.properties file, listing the hostnames and port numbers of the HDP ZooKeeper nodes. For example:

zookeeper.connect=host1:2181,host2:2181,host3:2181
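
As a quick sanity check of the zookeeper.connect property, the following sketch loads properties text and verifies that each entry has the host:port form. It uses only the Java standard library. ZkConnectCheck and zkNodes are hypothetical names, the host names are placeholders, and the sketch does not handle an optional chroot path suffix on the connection string.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical validator for the zookeeper.connect property; not part of Kafka or Storm. */
final class ZkConnectCheck {

    /** Returns the host:port entries of zookeeper.connect, or throws if the value is malformed. */
    static List<String> zkNodes(Properties props) {
        String value = props.getProperty("zookeeper.connect");
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("zookeeper.connect is not set");
        }
        List<String> nodes = new ArrayList<>();
        for (String entry : value.split(",")) {
            String node = entry.trim();
            int colon = node.lastIndexOf(':');
            if (colon <= 0 || colon == node.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but found '" + node + "'");
            }
            // Throws NumberFormatException if the port is not numeric.
            Integer.parseInt(node.substring(colon + 1));
            nodes.add(node);
        }
        return nodes;
    }

    public static void main(String[] args) throws IOException {
        // In practice, load server.properties from disk; the text here is a placeholder.
        Properties props = new Properties();
        props.load(new StringReader("zookeeper.connect=host1:2181,host2:2181,host3:2181"));
        for (String node : zkNodes(props)) {
            System.out.println("ZooKeeper node: " + node);
        }
    }
}
```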