KafkaSpout Integration: Trident APIs
The Trident API represents a Kafka spout with the
OpaqueTridentKafkaSpout
class.
To initialize OpaqueTridentKafkaSpout
, define a
TridentKafkaConfig
subclass instance of the
KafkaConfig
class, representing configuration information
needed to ingest data from a Kafka cluster.
KafkaConfig
Class and
TridentKafkaConfig
Subclass
Both the core-storm and Trident APIs use KafkaConfig
, which
contains several parameters and fields used to configure the behavior of a Kafka
spout in a Storm topology. For more information, see "KafkaConfig
Class" in KafkaSpout Configuration Settings: Core Storm API.
Instantiate a TridentKafkaConfig
subclass instance of the
KafkaConfig
class. Use one of the following constructors,
each of which requires an implementation of the BrokerHosts
interface. For more information about BrokerHosts
, see
"BrokerHosts
Interface" in KafkaSpout Configuration Settings: Core Storm APIs.
public TridentKafkaConfig(BrokerHosts hosts, String topic) public TridentKafkaConfig(BrokerHosts hosts, String topic, String id)
TridentKafkaConfig
Parameters
hosts
One or more hosts that are Kafka ZooKeeper broker nodes (see "
BrokerHosts
Interface").topic
Name of the Kafka topic.
clientid
Unique identifier for this spout.
Example
The following example illustrates the use of the
OpaqueTridentKafkaSpout
class and related interfaces:
TridentTopology topology = new TridentTopology(); BrokerHosts zk = new ZkHosts("localhost"); TridentKafkaConfig spoutConf = new TridentKafkaConfig(zk, "test-topic"); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
Important | |
---|---|
In Apache Storm versions prior to 1.0, As a result, Kafka spouts built with Storm versions earlier than
1.0 do not work with Storm versions 1.0 and later. When running
topologies with Storm version 1.0 and later, ensure that your
version of |