Creating tables with Flink DDL

You can use Flink DDL to create tables or views by adding the CREATE statement to the Flink DDL window or filling out the Available templates.

When using Flink DDL, you have the option to either define your CREAT TABLE statement using SQL in the Flink DDL window, or use the available Flink DDL templates. The Flink DDL templates are predefined examples of CREATE TABLE statements which you can fill out with your job specific values.

You can choose from the following templates:
CREATE TABLE datagen_sample (
    column_int		INT,
    column_str    	STRING,
    column_ts    	TIMESTAMP(3),
    WATERMARK FOR column_ts AS column_ts - INTERVAL '3' SECOND
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '100',
    'fields.column_int.min' = '100000',
    'fields.column_int.max' = '999999',
    'fields.column_str.length' = '40'
)
CREATE TABLE jdbc_sample (
    column_int		INT,
    column_str    	STRING,
    column_ts    	TIMESTAMP(3)
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:postgresql://<db_host>:5432/<db_name>',
    'table-name' = '...',
    'username' = '...',
    'password' = '...'
)
CREATE TABLE kafka_sample (
    column_int		INT,
    column_str    	STRING,
    column_ts    	BIGINT,
    event_time AS CAST(from_unixtime(floor(column_ts/1000)) AS TIMESTAMP(3)),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    --'properties.group.id' = '...',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
)
CREATE TABLE upsert_kafka (
    column_int		INT,
    column_str    	STRING,
    column_ts    	TIMESTAMP(3),
    PRIMARY KEY (column_int) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = '...',
    'properties.bootstrap.servers' = '...',
    'key.format' = 'avro',
    'value.format' = 'avro'
)

For full reference on the Flink SQL DDL functionality, see the official Apache Flink documentation.