Apache Storm Component Guide

Implementing Windowing in Core Storm

Bolts that need windowing support can implement the bolt interface IWindowedBolt:

public interface IWindowedBolt extends IComponent {
   void prepare(Map stormConf, TopologyContext context, OutputCollector collector);

   /**
    * Process tuples falling within the window and optionally emit
    * new tuples based on the tuples in the input window.
    */
   void execute(TupleWindow inputWindow);

   void cleanup();
}

Every time the window slides (the sliding interval elapses), the execute method is invoked.

The TupleWindow parameter gives access to current tuples in the window, tuples that expired, and new tuples that were added since the last window was computed. This information can be used to optimize the efficiency of windowing computations.
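The optimization mentioned above can be sketched without any Storm dependency. In Storm, the window object exposes the current, new, and expired tuples through get(), getNew(), and getExpired(); the class below is a hypothetical stand-in that mimics that idea for plain integers in a count-based window. It is a minimal sketch, assuming the aggregate is a sum, so that each slide only needs to add the new values and subtract the expired ones instead of rescanning the whole window. The class name IncrementalWindowSum and its methods are illustrative and are not part of the Storm API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Storm-free simulation of a count-based window that tracks new and expired values. */
class IncrementalWindowSum {
    private final int windowLength;
    private final Deque<Integer> current = new ArrayDeque<>();   // values currently in the window
    private final List<Integer> added = new ArrayList<>();       // arrived since the last slide
    private final List<Integer> expired = new ArrayList<>();     // evicted since the last slide
    private long runningSum = 0;

    IncrementalWindowSum(int windowLength) {
        this.windowLength = windowLength;
    }

    /** Buffer a value, evicting the oldest one when the window is full. */
    void add(int value) {
        current.addLast(value);
        added.add(value);
        if (current.size() > windowLength) {
            expired.add(current.removeFirst());
        }
    }

    /** Called when the window slides: update the sum from the deltas only. */
    long slide() {
        for (int v : added) runningSum += v;
        for (int v : expired) runningSum -= v;
        added.clear();
        expired.clear();
        return runningSum;
    }

    /** Reference result: recompute the sum over every value in the window. */
    long fullSum() {
        long sum = 0;
        for (int v : current) sum += v;
        return sum;
    }

    public static void main(String[] args) {
        IncrementalWindowSum w = new IncrementalWindowSum(3);
        for (int i = 1; i <= 6; i++) {
            w.add(i);
            if (i % 2 == 0) {   // slide after every 2 values
                System.out.println("incremental=" + w.slide() + " full=" + w.fullSum());
            }
        }
    }
}
```

The incremental and recomputed sums agree at every slide, but the incremental version touches only the values that changed, which matters when the window is much longer than the sliding interval.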

Bolts that need windowing support would typically extend BaseWindowedBolt, which has APIs for specifying window length and sliding intervals. For example:

public class SlidingWindowBolt extends BaseWindowedBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
       this.collector = collector;
   }
   @Override
   public void execute(TupleWindow inputWindow) {
       for (Tuple tuple : inputWindow.get()) {
           // do the windowing computation
       }
       // computedValue stands for the result of the computation above
       collector.emit(new Values(computedValue));
   }
}

The BaseWindowedBolt has APIs to define the type of window, the window length, and the sliding interval. You can specify the window length and the sliding interval each as either a count of tuples or a duration of time, in any combination. The following window configuration settings are supported:

 * Tuple count based sliding window that slides after slidingInterval number of tuples 
withWindow(Count windowLength, Count slidingInterval)

 * Tuple count based window that slides with every incoming tuple 
withWindow(Count windowLength)

 * Tuple count based sliding window that slides after slidingInterval time duration 
withWindow(Count windowLength, Duration slidingInterval)

 * Time duration based sliding window that slides after slidingInterval time duration 
withWindow(Duration windowLength, Duration slidingInterval)

 * Time duration based window that slides with every incoming tuple 
withWindow(Duration windowLength)

 * Time duration based sliding window that slides after slidingInterval number of tuples 
withWindow(Duration windowLength, Count slidingInterval)

 * Count based tumbling window that tumbles after the specified count of tuples 
withTumblingWindow(BaseWindowedBolt.Count count)

 * Time duration based tumbling window that tumbles after the specified time duration 
withTumblingWindow(BaseWindowedBolt.Duration duration)   
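To make the difference between duration-based sliding and tumbling windows concrete, the following Storm-free sketch lists which event timestamps fall into each window evaluation. It is an illustration under stated assumptions, not Storm's implementation: timestamps are supplied explicitly in milliseconds, a window evaluated at time t is taken to cover the half-open range (t - length, t], and the class DurationWindows and its methods are hypothetical names. A tumbling window is modeled as a sliding window whose slide equals its length.

```java
import java.util.ArrayList;
import java.util.List;

/** Storm-free sketch: which event timestamps fall into time-based windows. */
class DurationWindows {

    /** Timestamps in (endTime - length, endTime]; the boundary choice is an assumption. */
    static List<Long> windowAt(List<Long> timestamps, long endTime, long length) {
        List<Long> window = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > endTime - length && ts <= endTime) {
                window.add(ts);
            }
        }
        return window;
    }

    /** Evaluate a window every `slide` ms up to `until`; slide == length gives a tumbling window. */
    static List<List<Long>> windows(List<Long> timestamps, long length, long slide, long until) {
        List<List<Long>> result = new ArrayList<>();
        for (long end = slide; end <= until; end += slide) {
            result.add(windowAt(timestamps, end, length));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Long> events = List.of(100L, 400L, 900L, 1200L, 1900L);
        // Sliding: 1000 ms window evaluated every 500 ms, so events can appear in two windows.
        System.out.println("sliding:  " + windows(events, 1000, 500, 2000));
        // Tumbling: 1000 ms window evaluated every 1000 ms, so each event appears exactly once.
        System.out.println("tumbling: " + windows(events, 1000, 1000, 2000));
    }
}
```

With these inputs the sliding configuration yields overlapping windows (for example, 900 appears in two consecutive evaluations), while the tumbling configuration partitions the events into disjoint groups.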

To add windowed bolts to the topology, use the TopologyBuilder (as you would with non-windowed bolts). For example:

TopologyBuilder builder = new TopologyBuilder();
/*
 * A windowed bolt that computes sum over a sliding window with window length of
 * 30 events that slides after every 10 events.
 */
builder.setBolt("sum", new WindowSumBolt().withWindow(Count.of(30), Count.of(10)), 1);
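As a worked example of what a window length of 30 events with a sliding interval of 10 events means for a sum, the sketch below simulates the behavior in plain Java, with no Storm dependency. It assumes that the window is evaluated after every 10 values, including the early evaluations before 30 values have arrived, and that each evaluation sums at most the 30 most recent values. The class CountWindowSum is a hypothetical helper for illustration and is not the WindowSumBolt used in the topology above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Storm-free simulation of a sum over a count-based sliding window. */
class CountWindowSum {

    /** Sums emitted each time `slide` values have arrived, over the last `length` values. */
    static List<Long> slidingSums(List<Integer> values, int length, int slide) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Long> sums = new ArrayList<>();
        int sinceLastSlide = 0;
        for (int v : values) {
            window.addLast(v);
            if (window.size() > length) {
                window.removeFirst();        // expire the oldest value
            }
            if (++sinceLastSlide == slide) { // the window slides
                long sum = 0;
                for (int x : window) sum += x;
                sums.add(sum);
                sinceLastSlide = 0;
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 40; i++) values.add(i);
        // Window length 30, sliding interval 10, matching the topology example.
        System.out.println(slidingSums(values, 30, 10)); // prints [55, 210, 465, 765]
    }
}
```

For the inputs 1 through 40, the first three evaluations cover 1..10, 1..20, and 1..30; the fourth covers 11..40, because the ten oldest values have expired by then.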

For a sample topology that shows how to use the APIs to compute a sliding window sum and a tumbling window average, see the SlidingWindowTopology.java file in the storm-starter GitHub directory.

For examples of tumbling and sliding windows, see the Apache document Windowing Support in Core Storm.

The following subsections describe additional aspects of windowing calculations: timestamps, watermarks, guarantees, and state management.