Managing time in SSB
Time attributes define how streams are processed in time. There are two high-level options for providing time attributes to your SQL Stream Builder queries.
Source-provided timestamps
Source-provided timestamps are injected directly into the data stream by the source
connector. For example, when using a Kafka data source, the timestamp extracted from the
Kafka message header is embedded in the data stream by default, and exposed through an
eventTimestamp
column. In the following query, the built-in
eventTimestamp
column is used to window the query based on the timestamp
recorded in the source Kafka
topic.SELECT flight_number, flight_origin, flight_destination,
TUMBLE_END(eventTimestamp, INTERVAL '5' MINUTE) AS window_end_timestamp
FROM airplane_flights
GROUP BY TUMBLE(eventTimestamp, INTERVAL '5' MINUTE)
The following query uses the source-provided timestamp from two virtual tables to perform a
streaming join on multiple Kafka
topics:
SELECT a.web_order_id, a.product_name, a.order_date
b.next_shipment_time
FROM online_orders a, shipment_events b
WHERE a.shipping_type = 'Priority'
AND a.eventTimestamp BETWEEN b.eventTimestamp - INTERVAL '1' HOUR AND b.eventTimestamp
User-provided timestamps
The user can also specify timestamps contained in the data stream itself.
Requirements:
- The timestamp needs to be contained in a column of type
long
. - The timestamp needs to be in epoch format (for example, milliseconds since Jan 1, 1970).
timestamp_ms
, it is possible to
construct a query as the following
example:SELECT *
FROM airplane_flights a
WHERE flight_number IS NOT NULL
GROUP BY HOP(a.timestamp_ms, INTERVAL '15' SECOND, INTERVAL '5' SECOND), flight_number
If an invalid timestamp is encountered in the stream (for example, NaN), then the timestamp of the message defaults to 0, causing the message to be excluded from the current window.
If your data does not include a timestamp in a suitable format, it is possible to compute a new timestamp column from another existing column (using input transformations).