1.1. Spouts

All spouts must implement the backtype.storm.topology.IRichSpout interface from the core-storm API. BaseRichSpout is the most basic implementation, but there are several others, including ClojureSpout, DRPCSpout, and FeederSpout. In addition, Hortonworks provides a Kafka spout to ingest data from a Kafka cluster. The following example, RandomSentenceSpout, is included with the storm-starter connector installed with Storm at /usr/lib/storm/contrib/storm-starter.

package storm.starter.spout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class RandomSentenceSpout extends BaseRichSpout {
 SpoutOutputCollector _collector;
 Random _rand;


 @Override
 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
 _collector = collector;
 _rand = new Random();
 }

 @Override
 public void nextTuple() {
 Utils.sleep(100);
 String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" };
 String sentence = sentences[_rand.nextInt(sentences.length)];
 _collector.emit(new Values(sentence));
 }

 @Override
 public void ack(Object id) {
 }

 @Override
 public void fail(Object id) {
 }

 @Override
 public void declareOutputFields(OutputFieldsDeclarer declarer) {
 declarer.declare(new Fields("word"));
 }

}