Developing Apache Storm Applications

Introductory Example: Trident Word Count

The following code sample illustrates how to implement a simple word count program using the Trident API:

TridentTopology topology = new TridentTopology();
Stream wordCounts = topology.newStream("spout1", spout)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .parallelismHint(16)
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
        .newValuesStream()
        .parallelismHint(16);

The following list explains each line of the example:

  • The first line creates the TridentTopology object that will be used to define the topology:

    TridentTopology topology = new TridentTopology();

  • The second line creates a Stream object from a spout; it will be used to define subsequent operations to be performed on the stream of data:

    Stream wordCounts = topology.newStream("spout1", spout)

  • The third line uses the Stream.each() method to apply the Split function to the “sentence” field, and specifies that the resulting output contains a new field named “word”:

    .each(new Fields("sentence"), new Split(), new Fields("word"))

    The Split class is a simple Trident function that takes the first field of a tuple, tokenizes it on the space character, and emits each resulting token as a new tuple:

    public class Split extends BaseFunction {

        @Override
        public void execute(TridentTuple tuple, TridentCollector collector) {
            // Read the first field of the input tuple (the sentence).
            String sentence = tuple.getString(0);
            // Emit one single-field tuple per space-delimited token.
            for (String word : sentence.split(" ")) {
                collector.emit(new Values(word));
            }
        }
    }
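
    Because Split tokenizes on a single space character, sentences that contain consecutive spaces produce empty tokens, which would then be counted as words. The following plain-Java sketch (no Storm dependencies) shows that behavior next to one possible alternative. The class name SplitSketch and the method splitOnWhitespace are hypothetical and are not part of the sample or of Storm:

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class SplitSketch {

        // Same tokenization as Split.execute(): split on a single space character.
        static List<String> splitOnSpace(String sentence) {
            return Arrays.asList(sentence.split(" "));
        }

        // Hypothetical variant: split on runs of whitespace and skip empty tokens.
        static List<String> splitOnWhitespace(String sentence) {
            List<String> words = new ArrayList<>();
            for (String word : sentence.trim().split("\\s+")) {
                if (!word.isEmpty()) {
                    words.add(word);
                }
            }
            return words;
        }

        public static void main(String[] args) {
            // Note the double space between "the" and "cow".
            System.out.println(splitOnSpace("the  cow jumped"));      // prints [the, , cow, jumped]
            System.out.println(splitOnWhitespace("the  cow jumped")); // prints [the, cow, jumped]
        }
    }
    ```

    Whether empty tokens matter depends on the input data; for well-formed sentences, the two approaches return the same words.
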
  • The next two lines set the parallelism of the Split function and apply a groupBy() operation to ensure that all tuples with the same “word” value are grouped together in subsequent operations.

    Calling parallelismHint() before a partitioning operation applies the specified parallelism value to the resulting bolt:

    .parallelismHint(16)

    The groupBy() operation is a partitioning operation; it forms the boundary between separate bolts in the resulting topology:

    .groupBy(new Fields("word"))

    The groupBy() operation repartitions batches of tuples by the value of the “word” field, so that all tuples with the same word are sent to the same partition.

    For more information about stream operations that support partitioning, see the Stream JavaDoc.
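
    The effect of repartitioning by a field value can be illustrated with a plain-Java sketch. It assumes a simple hash-modulo scheme; the class GroupBySketch and its methods are hypothetical, and Storm's actual hashing and task assignment are internal details that may differ. The point is only that equal “word” values always land in the same partition:

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    class GroupBySketch {

        // Hypothetical partitioner: pick a partition from the hash of the grouping value.
        static int partitionFor(String word, int numPartitions) {
            return Math.floorMod(word.hashCode(), numPartitions);
        }

        // Distribute one batch of words across the given number of partitions.
        static List<List<String>> repartition(List<String> batch, int numPartitions) {
            List<List<String>> partitions = new ArrayList<>();
            for (int i = 0; i < numPartitions; i++) {
                partitions.add(new ArrayList<>());
            }
            for (String word : batch) {
                partitions.get(partitionFor(word, numPartitions)).add(word);
            }
            return partitions;
        }

        public static void main(String[] args) {
            List<String> batch = Arrays.asList("the", "cow", "the", "moon", "cow", "the");
            // Every occurrence of a given word appears in exactly one inner list.
            System.out.println(repartition(batch, 4));
        }
    }
    ```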

  • The remaining lines of code aggregate the running count for individual words, update a persistent state store, and emit the current count for each word.

    The persistentAggregate() method applies a Trident Aggregator to a stream, updates a persistent state store with the result of the aggregation, and emits the result:

    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))

    The sample code uses an in-memory state store (MemoryMapState). Storm also comes with a number of state implementations for databases such as HBase.

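    The Count class is a Trident CombinerAggregator implementation that counts the tuples in a batch partition by summing a value of 1 for each tuple:

    public class Count implements CombinerAggregator<Long> {

        // Each tuple contributes an initial value of 1.
        @Override
        public Long init(TridentTuple tuple) {
            return 1L;
        }

        // Merge two partial counts into one.
        @Override
        public Long combine(Long val1, Long val2) {
            return val1 + val2;
        }

        // Result for an empty partition.
        @Override
        public Long zero() {
            return 0L;
        }
    }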

    When applying the aggregator, Storm passes grouped partitions to it and calls init() for each tuple. Storm then calls combine() repeatedly to fold the resulting values together until all tuples in the partition have been processed. The last value returned by combine() is the result for the partition. If the partition is empty, Storm uses the value of zero() instead.
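
    The init(), combine(), and zero() contract can be sketched in plain Java without Storm. In this sketch, the Combiner interface is a hypothetical stand-in for Trident's CombinerAggregator, tuples are represented as plain strings, and aggregate() is a simple left-to-right fold; the order in which Storm actually combines partial results is not specified here:

    ```java
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    class CombinerSketch {

        // Hypothetical stand-in for Trident's CombinerAggregator; tuples are plain strings.
        interface Combiner<T> {
            T init(String tuple);
            T combine(T val1, T val2);
            T zero();
        }

        // Mirrors the Count aggregator: each tuple contributes 1, and values are summed.
        static final Combiner<Long> COUNT = new Combiner<Long>() {
            public Long init(String tuple) { return 1L; }
            public Long combine(Long val1, Long val2) { return val1 + val2; }
            public Long zero() { return 0L; }
        };

        // Apply the contract to one partition: init() per tuple, combine() to fold,
        // and zero() when the partition is empty.
        static <T> T aggregate(Combiner<T> combiner, List<String> partition) {
            if (partition.isEmpty()) {
                return combiner.zero();
            }
            T result = combiner.init(partition.get(0));
            for (String tuple : partition.subList(1, partition.size())) {
                result = combiner.combine(result, combiner.init(tuple));
            }
            return result;
        }

        public static void main(String[] args) {
            System.out.println(aggregate(COUNT, Arrays.asList("cow", "cow", "cow"))); // prints 3
            System.out.println(aggregate(COUNT, Collections.<String>emptyList()));    // prints 0
        }
    }
    ```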

    The call to newValuesStream() tells Storm to emit the result of the persistent aggregation. This consists of a stream of individual word counts. The resulting stream can be reused in other parts of a topology.
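
    To make the combination of a persistent state store and an emitted stream of counts more concrete, the following plain-Java sketch models the behavior under some simplifying assumptions: a HashMap stands in for the state store, each call to processBatch() represents one batch for a single partition, and the returned map represents the word counts emitted for the words updated in that batch. The class PersistentCountSketch and its method are hypothetical and do not reflect Storm's implementation:

    ```java
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    class PersistentCountSketch {

        // Stand-in for the state store: running counts survive across batches.
        private final Map<String, Long> state = new HashMap<>();

        // Process one batch: count words in the batch, add those counts to the
        // stored totals, and return the updated total for each word in the batch.
        Map<String, Long> processBatch(List<String> words) {
            Map<String, Long> batchCounts = new LinkedHashMap<>();
            for (String word : words) {
                batchCounts.merge(word, 1L, Long::sum);
            }
            Map<String, Long> updated = new LinkedHashMap<>();
            for (Map.Entry<String, Long> entry : batchCounts.entrySet()) {
                long total = state.merge(entry.getKey(), entry.getValue(), Long::sum);
                updated.put(entry.getKey(), total);
            }
            return updated;
        }

        public static void main(String[] args) {
            PersistentCountSketch sketch = new PersistentCountSketch();
            System.out.println(sketch.processBatch(Arrays.asList("the", "cow", "the"))); // prints {the=2, cow=1}
            System.out.println(sketch.processBatch(Arrays.asList("the", "moon")));       // prints {the=3, moon=1}
        }
    }
    ```

    The second batch reports a count of 3 for “the” because the total from the first batch is kept in the state between calls.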