CREATE Statements
You can use CREATE statements to register database, table, and function objects in catalogs. You need to add the connector, the name of the table, the schema, and the data format to the statement based on your application design. You can further customize your CREATE TABLE statement with computed columns to define time attributes in a Flink application.
For more information about CREATE statements, see the Apache Flink documentation.
CREATE TABLE
   
The following can be defined upfront in the CREATE TABLE statement for connecting to an external system:
 - Name of the table
 - Schema of the table
 - Connector
 - Data format

tableEnvironment.sqlUpdate(
    "CREATE TABLE MyTable (\n" +
    "  ...    -- declare table schema \n" +
    ") WITH (\n" +
    "  'connector.type' = '...',  -- connector specific properties\n" +
    "  ...\n" +
    "  'update-mode' = 'append',  -- declare update mode\n" +
    "  'format.type' = '...',     -- format specific properties\n" +
    "  ...\n" +
    ")");

Computed column and watermark
A computed column is a virtual column that is generated using the syntax “column_name AS computed_column_expression”. Computed columns are commonly used in Flink for defining time attributes in CREATE TABLE statements.

The WATERMARK clause defines the event time attribute of a table and allows computed columns to calculate the watermark, in the following form: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression. The return type of the expression must be TIMESTAMP(3).
CREATE TABLE ItemTransactions (
  transactionId  BIGINT,
  ts             BIGINT,
  itemId         STRING,
  quantity       INT,
  event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector.type'                          = 'kafka',
  'connector.version'                       = 'universal',
  'connector.topic'                         = 'transaction.log.1',
  'connector.startup-mode'                  = 'earliest-offset',
  'connector.properties.bootstrap.servers'  = '<hostname>:<port>',
  'connector.properties.group.id'           = 'test',
  'format.type'                             = 'json'
);
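
The event time attribute and watermark declared this way can be used directly in event time based operations, for example in group window aggregations. The following query is only a minimal sketch, assuming the ItemTransactions table above is registered; the 10 minute window size and the transactionSummary variable name are chosen for illustration:

// Sum the transacted quantity per item over 10 minute tumbling event time windows.
// The TUMBLE group window relies on the event_time attribute and its watermark.
Table transactionSummary = tableEnvironment.sqlQuery(
    "SELECT itemId,\n" +
    "  TUMBLE_END(event_time, INTERVAL '10' MINUTE) AS window_end,\n" +
    "  SUM(quantity) AS total_quantity\n" +
    "FROM ItemTransactions\n" +
    "GROUP BY itemId, TUMBLE(event_time, INTERVAL '10' MINUTE)");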
CREATE DATABASE

tableEnvironment.sqlUpdate("CREATE DATABASE sample_database");
tableEnvironment.useDatabase("sample_database");
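
After useDatabase is called, subsequently created objects are registered in sample_database and can be referenced without a database prefix, or with a fully qualified name from elsewhere. The snippet below is a minimal sketch; the SampleTable name and its schema are only for illustration:

// With sample_database as the current database, the new table is registered in it.
tableEnvironment.sqlUpdate(
    "CREATE TABLE SampleTable (\n" +
    "  id BIGINT,\n" +
    "  payload STRING\n" +
    ") WITH (\n" +
    "  'connector.type' = '...'  -- connector specific properties\n" +
    ")");
// From another database, the same table can be referenced as sample_database.SampleTable.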
CREATE FUNCTION

package com.cloudera.udfs;

import org.apache.flink.table.functions.ScalarFunction;

public class HashCode extends ScalarFunction {
  public int eval(String s) {
    return s.hashCode();
  }
}

// Register the function under the name hashcode using its fully qualified class name.
tableEnvironment.sqlUpdate("CREATE FUNCTION hashcode AS 'com.cloudera.udfs.HashCode'");
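
Once registered, the function can be called in SQL like a built-in function. The following query is a minimal sketch, assuming the ItemTransactions table from the CREATE TABLE example is available:

// Apply the registered hashcode function to a column of an existing table.
Table hashedItems = tableEnvironment.sqlQuery(
    "SELECT itemId, hashcode(itemId) AS item_hash FROM ItemTransactions");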
   