Joining streaming and bounded tables
Besides regular joins and interval joins, Flink SQL lets you join a streaming table with a slowly changing dimension table for enrichment. In this case, you use a temporal join, in which the streaming table is joined with a versioned table based on a key and on either processing time or event time.
A versioned table is a table that contains a time attribute and reflects the records of a table at a specific point in time. With append-only or regularly updated sources, the value related to a key changes over a long period of time. For example, a table can contain the currency rates since last month. At every change of a currency rate, a new value is added to the stream, and therefore to the table. By creating a versioned table of the currency rates, you can specify which rate you need at an exact point in time, for example the currency rates as of 12:00.
For more information about versioned tables, see the official Apache Flink documentation.
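As an illustration of the description above, the following is a minimal sketch of how a versioned currency rate table could be declared. It assumes an upsert-kafka source; the topic name, broker address, and the update_time column are hypothetical and not taken from this guide. In Apache Flink, a table backed by a changelog source is treated as versioned when it declares a primary key and an event time attribute.

```sql
-- Sketch of a versioned table; connector settings are placeholder assumptions.
CREATE TABLE currency_rates (
    currency        STRING,
    conversion_rate DECIMAL(32, 2),
    update_time     TIMESTAMP(3),                 -- hypothetical event time column
    WATERMARK FOR update_time AS update_time,     -- event time attribute
    PRIMARY KEY (currency) NOT ENFORCED           -- key that identifies each version chain
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'currency_rates',                           -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',    -- hypothetical broker
    'key.format' = 'json',
    'value.format' = 'json'
);
```

Each new record for a currency closes the previous version and opens a new one that is valid from its update_time onward.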
Temporal join
After determining the version of the bounded table, you also need to define a time attribute for the streaming table. Flink supports both event time and processing time. When using event time, you need to create a temporal join, and the event time must be declared on the streaming table, as in the following example:
CREATE TABLE orders (
    order_id STRING,
    price DECIMAL(32,2),
    currency STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (
    ...
)
In the JOIN clause, you need to refer to the event time column as defined for the table in the FOR SYSTEM_TIME AS OF part of the SQL query:
SELECT
    order_id,
    price,
    currency,
    conversion_rate,
    order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
    ON orders.currency = currency_rates.currency
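To make the effect of the event time temporal join more concrete, the sketch below converts each order price with the rate that was valid at the order time. The sample rows in the comments are hypothetical, and the query assumes that currency_rates is a versioned table with a conversion_rate column.

```sql
-- Hypothetical versions of currency_rates (currency, conversion_rate, valid from):
--   ('EUR', 1.10, 11:00)
--   ('EUR', 1.15, 12:00)
-- Hypothetical orders (order_id, price, currency, order_time):
--   ('o1', 100.00, 'EUR', 11:30)   joins the version valid from 11:00 (rate 1.10)
--   ('o2', 100.00, 'EUR', 12:05)   joins the version valid from 12:00 (rate 1.15)
SELECT
    o.order_id,
    o.price * r.conversion_rate AS converted_price,
    o.order_time
FROM orders AS o
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF o.order_time AS r
    ON o.currency = r.currency;
```

Because the join is a LEFT JOIN, an order whose currency has no version at its order time is still emitted, with a NULL converted_price.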
Lookup join
When you use processing time in the JOIN SQL syntax, Flink translates the query into a lookup join and uses the latest version of the bounded table. The following example shows the join syntax that needs to be used for enriching streaming data:
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF PROCTIME() AS c
    ON o.customer_id = c.id
The FOR SYSTEM_TIME AS OF PROCTIME() syntax indicates that you always want to look up the latest version of the table. By including the processing time in the SQL syntax, you can query the latest version of a lookup table and enrich your streaming data with the corresponding value.
Connectors that can serve as lookup tables include:
- Kudu
- Hive
- JDBC
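The following is a minimal end-to-end sketch of a lookup join against a JDBC-backed table. All connector settings (topic, broker address, JDBC URL, table name) and the column types are hypothetical placeholders. It declares the processing time as a computed column, proc_time, and references it in the join, which is the form used in the upstream Apache Flink documentation; this is an alternative spelling of the PROCTIME() call, not the only valid one.

```sql
-- Streaming side: hypothetical Kafka-backed table with a processing time attribute.
CREATE TABLE Orders (
    order_id    STRING,
    total       DECIMAL(32, 2),
    customer_id INT,
    proc_time AS PROCTIME()          -- computed processing time column
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',                                   -- hypothetical topic
    'properties.bootstrap.servers' = 'localhost:9092',    -- hypothetical broker
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Lookup side: hypothetical dimension table read through the JDBC connector.
CREATE TABLE Customers (
    id      INT,
    country STRING,
    zip     STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://localhost:3306/shop',           -- hypothetical database
    'table-name' = 'customers'
);

-- Each incoming order looks up the current row of its customer.
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
    ON o.customer_id = c.id;
```

Because the lookup happens when each order is processed, later changes to a customer row do not update results that were already emitted.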