Spouts
All spouts must implement the org.apache.storm.topology.IRichSpout
interface from the storm-core API. BaseRichSpout is the most basic
implementation, but there are several others, including ClojureSpout,
DRPCSpout, and FeederSpout. In addition, Hortonworks provides a Kafka
spout to ingest data from a Kafka cluster.
The following example, RandomSentenceSpout, is included with the
storm-starter example project, which is installed with Storm at
/usr/lib/storm/contrib/storm-starter.
package storm.starter.spout;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

/**
 * Example spout that, on every {@link #nextTuple()} call, pauses briefly and
 * then emits one sentence picked at random from a fixed list.
 *
 * <p>Each emitted tuple carries a single value, published under the field
 * name {@code "word"}.
 */
public class RandomSentenceSpout extends BaseRichSpout {

    /**
     * Fixed pool of sentences to choose from. Held as a constant so the array
     * is built once rather than on every {@link #nextTuple()} call.
     */
    private static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    /** Collector handed to {@link #open}; used to emit tuples. */
    SpoutOutputCollector _collector;

    /** Random source used to pick the next sentence; created in {@link #open}. */
    Random _rand;

    /**
     * Stores the collector and creates the random number generator.
     *
     * @param conf      configuration passed in by the framework (unused here)
     * @param context   topology context passed in by the framework (unused here)
     * @param collector collector used by {@link #nextTuple()} to emit tuples
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /**
     * Sleeps via {@code Utils.sleep(100)} (presumably milliseconds - confirm
     * against the Storm API), then emits one randomly selected sentence as a
     * single-value tuple.
     */
    @Override
    public void nextTuple() {
        Utils.sleep(100);
        String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
        _collector.emit(new Values(sentence));
    }

    /**
     * Intentionally a no-op: this spout keeps no record of emitted tuples.
     *
     * @param id identifier of the acknowledged tuple (ignored)
     */
    @Override
    public void ack(Object id) {
    }

    /**
     * Intentionally a no-op: this spout keeps no record of emitted tuples,
     * so nothing is re-emitted here.
     *
     * @param id identifier of the failed tuple (ignored)
     */
    @Override
    public void fail(Object id) {
    }

    /**
     * Declares the single output field of this spout.
     *
     * <p>The field is named {@code "word"} even though each value is a whole
     * sentence; the name is kept as-is because consumers may refer to the
     * field by name.
     *
     * @param declarer declarer on which the output fields are registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}