Aggregator
The Aggregator interface represents the most general form of aggregation operations:
public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T val, TridentTuple tuple, TridentCollector collector);
    void complete(T val, TridentCollector collector);
}
A key difference between Aggregator and the other Trident aggregation interfaces is that an instance of TridentCollector is passed as a parameter to every method. This allows Aggregator implementations to emit tuples at any point during execution: from init(), from aggregate(), or from complete().
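Because the collector is available in every method, an implementation can emit while it is still aggregating instead of waiting for the end of the batch. The following is a minimal, self-contained sketch of that idea. It does not use Storm: Collector, tuple(), RunningSum, and run() are hypothetical stand-ins that only mimic the shape of TridentCollector and the Aggregator methods, with a tuple modelled as a List<Object>.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch only: Collector and the List<Object> tuple representation are
// hypothetical stand-ins for TridentCollector and TridentTuple, not Storm's API.
class EmitAnytimeDemo {

    // Stand-in collector: records every emitted tuple in order.
    static class Collector {
        final List<List<Object>> emitted = new ArrayList<>();
        void emit(List<Object> values) { emitted.add(values); }
    }

    // Builds a stand-in tuple from its field values.
    static List<Object> tuple(Object... values) { return Arrays.asList(values); }

    static class SumState { long sum = 0; }

    // Emits a running sum from aggregate() and the final sum from complete().
    static class RunningSum {
        SumState init(Object batchId, Collector collector) { return new SumState(); }

        void aggregate(SumState state, List<Object> tuple, Collector collector) {
            state.sum += ((Number) tuple.get(0)).longValue();
            collector.emit(tuple("partial", state.sum)); // emitted mid-batch
        }

        void complete(SumState state, Collector collector) {
            collector.emit(tuple("final", state.sum));   // emitted at the end
        }
    }

    // Drives one batch: init, aggregate per tuple, complete.
    static List<List<Object>> run(List<List<Object>> batch) {
        RunningSum agg = new RunningSum();
        Collector collector = new Collector();
        SumState state = agg.init(1L, collector);
        for (List<Object> t : batch) {
            agg.aggregate(state, t, collector);
        }
        agg.complete(state, collector);
        return collector.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(tuple(2), tuple(5))));
        // prints [[partial, 2], [partial, 7], [final, 7]]
    }
}
```

Downstream operations would see the two partial tuples before the final one, which is something the other aggregation interfaces cannot express.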
Storm executes Aggregator instances as follows:
- Storm calls the init() method, which returns an object of type T representing the initial state of the aggregation. T is also passed to the aggregate() and complete() methods.
- Storm calls the aggregate() method repeatedly, once for each tuple in the batch.
- Storm calls complete() with the final value of the aggregation.
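The call order can be made visible with a small simulation. This sketch assumes hypothetical stand-ins (Collector, Aggregator, runBatch, and Tracing are not Storm classes) and uses a call log as the aggregation state T, so the object that init() returns is visibly the same one later passed to aggregate() and complete().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Sketch of the init / aggregate / complete sequence using hypothetical
// stand-ins that mirror Trident's method shapes; these are not Storm classes.
class LifecycleDemo {

    // Stand-in collector: records every emitted tuple.
    static class Collector {
        final List<List<Object>> emitted = new ArrayList<>();
        void emit(List<Object> values) { emitted.add(values); }
    }

    // Stand-in for Aggregator<T>; a tuple is modelled as List<Object>.
    interface Aggregator<T> {
        T init(Object batchId, Collector collector);
        void aggregate(T val, List<Object> tuple, Collector collector);
        void complete(T val, Collector collector);
    }

    static List<Object> tuple(Object... values) { return Arrays.asList(values); }

    // Drives one batch in the documented order.
    static <T> void runBatch(Aggregator<T> agg, Object batchId,
                             List<List<Object>> batch, Collector collector) {
        T state = agg.init(batchId, collector);      // 1. initial state
        for (List<Object> t : batch) {
            agg.aggregate(state, t, collector);      // 2. once per tuple
        }
        agg.complete(state, collector);              // 3. final value
    }

    // The state is a log of calls, so the same T flows through every step.
    static class Tracing implements Aggregator<List<String>> {
        public List<String> init(Object batchId, Collector c) {
            List<String> log = new ArrayList<>();
            log.add("init(" + batchId + ")");
            return log;
        }
        public void aggregate(List<String> log, List<Object> t, Collector c) {
            log.add("aggregate(" + t.get(0) + ")");
        }
        public void complete(List<String> log, Collector c) {
            log.add("complete");
            c.emit(new ArrayList<Object>(log));      // emit the whole log once
        }
    }

    // Runs one traced batch (batch id 7) and returns the emitted log.
    static List<Object> trace(List<List<Object>> batch) {
        Collector c = new Collector();
        runBatch(new Tracing(), 7, batch, c);
        return c.emitted.get(0);
    }

    public static void main(String[] args) {
        System.out.println(trace(Arrays.asList(tuple("a"), tuple("b"))));
        // prints [init(7), aggregate(a), aggregate(b), complete]
    }
}
```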
The word count example uses the built-in Count class, which implements the CombinerAggregator interface. The Count class could also be implemented as an Aggregator:
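// Imports for Storm 1.0 and later; releases before 1.0 use the
// storm.trident.* packages and backtype.storm.tuple.Values instead.
import org.apache.storm.trident.operation.BaseAggregator;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Values;

// The nested state class is qualified in the extends clause because
// its simple name is not in scope there.
public class Count extends BaseAggregator<Count.CountState> {

    // Mutable aggregation state: the running tuple count.
    static class CountState {
        long count = 0;
    }

    // Creates fresh state at the start of the aggregation.
    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    // Called once per tuple; increments the count.
    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count += 1;
    }

    // Called after the last tuple; emits the final count as a one-field tuple.
    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}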
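For contrast, the built-in Count takes the CombinerAggregator shape: init() maps a single tuple to a value, combine() merges two values, and zero() supplies the value for empty input. The sketch below models that shape with a hypothetical stand-in interface and hypothetical helpers (fold, countPartitioned) rather than Storm's classes. It illustrates the practical difference: because partial counts can be merged with combine(), each partition can be counted separately and the partial results added afterwards, whereas the CountState in the Aggregator version above is only ever updated one tuple at a time.

```java
import java.util.Arrays;
import java.util.List;

// Sketch comparing shapes: CombinerAggregator here is a hypothetical stand-in
// mirroring Trident's init(tuple)/combine/zero methods; tuples are modelled
// as List<Object>. None of this is Storm code.
class CountShapesDemo {

    interface CombinerAggregator<T> {
        T init(List<Object> tuple);   // value contributed by one tuple
        T combine(T val1, T val2);    // merges two partial values
        T zero();                     // value for an empty input
    }

    // Count in combiner form: each tuple contributes 1, partials are summed.
    static class CombinerCount implements CombinerAggregator<Long> {
        public Long init(List<Object> tuple) { return 1L; }
        public Long combine(Long a, Long b) { return a + b; }
        public Long zero() { return 0L; }
    }

    // Folds one partition down to a single partial value.
    static <T> T fold(CombinerAggregator<T> agg, List<List<Object>> partition) {
        T acc = agg.zero();
        for (List<Object> tuple : partition) {
            acc = agg.combine(acc, agg.init(tuple));
        }
        return acc;
    }

    // Counts each partition independently, then merges the partial counts.
    static long countPartitioned(List<List<List<Object>>> partitions) {
        CombinerCount count = new CombinerCount();
        Long total = count.zero();
        for (List<List<Object>> p : partitions) {
            total = count.combine(total, fold(count, p));
        }
        return total;
    }

    public static void main(String[] args) {
        List<Object> t = Arrays.<Object>asList("word");
        System.out.println(countPartitioned(Arrays.asList(Arrays.asList(t, t), Arrays.asList(t))));
        // prints 3
    }
}
```

This mergeability is what lets Trident perform partial aggregation before sending tuples across the network, which is generally why the CombinerAggregator form is preferred for simple counts when it fits.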