Apache Storm Component Guide

Watermarks

When processing tuples using a timestamp field, Storm computes watermarks from the timestamps of incoming tuples. Each watermark is the minimum, across all input streams, of the latest tuple timestamp seen on each stream, minus the lag. Conceptually, this is similar to the watermark concept used by Apache Flink and Google's MillWheel to track event-time progress.
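As a rough illustration of this rule, the following plain-Java sketch tracks the latest timestamp per input stream and derives a watermark from them. It is not Storm's implementation: the class names, the string stream IDs, and the choice to produce no watermark until every input stream has delivered at least one tuple are assumptions made for this example. Timestamps are plain long values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;

// Hypothetical helper illustrating the watermark rule; not a Storm class.
class WatermarkTracker {
    private final Set<String> inputStreams;                      // streams the bolt consumes
    private final Map<String, Long> latestTs = new HashMap<>();  // latest timestamp seen per stream
    private final long lag;                                      // maximum allowed lag

    WatermarkTracker(Set<String> inputStreams, long lag) {
        this.inputStreams = Set.copyOf(inputStreams);
        this.lag = lag;
    }

    // Record a tuple's timestamp, keeping only the latest value per stream.
    void track(String stream, long ts) {
        latestTs.merge(stream, ts, Long::max);
    }

    // Watermark = minimum over streams of the latest timestamp, minus the lag.
    // Assumption of this sketch: empty until every input stream has produced a tuple.
    OptionalLong watermark() {
        if (!latestTs.keySet().containsAll(inputStreams)) {
            return OptionalLong.empty();
        }
        long min = Long.MAX_VALUE;
        for (String s : inputStreams) {
            min = Math.min(min, latestTs.get(s));
        }
        return OptionalLong.of(min - lag);
    }
}

class WatermarkDemo {
    public static void main(String[] args) {
        WatermarkTracker t = new WatermarkTracker(Set.of("a", "b"), 5);
        t.track("a", 100);
        System.out.println(t.watermark()); // OptionalLong.empty: stream "b" has no data yet
        t.track("b", 90);
        t.track("a", 120);
        System.out.println(t.watermark()); // OptionalLong[85]: min(120, 90) - 5
    }
}
```

With a single input stream, the rule reduces to the latest tuple timestamp minus the lag, which is how the watermark values in the example later in this section are obtained.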

Storm emits watermark timestamps periodically, by default every second. When tuple timestamps are in use, these watermarks act as the clock ticks that drive the window calculation. You can change the interval at which watermarks are emitted with the following BaseWindowedBolt method:

/**
 * Specify the watermark event generation interval. Watermark events
 * are used to track the progress of time
 *
 * @param interval the interval at which watermark events are generated
 */
public BaseWindowedBolt withWatermarkInterval(Duration interval)

When a watermark is received, all windows that end at or before that timestamp are evaluated.

For example, consider tuple-timestamp-based processing with the following settings:

  • Window length = 20 seconds, sliding interval = 10 seconds, watermark emit frequency = 1 second, max lag = 5 seconds

  • Current (wall-clock) time = 09:00:00

  • Tuples e1(06:00:03), e2(06:00:05), e3(06:00:07), e4(06:00:18), e5(06:00:26), e6(06:00:36) arrive between 09:00:00 and 09:00:01.

At time t = 09:00:01:

  1. Watermark w1 = 06:00:31 is emitted: the latest tuple timestamp (06:00:36) minus the 5-second lag. No tuples earlier than 06:00:31 are expected to arrive after this point.

  2. Three windows are evaluated. The first window's end timestamp (06:00:10) is computed by taking the earliest tuple timestamp (06:00:03) and rounding it up to the next multiple of the sliding interval (10 seconds):

    • 05:59:50 to 06:00:10 with tuples e1, e2, e3

    • 06:00:00 to 06:00:20 with tuples e1, e2, e3, e4

    • 06:00:10 to 06:00:30 with tuples e4, e5

    The next window would end at 06:00:40, which is later than the watermark, so evaluation stops there.

  3. Tuple e6 is not evaluated, because its timestamp (06:00:36) is later than the watermark (06:00:31).

  4. Tuples e7(08:00:25), e8(08:00:26), e9(08:00:27), e10(08:00:39) arrive between 09:00:01 and 09:00:02.
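The windows evaluated at 09:00:01 can be reproduced with a small Java sketch. It assumes that a window ending at time end covers the half-open range (end - 20s, end], that the first window end is the earliest tuple timestamp rounded up to a multiple of the sliding interval, and that windows are evaluated one sliding interval apart for as long as their end does not exceed the watermark. The class and method names are hypothetical, and timestamps are seconds since midnight.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of watermark-driven window evaluation; not Storm's implementation.
class FirstBatchWindows {
    static final long LENGTH = 20, SLIDE = 10; // window length and sliding interval, in seconds

    // Round ts up to the next multiple of the sliding interval (ts >= 0 assumed).
    static long ceilToSlide(long ts) {
        return ((ts + SLIDE - 1) / SLIDE) * SLIDE;
    }

    // Evaluate every window whose end is <= watermark, starting from the aligned first end.
    static List<String> evaluate(long[] tuples, long watermark) {
        long earliest = Long.MAX_VALUE;
        for (long t : tuples) earliest = Math.min(earliest, t);
        List<String> out = new ArrayList<>();
        for (long end = ceilToSlide(earliest); end <= watermark; end += SLIDE) {
            StringBuilder members = new StringBuilder();
            for (int i = 0; i < tuples.length; i++) {
                // A tuple belongs to the window if end - LENGTH < ts <= end.
                if (tuples[i] > end - LENGTH && tuples[i] <= end) {
                    members.append(" e").append(i + 1);
                }
            }
            out.add(fmt(end - LENGTH) + " to " + fmt(end) + " ->" + members);
        }
        return out;
    }

    // Format seconds since midnight as HH:MM:SS.
    static String fmt(long s) {
        return String.format("%02d:%02d:%02d", s / 3600, (s / 60) % 60, s % 60);
    }

    public static void main(String[] args) {
        long base = 6 * 3600; // 06:00:00
        long[] tuples = {base + 3, base + 5, base + 7, base + 18, base + 26, base + 36}; // e1..e6
        long w1 = base + 36 - 5; // latest timestamp (e6) minus the 5 s lag = 06:00:31
        evaluate(tuples, w1).forEach(System.out::println);
    }
}
```

Running main prints the three windows from step 2 (for example, "05:59:50 to 06:00:10 -> e1 e2 e3"); e6 appears in none of them, matching step 3.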

At time t = 09:00:02:

  1. Watermark w2 = 08:00:34 is emitted: the latest tuple timestamp (08:00:39) minus the 5-second lag. No tuples earlier than 08:00:34 are expected to arrive after this point.

  2. Three windows are evaluated:

    • 06:00:20 to 06:00:40 with tuples e5, e6 (from the earlier batch)

    • 06:00:30 to 06:00:50 with tuple e6 (from the earlier batch)

    • 08:00:10 to 08:00:30 with tuples e7, e8, e9

  3. Tuple e10 is not evaluated, because its timestamp (08:00:39) is later than the watermark (08:00:34).
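Both batches can be modeled with a stateful Java sketch that remembers the next pending window end between watermarks. It assumes half-open windows (end - 20s, end], a first window end obtained by rounding the earliest tuple timestamp up to the sliding interval, and one more rule: when a window turns out to be empty, the next window end is realigned from the earliest tuple beyond it instead of stepping through the gap one sliding interval at a time. This is a simplified model chosen to match the example, not Storm's code; all names are hypothetical and timestamps are seconds since midnight.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of watermark-triggered sliding windows that skips empty gaps;
// a simplified model of the behavior in the example, not Storm's implementation.
class GapAwareWindows {
    static final long LENGTH = 20, SLIDE = 10;   // window length and sliding interval (s)
    private static final long UNSET = Long.MIN_VALUE;

    private final List<Long> tuples = new ArrayList<>(); // all tuple timestamps seen so far
    private long nextEnd = UNSET;                         // end of the next window to evaluate

    void add(long ts) { tuples.add(ts); }

    static long ceilToSlide(long ts) { return ((ts + SLIDE - 1) / SLIDE) * SLIDE; }

    // Earliest tuple timestamp in (after, upTo], or Long.MAX_VALUE if there is none.
    private long earliestIn(long after, long upTo) {
        long min = Long.MAX_VALUE;
        for (long ts : tuples) {
            if (ts > after && ts <= upTo) min = Math.min(min, ts);
        }
        return min;
    }

    // Names of the tuples in the window (end - LENGTH, end], e.g. " e1 e2".
    private String members(long end) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < tuples.size(); i++) {
            long ts = tuples.get(i);
            if (ts > end - LENGTH && ts <= end) sb.append(" e").append(i + 1);
        }
        return sb.toString();
    }

    // Evaluate all non-empty windows whose end is <= watermark.
    List<String> onWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        long end = nextEnd;
        if (end == UNSET) {
            long first = earliestIn(Long.MIN_VALUE, watermark);
            if (first == Long.MAX_VALUE) return fired; // nothing to evaluate yet
            end = ceilToSlide(first);
        }
        while (end <= watermark) {
            String m = members(end);
            if (!m.isEmpty()) {
                fired.add(fmt(end - LENGTH) + " to " + fmt(end) + " ->" + m);
                end += SLIDE;
            } else {
                // Empty window: jump to the window aligned on the next tuple.
                long next = earliestIn(end, watermark);
                if (next == Long.MAX_VALUE) break;
                end = ceilToSlide(next);
            }
        }
        nextEnd = end;
        return fired;
    }

    static String fmt(long s) {
        return String.format("%02d:%02d:%02d", s / 3600, (s / 60) % 60, s % 60);
    }

    public static void main(String[] args) {
        long lag = 5, six = 6 * 3600, eight = 8 * 3600;
        GapAwareWindows windows = new GapAwareWindows();
        for (long off : new long[] {3, 5, 7, 18, 26, 36}) windows.add(six + off); // e1..e6
        System.out.println(windows.onWatermark(six + 36 - lag));                  // w1 = 06:00:31
        for (long off : new long[] {25, 26, 27, 39}) windows.add(eight + off);    // e7..e10
        System.out.println(windows.onWatermark(eight + 39 - lag));                // w2 = 08:00:34
    }
}
```

The first call returns the three windows listed for 09:00:01 and the second returns the three listed for 09:00:02; none of the empty windows between the two batches is reported.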

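The window calculation accounts for gaps in the tuple timestamps and computes windows from the tuple timestamps rather than from wall-clock time. Windows that would fall entirely in the gap between the two batches are not evaluated; instead, the next window end (08:00:30) is computed from the timestamp of the next tuple, e7 (08:00:25), by rounding up to the sliding interval.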