Introductory Example: Trident Word Count
The following code sample illustrates how to implement a simple word count program using the Trident API:
```java
TridentTopology topology = new TridentTopology();
Stream wordCounts = topology.newStream("spout1", spout)
    .each(new Fields("sentence"), new Split(), new Fields("word"))
    .parallelismHint(16)
    .groupBy(new Fields("word"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
    .newValuesStream()
    .parallelismHint(16);
```
Here is a line-by-line walkthrough of the example:
The first line creates the `TridentTopology` object that will be used to define the topology:

```java
TridentTopology topology = new TridentTopology();
```
The second line creates a `Stream` object from a spout; it will be used to define subsequent operations to be performed on the stream of data:

```java
Stream wordCounts = topology.newStream("spout1", spout)
```
The third line uses the `Stream.each()` method to apply the Split function to the "sentence" field, and specifies that the resulting output contains a new field named "word":

```java
.each(new Fields("sentence"), new Split(), new Fields("word"))
```
The `Split` class is a simple Trident function that takes the first field of a tuple, tokenizes it on the space character, and emits the resulting tokens:

```java
public class Split extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        String sentence = tuple.getString(0);
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
    }
}
```
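The tokenization that Split performs can be exercised on its own. The following is a plain-Java sketch with no Trident dependencies; the class and method names are illustrative, not part of the Storm API:

```java
import java.util.Arrays;
import java.util.List;

public class SplitSketch {
    // Mirrors the body of Split.execute(): split the sentence on the
    // space character and return one token per word.
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.split(" "));
    }

    public static void main(String[] args) {
        System.out.println(split("the cow jumped over the moon"));
        // [the, cow, jumped, over, the, moon]
    }
}
```

Each element of the returned list corresponds to one tuple that Split would emit through the collector.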
The next two lines set the parallelism of the `Split` function and apply a `groupBy()` operation to ensure that all tuples with the same "word" value are grouped together in subsequent operations. Calling `parallelismHint()` before a partitioning operation applies the specified parallelism value to the resulting bolt:

```java
.parallelismHint(16)
```
The `groupBy()` operation is a partitioning operation; it forms the boundary between separate bolts in the resulting topology:

```java
.groupBy(new Fields("word"))
```
The `groupBy()` operation results in batches of tuples being repartitioned by the value of the "word" field. For more information about stream operations that support partitioning, see the Stream JavaDoc.
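To illustrate why grouping by a field keeps all tuples for a given word together, the following hypothetical sketch maps a field value to a downstream task index by hashing modulo the task count. This is a simplification for intuition only; `taskFor` is not a Storm API:

```java
public class FieldGroupingSketch {
    // Same input value -> same hash -> same task index, so every tuple
    // carrying a given "word" lands on the same downstream task.
    static int taskFor(String word, int numTasks) {
        // floorMod keeps the index non-negative even for negative hash codes.
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int tasks = 16;
        int t = taskFor("moon", tasks);
        System.out.println("\"moon\" always routes to task " + t);
    }
}
```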
The remaining lines of code aggregate the running count for individual words, update a persistent state store, and emit the current count for each word.
The `persistentAggregate()` method applies a Trident Aggregator to a stream, updates a persistent state store with the result of the aggregation, and emits the result:

```java
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
```
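Conceptually, for each grouped batch the persistent aggregation reads the current value from the state store, folds in the batch aggregate, and writes the result back. The following plain-Java sketch illustrates that read-update-write cycle with an ordinary `HashMap` standing in for `MemoryMapState`; the class and method names are illustrative, not Trident API:

```java
import java.util.HashMap;
import java.util.Map;

public class PersistentAggregateSketch {
    // Stands in for the MemoryMapState word -> count store.
    static final Map<String, Long> state = new HashMap<>();

    static long update(String word, long batchCount) {
        long current = state.getOrDefault(word, 0L); // read stored count
        long updated = current + batchCount;          // fold in batch aggregate
        state.put(word, updated);                     // write back to the store
        return updated;                               // emitted as the "count" field
    }

    public static void main(String[] args) {
        update("moon", 2);
        System.out.println(update("moon", 3)); // running count: 5
    }
}
```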
The sample code uses an in-memory state store (`MemoryMapState`); Storm comes with a number of state implementations for databases such as HBase.

The `Count` class is a Trident `CombinerAggregator` implementation that sums all values in a batch partition of tuples:

```java
public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}
```
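To see how a `CombinerAggregator` such as `Count` is driven, the following plain-Java sketch applies the same `init()`/`combine()`/`zero()` protocol to one grouped partition. The names are illustrative; this is not the Trident execution code:

```java
import java.util.Arrays;
import java.util.List;

public class CombinerSketch {
    static long init(Object tuple) { return 1L; }          // like Count.init()
    static long combine(long a, long b) { return a + b; }  // like Count.combine()
    static long zero() { return 0L; }                      // like Count.zero()

    // Drive the protocol over one partition: init() per tuple,
    // combine() to fold the values, zero() when the partition is empty.
    static long aggregate(List<?> partition) {
        if (partition.isEmpty()) {
            return zero();
        }
        long acc = init(partition.get(0));
        for (int i = 1; i < partition.size(); i++) {
            acc = combine(acc, init(partition.get(i)));
        }
        return acc;
    }

    public static void main(String[] args) {
        // A partition of three "moon" tuples aggregates to a count of 3.
        System.out.println(aggregate(Arrays.asList("moon", "moon", "moon"))); // 3
        System.out.println(aggregate(Arrays.asList())); // 0
    }
}
```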
When applying the aggregator, Storm passes grouped partitions to the aggregator, calling `init()` for each tuple. It calls `combine()` repeatedly to process all tuples in the partition. When finished, the last value returned by `combine()` is used. If the partition is empty, the value of `zero()` is used.

The call to `newValuesStream()` tells Storm to emit the result of the persistent aggregation. This consists of a stream of individual word counts. The resulting stream can be reused in other parts of a topology.