Managing time in SSB
Time attributes define how streams behave for time-based operations such as window aggregations or joins. For Kafka tables, you can use the Event Time tab to create source-provided or user-provided timestamps and watermarks. For other tables, you can define time-related attributes in the Flink DDL or directly in the SQL query. In either case, you can use timestamps that the source already provides or define custom timestamps.
Source-provided timestamps
The following example uses the order_time field to perform a temporal join on multiple Kafka topics:
-- Table of orders
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- Table of currency rates
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3),
WATERMARK FOR update_time AS update_time
) WITH (/* ... */);
-- Event time temporal join to enrich orders with currencies
SELECT
order_id,
price,
currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency
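Window aggregations, the other time-based operation mentioned at the start of this topic, rely on the same event-time attribute. The following is a minimal sketch, assuming the orders table defined above and a Flink version that supports windowing table-valued functions. The 10-minute window size and the total_price alias are illustrative choices, not values taken from SSB.

```sql
-- Sketch: total order value per currency in 10-minute tumbling windows.
-- order_time must be the event-time attribute declared by the WATERMARK clause.
SELECT
  window_start,
  window_end,
  currency,
  SUM(price) AS total_price
FROM TABLE(
  TUMBLE(TABLE orders, DESCRIPTOR(order_time), INTERVAL '10' MINUTE))
GROUP BY window_start, window_end, currency;
```

A window emits its result only after the watermark passes the end of the window, so the watermark definition on the table determines when results appear.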
User-provided timestamps
-- Table of orders
-- Converts order_time_string field to timestamp
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time_string STRING,
order_time AS TO_TIMESTAMP(order_time_string),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
When a message in the stream contains an invalid timestamp (for example, NaN), the timestamp of that message is set to 0. As a result, the message is excluded from the current window.
When your data does not include a timestamp in a suitable format, you can compute a new timestamp column from an existing column using the Input Transform feature of SSB.
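When the conversion can be expressed in SQL, a computed column in the Flink DDL is one possible alternative to an Input Transform. The following is a sketch under stated assumptions: the table name orders_epoch and the order_time_millis column (epoch milliseconds) are hypothetical, and the 5-second watermark delay is an illustrative tolerance for out-of-order events rather than a recommended value.

```sql
-- Sketch: derive an event-time attribute from an epoch-milliseconds column.
-- orders_epoch and order_time_millis are hypothetical names for illustration.
CREATE TABLE orders_epoch (
  order_id STRING,
  price DECIMAL(32,2),
  currency STRING,
  order_time_millis BIGINT,
  -- Convert epoch milliseconds to a TIMESTAMP_LTZ(3) value
  order_time AS TO_TIMESTAMP_LTZ(order_time_millis, 3),
  -- Allow events to arrive up to 5 seconds late (assumed value)
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (/* ... */);
```

Subtracting an interval in the WATERMARK clause delays the watermark, which gives late events a chance to be included in their window at the cost of later results.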