CREATE Statements

You can use CREATE statements to register database, table, and function objects into catalogs. You should add the connector, the name of the table, the schema and the data format to the statement based on your application design. You can further customize your table statement with computed columns to reflect time in a Flink application.

For more information about CREATE statements, see the Apache Flink documentation.


You can connect the Flink Table API and the Flink SQL programs with external systems to read and write streaming tables. The table declaration is similar to a SQL CREATE TABLE statement. The followings can be defined upfront for connecting to an external system:
  • Name of the table
  • Schema of the table
  • Connector
  • Data format
You can see an example of the defined parameters:
    "CREATE TABLE MyTable (\n" +
    "  ...    -- declare table schema \n" +
    ") WITH (\n" +
    "  'connector.type' = '...',  -- connector specific properties\n" +
    "  ...\n" +
    "  'update-mode' = 'append',  -- declare update mode\n" +
    "  'format.type' = '...',     -- format specific properties\n" +
    "  ...\n" +

Computed column and watermark

A computed column is a virtual column that is generated using the syntax “column_name AS computed_column_expression”. Computed columns are commonly used in Flink for defining time attributes in CREATE TABLE statements.

The WATERMARK defines the event time attributes of a table, and allows computed columns to calculate the watermark in the following form: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression. The expression return type must be TIMESTAMP(3).

CREATE TABLE ItemTransactions (
	transactionId    BIGINT,
	ts    BIGINT,
	itemId    STRING,
	quantity INT,
	event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
	WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'transaction.log.1',
	'connector.startup-mode' = 'earliest-offset',
	'' = '<hostname>:<port>',
	'' = 'test',
	'format.type' = 'json'


tableEnvironment.sqlUpdate("CREATE DATABASE sample_database");


package com.cloudera.udfs;
public static class HashCode extends ScalarFunction {
  public int eval(String s) {
     return s.hashCode();

tableEnvironment.sqlUpdate("CREATE FUNCTION hashcode AS