Implementing Windowing in Core Storm
Bolts that need windowing support can implement the bolt interface IWindowedBolt:
public interface IWindowedBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

    /**
     * Process tuples falling within the window and optionally emit
     * new tuples based on the tuples in the input window.
     */
    void execute(TupleWindow inputWindow);

    void cleanup();
}
Every time the window slides (the sliding interval elapses), the execute
method is invoked.
The TupleWindow parameter gives access to current tuples in the window, tuples that expired, and new tuples that were added since the last window was computed. This information can be used to optimize the efficiency of windowing computations.
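As an illustration of that optimization, the following plain-Java sketch keeps a running sum and adjusts it with only the new and expired values instead of re-reading the whole window on every activation. It does not depend on Storm: the class IncrementalWindowSum and its onWindow method are hypothetical names, and the two lists stand in for the new and expired tuples that a TupleWindow exposes (in Storm, through its getNew() and getExpired() methods).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * Plain-Java sketch (no Storm dependency) of a window sum that is
 * maintained incrementally. The class and method names are hypothetical.
 */
final class IncrementalWindowSum {
    // Sum of the values currently inside the window.
    private long sum = 0;

    /**
     * Applies one window activation: adds the values that entered the
     * window and subtracts the values that left it, then returns the sum.
     */
    long onWindow(List<Integer> newValues, List<Integer> expiredValues) {
        for (int v : newValues) {
            sum += v;
        }
        for (int v : expiredValues) {
            sum -= v;
        }
        return sum;
    }

    public static void main(String[] args) {
        IncrementalWindowSum s = new IncrementalWindowSum();
        // First activation: the window holds [1, 2, 3]; nothing has expired.
        System.out.println(s.onWindow(Arrays.asList(1, 2, 3), Collections.<Integer>emptyList())); // prints 6
        // Second activation: 4 and 5 arrived, 1 and 2 expired; the window holds [3, 4, 5].
        System.out.println(s.onWindow(Arrays.asList(4, 5), Arrays.asList(1, 2))); // prints 12
    }
}
```

The work per activation is proportional to the number of tuples that changed rather than to the window length, which matters most when the window is long and the sliding interval is short.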
Bolts that need windowing support would typically extend BaseWindowedBolt, which has APIs for specifying window length and sliding intervals. For example:
public class SlidingWindowBolt extends BaseWindowedBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(TupleWindow inputWindow) {
        for (Tuple tuple : inputWindow.get()) {
            // do the windowing computation
            ...
        }
        collector.emit(new Values(computedValue));
    }
}
BaseWindowedBolt has APIs to define the type of window, the window length, and the sliding interval. You can specify each of the window length and the sliding interval either as a count of tuples or as a duration of time, and the two can use different units. The following window configuration settings are supported:
/*
 * Tuple count based sliding window that slides after slidingInterval number of tuples
 */
withWindow(Count windowLength, Count slidingInterval)

/*
 * Tuple count based window that slides with every incoming tuple
 */
withWindow(Count windowLength)

/*
 * Tuple count based sliding window that slides after slidingInterval time duration
 */
withWindow(Count windowLength, Duration slidingInterval)

/*
 * Time duration based sliding window that slides after slidingInterval time duration
 */
withWindow(Duration windowLength, Duration slidingInterval)

/*
 * Time duration based window that slides with every incoming tuple
 */
withWindow(Duration windowLength)

/*
 * Time duration based sliding window that slides after slidingInterval number of tuples
 */
withWindow(Duration windowLength, Count slidingInterval)

/*
 * Count based tumbling window that tumbles after the specified count of tuples
 */
withTumblingWindow(BaseWindowedBolt.Count count)

/*
 * Time duration based tumbling window that tumbles after the specified time duration
 */
withTumblingWindow(BaseWindowedBolt.Duration duration)
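To make the difference between the count-based sliding and tumbling settings concrete, the following plain-Java sketch simulates which values a window would contain each time it slides. It is a simplified model under stated assumptions, not Storm's implementation: the class CountWindowSketch and its methods are hypothetical, a tumbling window is modelled as a sliding window whose sliding interval equals its length, and the earliest windows are assumed to be partially filled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Plain-Java simulation (no Storm dependency) of a tuple-count based window.
 * All names here are hypothetical and exist only for illustration.
 */
final class CountWindowSketch {
    private final int windowLength;     // maximum number of values kept
    private final int slidingInterval;  // number of arrivals between activations
    private final Deque<Integer> window = new ArrayDeque<>();
    private int sinceLastSlide = 0;

    CountWindowSketch(int windowLength, int slidingInterval) {
        this.windowLength = windowLength;
        this.slidingInterval = slidingInterval;
    }

    /**
     * Adds one value. Returns a snapshot of the window if this arrival
     * completes a sliding interval, otherwise returns null.
     */
    List<Integer> add(int value) {
        window.addLast(value);
        if (window.size() > windowLength) {
            window.removeFirst(); // evict the oldest value
        }
        sinceLastSlide++;
        if (sinceLastSlide < slidingInterval) {
            return null;
        }
        sinceLastSlide = 0;
        return new ArrayList<>(window);
    }

    /** Feeds the values 1..n through a window and collects every activation. */
    static List<List<Integer>> run(int windowLength, int slidingInterval, int n) {
        CountWindowSketch w = new CountWindowSketch(windowLength, slidingInterval);
        List<List<Integer>> activations = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            List<Integer> snapshot = w.add(i);
            if (snapshot != null) {
                activations.add(snapshot);
            }
        }
        return activations;
    }

    public static void main(String[] args) {
        // Sliding: length 3, slides every 2 values, so consecutive windows overlap.
        System.out.println(run(3, 2, 6)); // prints [[1, 2], [2, 3, 4], [4, 5, 6]]
        // Tumbling: length 3, slides every 3 values, so windows do not overlap.
        System.out.println(run(3, 3, 6)); // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```

In the sliding case a value can appear in more than one window (2 and 4 above), while in the tumbling case each value belongs to exactly one window.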
To add windowed bolts to the topology, use the TopologyBuilder (as you would with non-windowed bolts). For example:
TopologyBuilder builder = new TopologyBuilder();

/*
 * A windowed bolt that computes sum over a sliding window with window length of
 * 30 events that slides after every 10 events.
 */
builder.setBolt("sum", new WindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1)
       .shuffleGrouping("spout");
For a sample topology that shows how to use the APIs to compute a sliding window sum and a tumbling window average, see the SlidingWindowTopology.java file in the storm-starter GitHub directory.
For examples of tumbling and sliding windows, see the Apache document Windowing Support in Core Storm.
The following subsections describe additional aspects of windowing calculations: timestamps, watermarks, guarantees, and state management.