Apache Storm Component Guide

Tuple Timestamps and Out of Order Tuples

By default, window calculations are based on processing time: the timestamp tracked for each tuple in a window is the time at which the bolt processes that tuple.

Storm can also track windows by a source-generated timestamp. This is useful when events should be processed according to when they occurred, such as log entries that carry their own timestamps.

The following BaseWindowedBolt method specifies a source-generated timestamp field. The value of fieldName is read from each incoming tuple and then used in the windowing calculations.

When this option is set, every tuple is expected to contain the timestamp field. If a tuple does not contain it, an exception is thrown and the topology terminates. To recover, manually remove the offending tuple from the source (such as Kafka) and restart the topology.

/**
 * Specify the tuple field that represents the timestamp as a long value. If this field
 * is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
 *
 * @param fieldName the name of the field that contains the timestamp
 */
public BaseWindowedBolt withTimestampField(String fieldName)  
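To make the requirement concrete, here is a small standalone Java model of the check. It is not Storm code: tuples are modeled as Map<String, Object>, and the class and method names (TimestampFieldCheck, extractTimestamp) are hypothetical. It reads the configured field as a long and throws IllegalArgumentException when the field is missing, mirroring the behavior the Javadoc above describes.

```java
import java.util.Map;

// Hypothetical, simplified model of the per-tuple requirement imposed by
// withTimestampField(fieldName). Tuples are modeled as plain maps; this is
// NOT Storm's implementation.
public class TimestampFieldCheck {

    // Reads the configured timestamp field as a long value.
    // Throws IllegalArgumentException if the field is absent or not a long.
    public static long extractTimestamp(Map<String, Object> tuple, String fieldName) {
        Object value = tuple.get(fieldName);
        if (value == null) {
            throw new IllegalArgumentException(
                "Tuple is missing timestamp field '" + fieldName + "': " + tuple);
        }
        if (!(value instanceof Long)) {
            throw new IllegalArgumentException(
                "Timestamp field '" + fieldName + "' is not a long: " + value);
        }
        return (Long) value;
    }

    public static void main(String[] args) {
        // A well-formed tuple: the timestamp field is present.
        Map<String, Object> ok = Map.of("ts", 1_500_000_000_000L, "msg", "login");
        System.out.println(extractTimestamp(ok, "ts")); // prints 1500000000000

        // A malformed tuple: no "ts" field, so the check fails.
        Map<String, Object> bad = Map.of("msg", "no timestamp");
        try {
            extractTimestamp(bad, "ts");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In a real topology you would call withTimestampField on your windowed bolt instead of writing this check yourself; the model only illustrates why a single tuple without the field is fatal.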

In addition to the timestamp field, you can specify a time lag parameter that sets the maximum amount by which tuple timestamps may be out of order:

/**
 * Specify the maximum time lag of the tuple timestamp in millis. The tuple timestamps
 * cannot be out of order by more than this amount.
 *
 * @param duration the max lag duration
 */
public BaseWindowedBolt withLag(Duration duration)

For example, if the lag is 5 seconds and tuple t1 arrives with timestamp 06:00:05, no tuple should arrive with a timestamp earlier than 06:00:00 (06:00:05 minus 5 seconds). If a tuple with timestamp 05:59:59 arrives after t1 and the window has already moved past t1, that tuple is treated as a late tuple. Late tuples are not processed; they are ignored and logged in the worker log files at the INFO level.
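The arithmetic in this example can be sketched as a small standalone Java model. It is an assumption-laden simplification, not Storm's algorithm: it treats "highest timestamp seen so far minus the lag" as the earliest acceptable timestamp and ignores the condition about the window having moved past t1. The class name LagModel and its methods are hypothetical, and it uses java.time.Duration rather than the Duration type that withLag accepts.

```java
import java.time.Duration;
import java.time.LocalTime;

// Hypothetical, simplified model of the out-of-order lag rule.
// Storm's real lateness handling is more involved; this only reproduces
// the arithmetic of the 5-second example.
public class LagModel {
    private final long lagMillis;
    private long maxSeenMillis = Long.MIN_VALUE; // no tuple seen yet

    public LagModel(Duration lag) {
        this.lagMillis = lag.toMillis();
    }

    // Earliest timestamp still accepted: highest timestamp seen minus the lag.
    public long lowerBoundMillis() {
        return maxSeenMillis == Long.MIN_VALUE ? Long.MIN_VALUE : maxSeenMillis - lagMillis;
    }

    // Returns true if the tuple is accepted, false if it would count as late.
    public boolean accept(long tsMillis) {
        if (tsMillis < lowerBoundMillis()) {
            return false; // more than `lag` behind the newest timestamp
        }
        maxSeenMillis = Math.max(maxSeenMillis, tsMillis);
        return true;
    }

    // Helper: converts "HH:mm:ss" to milliseconds since midnight.
    public static long ms(String hhmmss) {
        return LocalTime.parse(hhmmss).toSecondOfDay() * 1000L;
    }

    public static void main(String[] args) {
        LagModel model = new LagModel(Duration.ofSeconds(5));
        System.out.println(model.accept(ms("06:00:05"))); // t1: true
        System.out.println(model.accept(ms("06:00:02"))); // out of order, within lag: true
        System.out.println(model.accept(ms("05:59:59"))); // earlier than 06:00:00: false (late)
    }
}
```

Note that a tuple stamped exactly 06:00:00 is still accepted by this model, matching the text's statement that only timestamps earlier than 06:00:00 are ruled out.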