1.2. Bolts

All bolts must implement the IRichBolt interface. BaseRichBolt is the most basic implementation, but there are several others, including BatchBoltExecutor, ClojureBolt, and JoinResult. The following example, TotalRankingsBolt.java, is included with storm-starter, which is installed with Storm at /usr/lib/storm/contrib/storm-starter.

package storm.starter.bolt;

import backtype.storm.tuple.Tuple;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;

/**
 * Bolt that folds every incoming {@link Rankings} object into a single running ranking.
 * <p>
 * Typical use: consolidate the partial rankings produced by {@link IntermediateRankingsBolt} into one final
 * ranking. For that to work, subscribe this bolt to {@link IntermediateRankingsBolt} with a globalGrouping.
 */
public final class TotalRankingsBolt extends AbstractRankerBolt {

  private static final long serialVersionUID = -8447525895532302198L;
  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);

  /**
   * Creates a bolt using whatever defaults the no-argument {@link AbstractRankerBolt} constructor applies.
   */
  public TotalRankingsBolt() {
    super();
  }

  /**
   * Creates a bolt, forwarding {@code topN} unchanged to the superclass constructor.
   *
   * @param topN value handed to {@link AbstractRankerBolt}; presumably the number of ranking entries to keep
   */
  public TotalRankingsBolt(int topN) {
    super(topN);
  }

  /**
   * Creates a bolt, forwarding both arguments unchanged to the superclass constructor.
   *
   * @param topN value handed to {@link AbstractRankerBolt}; presumably the number of ranking entries to keep
   * @param emitFrequencyInSeconds value handed to {@link AbstractRankerBolt}; by its name, the emit interval
   *        in seconds
   */
  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
    super(topN, emitFrequencyInSeconds);
  }

  /**
   * Merges the {@link Rankings} carried in the first field of {@code tuple} into this bolt's rankings and then
   * calls {@code pruneZeroCounts()} on them.
   *
   * @param tuple incoming tuple whose value at index 0 is cast to {@link Rankings}
   */
  @Override
  void updateRankingsWithTuple(Tuple tuple) {
    final Object payload = tuple.getValue(0);
    final Rankings incoming = (Rankings) payload;
    // The accessor is invoked once per operation, in the same order as before: merge first, prune second.
    getRankings().updateWith(incoming);
    getRankings().pruneZeroCounts();
  }

  /**
   * Supplies this class's logger to the superclass.
   *
   * @return the static logger of {@code TotalRankingsBolt}
   */
  @Override
  Logger getLogger() {
    return LOG;
  }

}