Using Apache Storm to Move Data

KafkaSpout Integration: Trident APIs

The Trident API represents a Kafka spout with the OpaqueTridentKafkaSpout class.

To initialize OpaqueTridentKafkaSpout, create an instance of TridentKafkaConfig, a subclass of KafkaConfig that holds the configuration information needed to ingest data from a Kafka cluster.

KafkaConfig Class and TridentKafkaConfig Subclass

Both the core-storm and Trident APIs use KafkaConfig, which contains several parameters and fields used to configure the behavior of a Kafka spout in a Storm topology.

Instantiate TridentKafkaConfig with one of the following constructors, each of which requires an implementation of the BrokerHosts interface:

public TridentKafkaConfig(BrokerHosts hosts, String topic) 
public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)

TridentKafkaConfig Parameters

hosts

An implementation of BrokerHosts that identifies the Kafka brokers, for example through one or more ZooKeeper hosts (see "BrokerHosts Interface").

topic

Name of the Kafka topic.

clientid

Unique identifier for this spout, passed as the id argument of the three-argument constructor.
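When the hosts value is built on ZkHosts, the string handed to it is commonly a ZooKeeper connection string of the form host1:port,host2:port. The following is a minimal sketch of assembling such a string with plain Java. The class ZkConnectStringSketch, the method connectString, the host names, and port 2181 are all hypothetical and are not part of the Storm API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper, not part of storm-kafka.
final class ZkConnectStringSketch {

    // Joins host names into the "host1:port,host2:port" form.
    public static String connectString(List<String> hosts, int port) {
        return hosts.stream()
                .map(host -> host + ":" + port)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Placeholder host names; substitute the ZooKeeper nodes of your cluster.
        List<String> hosts = Arrays.asList("zk1.example.com", "zk2.example.com");
        System.out.println(connectString(hosts, 2181));
    }
}
```

Under these assumptions, the resulting string would be passed to the ZkHosts constructor in place of the single "localhost" value.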

Example

The following example illustrates the use of the OpaqueTridentKafkaSpout class and related interfaces:

TridentTopology topology = new TridentTopology();
// ZooKeeper host through which the spout locates the Kafka brokers
BrokerHosts zk = new ZkHosts("localhost");
TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic");
// Deserialize each Kafka message as a string
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
Important

In Apache Storm versions earlier than 1.0, MultiScheme methods accepted a byte[] parameter. In Storm 1.0, MultiScheme and the related scheme APIs changed to accept a ByteBuffer instead.

As a result, Kafka spouts built with Storm versions earlier than 1.0 do not work with Storm 1.0 and later. When running topologies with Storm 1.0 and later, ensure that your version of storm-kafka is at least 1.0. If a pre-1.0 shaded topology .jar file bundles storm-kafka classes, rebuild it with storm-kafka version 1.0 or later before running it in a cluster with Storm 1.0 or later.
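To illustrate what the change from byte[] to ByteBuffer means for deserialization code, the following sketch decodes the same payload from both input types using only the Java standard library. The class ByteBufferDecodeSketch and its methods are hypothetical. They do not reproduce Storm's StringScheme or any other Storm class, and UTF-8 is an assumed encoding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not Storm code.
final class ByteBufferDecodeSketch {

    // Pre-1.0 style input: the payload arrives as a raw byte array.
    public static String decodeBytes(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    // 1.0 and later style input: the payload arrives as a ByteBuffer.
    public static String decodeBuffer(ByteBuffer payload) {
        // Decode a duplicate so the caller's buffer position is left unchanged.
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeBytes(raw));
        System.out.println(decodeBuffer(ByteBuffer.wrap(raw)));
    }
}
```

Custom scheme code written against the byte[] signature would need a comparable adjustment when it is rebuilt for Storm 1.0 or later.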