Creating tables with Flink DDL
You can use Flink DDL to create tables or views by adding a CREATE statement to the Flink DDL window or by filling out one of the available templates.
When using Flink DDL, you have the option to either define your CREATE TABLE statement using SQL in the Flink DDL window, or to use the available Flink DDL templates. The Flink DDL templates are predefined examples of CREATE TABLE statements that you can fill out with your job-specific values.
You can choose from the following templates:
CREATE TABLE like_table_sample (
    -- new_time_col AS to_timestamp(time_col),
    -- WATERMARK FOR new_time_col AS new_time_col - INTERVAL '5' SECOND
) WITH (
    -- 'scan.startup.mode' = 'latest-offset'
)
LIKE kafka_table (
    INCLUDING ALL
    OVERWRITING OPTIONS
    EXCLUDING GENERATED
)
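For example, a filled-out version of this template could look as follows. This is only a sketch: it assumes an existing table named kafka_table with a STRING column time_col that holds timestamps, and both names are illustrative.

CREATE TABLE like_table_enriched (
    -- Derive an event-time column from the source table's string timestamp
    new_time_col AS to_timestamp(time_col),
    WATERMARK FOR new_time_col AS new_time_col - INTERVAL '5' SECOND
) WITH (
    -- Override the inherited startup mode
    'scan.startup.mode' = 'latest-offset'
)
LIKE kafka_table (
    INCLUDING ALL
    OVERWRITING OPTIONS
    EXCLUDING GENERATED
)

Here OVERWRITING OPTIONS lets the new WITH clause override the connector options inherited from kafka_table, and EXCLUDING GENERATED drops any computed columns of the source table so that new ones can be defined.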
CREATE TABLE datagen_sample (
    column_int INT,
    column_str STRING,
    column_ts TIMESTAMP(3),
    WATERMARK FOR column_ts AS column_ts - INTERVAL '3' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.column_int.min' = '100000',
    'fields.column_int.max' = '999999',
    'fields.column_str.length' = '40'
)
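The datagen connector generates random rows in memory, so this table is useful for testing queries without connecting to any external system. A minimal check, assuming the datagen_sample table above has been created:

-- Preview the generated rows; column_int stays within the configured
-- 100000-999999 range and column_str is always 40 characters long
SELECT column_int, column_str, column_ts
FROM datagen_sample;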
CREATE TABLE jdbc_sample (
    column_int INT,
    column_str STRING,
    column_ts TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<db_host>:5432/<db_name>',
    'table-name' = '...',
    'username' = '...',
    'password' = '...'
)
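For example, a filled-out version of this template reading a hypothetical PostgreSQL table named orders could look like the following sketch; the table, column, and user names are placeholders to replace with your own values, and the JDBC driver for your database must be available to Flink.

CREATE TABLE orders_jdbc (
    order_id INT,
    customer_name STRING,
    order_ts TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<db_host>:5432/<db_name>',
    'table-name' = 'orders',
    'username' = 'flink_user',
    'password' = '...'
)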
CREATE TABLE kafka_sample (
    column_int INT,
    column_str STRING,
    column_ts BIGINT,
    event_time AS CAST(from_unixtime(floor(column_ts/1000)) AS TIMESTAMP(3)),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    -- 'properties.group.id' = '...',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
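The computed event_time column turns the BIGINT epoch-millisecond values into a TIMESTAMP(3), and the watermark makes it usable for event-time processing. As an illustrative sketch, a tumbling-window aggregation over kafka_sample could look as follows; the one-minute window size is an arbitrary example value:

-- Count events per one-minute event-time window
SELECT
    TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
    COUNT(*) AS event_count
FROM kafka_sample
GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE);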
CREATE TABLE upsert_kafka (
    column_int INT,
    column_str STRING,
    column_ts TIMESTAMP(3),
    PRIMARY KEY (column_int) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'avro',
    'value.format' = 'avro'
)
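Unlike the regular kafka connector, upsert-kafka interprets records with the same key as updates of one logical row and records with a null value as deletions, which is why the PRIMARY KEY clause is required. As a sketch, the statement below maintains the latest row per key in upsert_kafka, assuming the kafka_sample table from the previous template as the source:

INSERT INTO upsert_kafka
-- One row per key; a later result for the same key overwrites the earlier one
SELECT
    column_int,
    LAST_VALUE(column_str),
    MAX(event_time)
FROM kafka_sample
GROUP BY column_int;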
For a full reference on Flink SQL DDL functionality, see the official Apache Flink documentation.