Managing time in SSB

Time attributes define how streams are processed in time. There are two high-level options for providing time attributes to your SQL Stream Builder queries.

Source-provided timestamps

Source-provided timestamps are injected directly into the data stream by the source connector. For example, when using a Kafka data source, the timestamp extracted from the Kafka message header is embedded in the data stream by default, and exposed through an eventTimestamp column. In the following query, the built-in eventTimestamp column is used to window the query based on the timestamp recorded in the source Kafka topic.
SELECT flight_number, flight_origin, flight_destination,
  TUMBLE_END(eventTimestamp, INTERVAL '5' MINUTE) AS window_end_timestamp
FROM airplane_flights
GROUP BY TUMBLE(eventTimestamp, INTERVAL '5' MINUTE), flight_number, flight_origin, flight_destination
The following query uses the source-provided timestamp from two virtual tables to perform a streaming join on multiple Kafka topics:
SELECT a.web_order_id, a.product_name, a.order_date,
 b.next_shipment_time
FROM online_orders a, shipment_events b
WHERE a.shipping_type = 'Priority'
 AND a.eventTimestamp BETWEEN b.eventTimestamp - INTERVAL '1' HOUR AND b.eventTimestamp

User-provided timestamps

The user can also specify timestamps contained in the data stream itself.

Requirements:
  • The timestamp needs to be contained in a column of type long.
  • The timestamp needs to be in epoch format (for example, milliseconds since Jan 1, 1970).
If your schema includes a field called timestamp_ms, you can construct a query like the following:
SELECT flight_number,
  HOP_END(a.timestamp_ms, INTERVAL '15' SECOND, INTERVAL '5' SECOND) AS window_end_timestamp
FROM airplane_flights a
WHERE flight_number IS NOT NULL
GROUP BY HOP(a.timestamp_ms, INTERVAL '15' SECOND, INTERVAL '5' SECOND), flight_number

If an invalid timestamp is encountered in the stream (for example, NaN), then the timestamp of the message defaults to 0, causing the message to be excluded from the current window.
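If you prefer to keep such messages out of the windowed results explicitly, one option is to filter them before aggregation. The following is a minimal sketch, not an official pattern, that reuses the timestamp_ms column from the previous example and drops rows whose timestamp is not a positive epoch value:
SELECT flight_number, COUNT(*) AS flight_count,
  TUMBLE_END(a.timestamp_ms, INTERVAL '5' MINUTE) AS window_end_timestamp
FROM airplane_flights a
-- drop rows whose timestamp is missing or defaulted to 0
WHERE a.timestamp_ms > 0
GROUP BY TUMBLE(a.timestamp_ms, INTERVAL '5' MINUTE), flight_number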

If your data does not include a timestamp in a suitable format, you can compute a new timestamp column from another existing column by using input transformations.
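As an illustration of the kind of conversion involved, the following sketch derives an epoch-millisecond value directly in SQL rather than through an input transformation. It assumes a hypothetical string column named flight_time in 'yyyy-MM-dd HH:mm:ss' format and uses the Flink SQL UNIX_TIMESTAMP function, which returns epoch seconds:
SELECT flight_number,
  -- UNIX_TIMESTAMP returns epoch seconds; multiply by 1000 for milliseconds
  UNIX_TIMESTAMP(flight_time) * 1000 AS timestamp_ms
FROM airplane_flights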