Managing time in SSB

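Time attributes determine which timestamp SQL Stream Builder (SSB) uses when it processes a stream over time, for example when it assigns records to windows or bounds a join. There are two high-level options for providing time attributes to your SQL Stream Builder queries.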

Source-provided timestamps

Source-provided timestamps are injected directly into the data stream by the source connector. For example, when you use a Kafka data source, the timestamp stored with each Kafka message (its record timestamp) is embedded in the data stream by default and exposed through a special "eventTimestamp" column. The following query uses the built-in "eventTimestamp" column to window the results based on the timestamps recorded in the source Kafka topic:
SELECT flight_number, flight_origin, flight_destination,
  TUMBLE_END("eventTimestamp", INTERVAL '5' MINUTE) AS window_end_timestamp
FROM airplane_flights
GROUP BY TUMBLE("eventTimestamp", INTERVAL '5' MINUTE),
  flight_number, flight_origin, flight_destination
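The following Python sketch makes the windowing concrete. It shows the arithmetic that a 5-minute tumbling window applies to an epoch-millisecond timestamp. The sketch assumes epoch-aligned windows with no offset and Flink-style semantics, in which the window end (the value TUMBLE_END reports) is exclusive. The function name `tumble_window` is hypothetical and is not part of SSB.

```python
# Sketch of tumbling-window assignment for epoch-millisecond timestamps.
# Assumes epoch-aligned windows with no offset (Flink-style semantics);
# tumble_window is a hypothetical helper, not an SSB function.

WINDOW_MS = 5 * 60 * 1000  # INTERVAL '5' MINUTE expressed in milliseconds


def tumble_window(event_ts_ms: int, size_ms: int = WINDOW_MS) -> tuple[int, int]:
    """Return the [start, end) bounds of the tumbling window containing event_ts_ms."""
    start = event_ts_ms - (event_ts_ms % size_ms)
    # The end bound is exclusive; it is the value a TUMBLE_END-style function reports.
    return start, start + size_ms


if __name__ == "__main__":
    # 2020-01-01T00:02:03Z in epoch milliseconds
    print(tumble_window(1577836923000))  # (1577836800000, 1577837100000)
```

An event at 00:02:03 UTC on January 1, 2020 therefore lands in the window that ends at 00:05:00.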
The following query uses the source-provided timestamps of two virtual tables to perform a streaming join across two Kafka topics:
SELECT a.web_order_id, a.product_name, a.order_date,
 b.next_shipment_time
FROM online_orders a, shipment_events b
WHERE a.shipping_type = 'Priority'
 AND a.eventTimestamp BETWEEN b.eventTimestamp - INTERVAL '1' HOUR AND b.eventTimestamp
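The time condition in this join keeps a pair only when the order's timestamp falls within the hour leading up to the shipment event, inclusive at both ends. The Python sketch below expresses that predicate over in-memory lists. The record layout is an assumption: dicts with hypothetical `ts` and `shipping_type` keys, where `ts` holds epoch milliseconds. The nested loop is for illustration only and does not show how SSB executes the join.

```python
# Sketch of the time bound in the join above:
#   a.eventTimestamp BETWEEN b.eventTimestamp - INTERVAL '1' HOUR AND b.eventTimestamp
# Records are plain dicts with hypothetical "ts" (epoch ms) and "shipping_type" keys.

HOUR_MS = 60 * 60 * 1000


def within_join_bound(order_ts: int, shipment_ts: int, bound_ms: int = HOUR_MS) -> bool:
    """True if the order happened at most bound_ms before (or exactly at) the shipment event."""
    # SQL BETWEEN is inclusive on both ends.
    return shipment_ts - bound_ms <= order_ts <= shipment_ts


def interval_join(orders, shipments):
    """Naive nested-loop version of the join, for illustration only.

    A streaming engine can instead keep just the rows that may still satisfy
    the time bound rather than scanning every record.
    """
    return [
        (o, s)
        for o in orders
        if o["shipping_type"] == "Priority"
        for s in shipments
        if within_join_bound(o["ts"], s["ts"])
    ]
```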

User-provided timestamps

You can also use a timestamp that is carried in the data itself, for example a field in each message's payload.

Requirements:
  • The timestamp must be stored in a column of type "long".
  • The timestamp must be an epoch value (for example, milliseconds since 00:00:00 UTC on January 1, 1970).
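Here is a minimal Python sketch for producing a value that meets these requirements, assuming the column is expected to hold milliseconds. It converts a timezone-aware datetime to integer milliseconds since the Unix epoch. The helper name `to_epoch_millis` is hypothetical. A common pitfall is supplying seconds instead of milliseconds: 1577836800 (2020-01-01 in seconds), if read as milliseconds, is a moment in mid-January 1970.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch values: 00:00:00 UTC on January 1, 1970.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(dt: datetime) -> int:
    """Convert a timezone-aware datetime to integer milliseconds since the Unix epoch."""
    if dt.tzinfo is None:
        raise ValueError("datetime must be timezone-aware")
    # Dividing one timedelta by another yields an int and avoids float rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    print(to_epoch_millis(datetime(2020, 1, 1, tzinfo=timezone.utc)))  # 1577836800000
```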
For example, if your schema includes a field called "timestamp_ms", you can write a query such as:
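SELECT flight_number,
  HOP_END(a."timestamp_ms", INTERVAL '15' SECOND, INTERVAL '5' SECOND) AS window_end_timestamp
FROM airplane_flights a
WHERE flight_number IS NOT NULL
GROUP BY HOP(a."timestamp_ms", INTERVAL '15' SECOND, INTERVAL '5' SECOND), flight_number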
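Depending on their two intervals, hopping windows either overlap or leave gaps. The Python sketch below mirrors the window-assignment loop that Apache Flink uses for sliding event-time windows. It assumes that SSB follows Flink's HOP argument order (slide first, window size second) and uses epoch-aligned windows. Under that assumption, the query above produces 5-second windows that start every 15 seconds. Swap the two intervals if you want 15-second windows that advance every 5 seconds. `hop_windows` is a hypothetical helper.

```python
# Sketch of hopping (sliding) window assignment for epoch-millisecond timestamps.
# Assumes epoch-aligned windows and Flink-style semantics; hop_windows is hypothetical.


def hop_windows(event_ts_ms: int, slide_ms: int, size_ms: int) -> list[tuple[int, int]]:
    """Return every [start, end) hopping window that contains event_ts_ms, oldest first."""
    # Start of the most recent window that could contain the event.
    last_start = event_ts_ms - (event_ts_ms % slide_ms)
    windows = []
    start = last_start
    # Walk backwards one slide at a time while the window still covers the event.
    while start > event_ts_ms - size_ms:
        windows.append((start, start + size_ms))
        start -= slide_ms
    windows.reverse()
    return windows


if __name__ == "__main__":
    # 15-second windows every 5 seconds: the event belongs to three overlapping windows.
    print(hop_windows(12_000, slide_ms=5_000, size_ms=15_000))
    # 5-second windows every 15 seconds: an event at 12 s falls in a gap.
    print(hop_windows(12_000, slide_ms=15_000, size_ms=5_000))  # []
```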

If an invalid timestamp (e.g., NaN) is encountered in the stream, that message's timestamp defaults to 0 (January 1, 1970), so the message falls outside the current window and is excluded from it.
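The Python sketch below mimics this fallback so you can see where such a message ends up. Any value that cannot be read as a finite number becomes 0. For a 5-minute tumbling window, 0 falls in the very first window after the epoch (00:00 to 00:05 on January 1, 1970), far from any window currently being computed. Both helper names are hypothetical, and this is not SSB's actual parsing code.

```python
import math

WINDOW_MS = 5 * 60 * 1000  # 5-minute tumbling window, in milliseconds


def coerce_timestamp(value) -> int:
    """Hypothetical mimic of the documented fallback: invalid timestamps become 0."""
    try:
        ts = float(value)
    except (TypeError, ValueError):
        return 0  # not numeric at all, e.g. None or "abc"
    if math.isnan(ts) or math.isinf(ts):
        return 0  # NaN or infinity cannot be a point in time
    return int(ts)


def window_start(ts_ms: int, size_ms: int = WINDOW_MS) -> int:
    """Start of the epoch-aligned tumbling window that contains ts_ms."""
    return ts_ms - ts_ms % size_ms


if __name__ == "__main__":
    print(window_start(coerce_timestamp("NaN")))  # 0, i.e. 1970-01-01T00:00:00Z
    print(window_start(1577836923000))  # 1577836800000, i.e. 2020-01-01T00:00:00Z
```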

If your data does not include a timestamp in a suitable format, you can compute a new timestamp column from an existing column by using input transformations.
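The exact syntax of input transformations depends on your SSB version, so the sketch below shows only the per-record logic in Python. It parses a JSON message, converts a hypothetical ISO-8601 string field named `departure_time` into epoch milliseconds, and adds the result as a `timestamp_ms` field. Treating values without an explicit offset as UTC is an assumption; adjust it to match your data.

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def add_timestamp_ms(raw_record: str, source_field: str = "departure_time") -> str:
    """Parse a JSON message, derive epoch milliseconds from an ISO-8601 string
    field, and return the message with an added "timestamp_ms" field."""
    record = json.loads(raw_record)
    text = record[source_field]
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    dt = datetime.fromisoformat(text)
    if dt.tzinfo is None:
        # Assumption: values without an offset are already in UTC.
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer milliseconds since the epoch, computed without float rounding.
    record["timestamp_ms"] = (dt - EPOCH) // timedelta(milliseconds=1)
    return json.dumps(record)


if __name__ == "__main__":
    print(add_timestamp_ms('{"flight_number": "UA1", "departure_time": "2020-01-01T00:02:03Z"}'))
```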