CREATE Statements

CREATE statements are used to register database, table, and function objects into catalogs.

For more information about CREATE statements, see the Apache Flink documentation.

CREATE TABLE

You can connect Flink Table API and Flink SQL programs to external systems to read and write streaming tables. The table declaration is similar to a SQL CREATE TABLE statement. You can define the following properties up front when connecting to an external system:
  • Name of the table
  • Schema of the table
  • Connector
  • Data format
The following example shows where each of these properties is declared:
tableEnvironment.sqlUpdate(
    "CREATE TABLE MyTable (\n" +
    "  ...    -- declare table schema \n" +
    ") WITH (\n" +
    "  'connector.type' = '...',  -- connector specific properties\n" +
    "  ...\n" +
    "  'update-mode' = 'append',  -- declare update mode\n" +
    "  'format.type' = '...',     -- format specific properties\n" +
    "  ...\n" +
    ")");
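
The four parts listed above (table name, schema, connector, and data format) can also be assembled programmatically before the statement is passed to the table environment. The following is a minimal sketch only: the CreateTableDdl class and its build method are hypothetical helpers, not part of Flink, and the sketch assumes that names and property values contain no quote characters that would need escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper (not a Flink class) that assembles a CREATE TABLE statement.
public class CreateTableDdl {

    // columns maps column name to SQL type; properties holds the WITH clause entries.
    // Assumption: no value contains a single quote, so no escaping is performed.
    public static String build(String tableName,
                               Map<String, String> columns,
                               Map<String, String> properties) {
        StringJoiner schema = new StringJoiner(",\n");
        columns.forEach((name, type) -> schema.add("  " + name + " " + type));

        StringJoiner with = new StringJoiner(",\n");
        properties.forEach((key, value) -> with.add("  '" + key + "' = '" + value + "'"));

        return "CREATE TABLE " + tableName + " (\n" + schema + "\n) WITH (\n" + with + "\n)";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the declaration order of columns and properties.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("itemId", "STRING");
        columns.put("quantity", "INT");

        Map<String, String> properties = new LinkedHashMap<>();
        properties.put("connector.type", "kafka");
        properties.put("update-mode", "append");
        properties.put("format.type", "json");

        System.out.println(build("MyTable", columns, properties));
    }
}
```

The resulting string can be passed to tableEnvironment.sqlUpdate in the same way as the hand-written statement. A real Kafka table needs further connector properties, such as the topic and the bootstrap servers.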

Computed column and watermark

A computed column is a virtual column that is generated using the syntax “column_name AS computed_column_expression”. Computed columns are commonly used in Flink for defining time attributes in CREATE TABLE statements.

The WATERMARK clause defines the event time attribute of a table. It takes the following form: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression. The strategy expression can use computed columns to calculate the watermark, and its return type must be TIMESTAMP(3).

CREATE TABLE ItemTransactions (
	transactionId    BIGINT,
	ts    BIGINT,
	itemId    STRING,
	quantity INT,
	event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
	WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
	'connector.type'    	 = 'kafka',
	'connector.version' 	 = 'universal',
	'connector.topic'   	 = 'transaction.log.1',
	'connector.startup-mode' = 'earliest-offset',
	'connector.properties.bootstrap.servers' = '<hostname>:<port>',
	'connector.properties.group.id' = 'test',
	'format.type' = 'json'
);
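
To make the arithmetic in the ItemTransactions example concrete, the following plain Java sketch mirrors what the computed column and the watermark expression calculate for a single row. It is an illustration only and does not use the Flink API: the WatermarkSketch class and its method names are hypothetical, and it works with UTC instants, whereas from_unixtime in Flink formats the value in the session time zone.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of the event_time and WATERMARK expressions; not Flink code.
public class WatermarkSketch {

    // Allowed delay, matching INTERVAL '5' SECOND in the table definition.
    public static final Duration DELAY = Duration.ofSeconds(5);

    // Mirrors the computed column: ts is in epoch milliseconds and is
    // truncated to whole seconds, as floor(ts/1000) does.
    public static Instant eventTime(long tsMillis) {
        return Instant.ofEpochSecond(Math.floorDiv(tsMillis, 1000L));
    }

    // Mirrors the watermark strategy: event_time - INTERVAL '5' SECOND.
    public static Instant watermark(long tsMillis) {
        return eventTime(tsMillis).minus(DELAY);
    }

    public static void main(String[] args) {
        long ts = 1_600_000_007_250L; // sample value of the ts column
        System.out.println("event_time = " + eventTime(ts));
        System.out.println("watermark  = " + watermark(ts));
    }
}
```

With this strategy, a row can arrive up to five seconds later than the newest event time seen so far and still be treated as on time.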

CREATE DATABASE

tableEnvironment.sqlUpdate("CREATE DATABASE sample_database");
tableEnvironment.useDatabase("sample_database");

CREATE FUNCTION

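package com.cloudera.udfs;

import org.apache.flink.table.functions.ScalarFunction;

// Scalar function that returns the Java hash code of its input string.
public class HashCode extends ScalarFunction {
  public int eval(String s) {
     return s.hashCode();
  }
}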

tableEnvironment.sqlUpdate("CREATE FUNCTION hashcode AS 'com.cloudera.udfs.HashCode'");