KafkaSpout Integration: Core Storm APIs
The core-storm API represents a Kafka spout with the KafkaSpout class. To initialize KafkaSpout, define an instance of SpoutConfig, a subclass of KafkaConfig that represents the configuration information needed to ingest data from a Kafka cluster. KafkaSpout requires an instance of the BrokerHosts interface.
BrokerHosts Interface
The BrokerHosts interface maps Kafka brokers to topic partitions. Constructors for KafkaSpout (and, for the Trident API, TridentKafkaConfig) require an implementation of the BrokerHosts interface.
The storm-kafka component provides two implementations of BrokerHosts, ZkHosts and StaticHosts:
Use ZkHosts if you want to track broker-to-partition mapping dynamically. This class uses Kafka's ZooKeeper entries to track the mapping. You can instantiate an object as follows:

public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)

where:

brokerZkStr is the IP:port address for the ZooKeeper host; for example, localhost:2181.

brokerZkPath is the root directory under which topic and partition information is stored. By default this is /brokers, which is also the default used by Kafka.

By default, the broker-to-partition mapping refreshes every 60 seconds. If you want to change the refresh frequency, set the refreshFreqSecs field on the ZkHosts instance to your chosen value.
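For example, the following sketch (the ZooKeeper address and refresh value are illustrative, not from the original text) creates a ZkHosts instance and changes the refresh frequency:

ZkHosts hosts = new ZkHosts("localhost:2181");  // track mapping under the default /brokers path
hosts.refreshFreqSecs = 30;                     // refresh broker-to-partition mapping every 30 seconds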
Use StaticHosts for static broker-to-partition mapping. To construct an instance of this class, you must first construct an instance of GlobalPartitionInformation; for example:

Broker brokerForPartition0 = new Broker("localhost");        // localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);  // localhost:9092, with the port specified explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");   // localhost:9092, specified as one string
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);          // map partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);          // map partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);          // map partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
KafkaConfig Class and SpoutConfig Subclass
Next, define an instance of SpoutConfig, the subclass of KafkaConfig. KafkaConfig contains several fields used to configure the behavior of a Kafka spout in a Storm topology; SpoutConfig extends KafkaConfig, supporting additional fields for ZooKeeper connection info and for controlling behavior specific to KafkaSpout.
KafkaConfig implements the following constructors, each of which requires an implementation of the BrokerHosts interface:

public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
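For example, the following sketch (the ZooKeeper address, topic name, and client ID are illustrative) constructs a KafkaConfig instance:

BrokerHosts hosts = new ZkHosts("localhost:2181");                          // dynamic broker-to-partition mapping
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "test-topic", "test-client");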
KafkaConfig Parameters
hosts
An implementation of the BrokerHosts interface that maps Kafka brokers to topic partitions (see "BrokerHosts Interface").

topic
Name of the Kafka topic that KafkaSpout will consume from.

clientId
Optional parameter used as part of the ZooKeeper path, specifying where the spout's current offset is stored.
KafkaConfig Fields
fetchSizeBytes
Number of bytes to attempt to fetch in one request to a Kafka server. The default is 1MB.
socketTimeoutMs
Number of milliseconds to wait before a socket fails an operation with a timeout. The default value is 10 seconds.
bufferSizeBytes
Buffer size (in bytes) for network requests. The default is 1MB.
scheme
The interface that specifies how a ByteBuffer consumed from a Kafka topic is transformed into a Storm tuple. The default, RawMultiScheme, returns a tuple of the raw bytes with no additional processing. The API provides many implementations of the Scheme class, including:

storm.kafka.StringScheme
storm.kafka.KeyValueSchemeAsMultiScheme
storm.kafka.StringKeyValueScheme
Important: In Apache Storm versions prior to 1.0, MultiScheme methods accepted a byte[] parameter instead of a ByteBuffer. In Storm version 1.0, MultiScheme and related scheme APIs changed; they now accept a ByteBuffer instead of a byte[]. As a result, Kafka spouts built with Storm versions earlier than 1.0 do not work with Storm versions 1.0 and later. When running topologies with Storm version 1.0 and later, ensure that your version of storm-kafka is at least 1.0. Rebuild pre-1.0 shaded topology .jar files that bundle storm-kafka classes with storm-kafka version 1.0 before running them in clusters with Storm 1.0 and later.

ignoreZkOffsets
To force the spout to ignore any consumer state information stored in ZooKeeper, set ignoreZkOffsets to true. If true, the spout always begins reading from the offset defined by startOffsetTime.

startOffsetTime
Controls whether streaming for a topic starts from the beginning of the topic or whether only new messages are streamed. The following are valid values:
kafka.api.OffsetRequest.EarliestTime() starts streaming from the beginning of the topic
kafka.api.OffsetRequest.LatestTime() streams only new messages
maxOffsetBehind
Specifies how far, in offsets, a spout attempts to retry the processing of a failed tuple. If a failed tuple's offset falls more than maxOffsetBehind behind the spout's most recently emitted offset, the spout stops retrying the tuple. The default is Long.MAX_VALUE.

useStartOffsetTimeIfOffsetOutOfRange
Controls whether a spout streams messages from the offset defined by startOffsetTime when the spout encounters an out-of-range offset error. The default value is true.
metricsTimeBucketSizeInSecs
Controls the time interval at which Storm reports spout-related metrics. The default is 60 seconds.
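As an illustration, the following sketch (topic name, ZooKeeper address, and tuning values are hypothetical) sets several of these fields before the configuration is passed to a spout:

BrokerHosts hosts = new ZkHosts("localhost:2181");
KafkaConfig kafkaConfig = new KafkaConfig(hosts, "test-topic");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());     // emit each message as a single string field
kafkaConfig.fetchSizeBytes = 2 * 1024 * 1024;                         // fetch up to 2 MB per request
kafkaConfig.ignoreZkOffsets = true;                                   // discard consumer state stored in ZooKeeper
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); // start from the beginning of the topic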
Instantiate SpoutConfig as follows:
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String nodeId)
SpoutConfig Parameters
hosts
An implementation of the BrokerHosts interface that maps Kafka brokers to topic partitions (see "BrokerHosts Interface").

topic
Name of the Kafka topic that KafkaSpout will consume from.

zkRoot
Root directory in ZooKeeper under which KafkaSpout consumer offsets are stored. The default is /brokers.

nodeId
ZooKeeper node under which KafkaSpout stores offsets for each topic partition. The node ID must be unique for each topology. The topology uses this path to recover from failure scenarios, or when maintenance requires killing the topology.
zkRoot and nodeId are used together to construct the ZooKeeper path where Storm stores the Kafka offset; you can find offsets at zkRoot+"/"+nodeId. For example, with zkRoot set to /kafka-offsets and nodeId set to my-topology, offsets are stored under /kafka-offsets/my-topology.

To start processing messages from where the last operation left off, use the same zkRoot and nodeId. To start from the beginning of the Kafka topic, set KafkaConfig.ignoreZkOffsets to true.
Example
The following example illustrates the use of the KafkaSpout class and related interfaces:
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + zkrootDir, node);
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
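To use the spout, register it with a topology builder. The following sketch is illustrative (the component names and the downstream bolt are hypothetical, not part of the example above):

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 1);  // parallelism hint of 1
// A downstream bolt subscribes to the spout's stream; for example (hypothetical bolt):
// builder.setBolt("log-bolt", new LogBolt()).shuffleGrouping("kafka-spout");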