CREATE Statements
You can use CREATE statements to register database, table, and function objects in catalogs. Based on your application design, add the connector, the table name, the schema, and the data format to the statement. You can further customize the table statement with computed columns that reflect time in a Flink application.
For more information about CREATE statements, see the Apache Flink documentation.
CREATE TABLE
The CREATE TABLE statement can define the following upfront for connecting to an external system:
- Name of the table
- Schema of the table
- Connector
- Data format
tableEnvironment.sqlUpdate(
"CREATE TABLE MyTable (\n" +
" ... -- declare table schema \n" +
") WITH (\n" +
" 'connector.type' = '...', -- connector specific properties\n" +
" ...\n" +
" 'update-mode' = 'append', -- declare update mode\n" +
" 'format.type' = '...', -- format specific properties\n" +
" ...\n" +
")");
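The four elements listed above (table name, schema, connector, and data format) can also be assembled programmatically before the statement is submitted. The following is a minimal sketch in plain Java. The createTable helper and the CreateTableSketch class are hypothetical and not part of the Flink API, and the property list is abbreviated, so a real Kafka table would need further connector properties.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

public class CreateTableSketch {

    // Hypothetical helper (not a Flink API): joins the table name, the schema
    // and the WITH properties into a single CREATE TABLE statement.
    public static String createTable(String name, String schema, Map<String, String> properties) {
        StringJoiner with = new StringJoiner(",\n", "", "\n");
        for (Map.Entry<String, String> e : properties.entrySet()) {
            with.add("  '" + e.getKey() + "' = '" + e.getValue() + "'");
        }
        return "CREATE TABLE " + name + " (\n" + schema + "\n) WITH (\n" + with + ")";
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the properties in insertion order.
        Map<String, String> props = new LinkedHashMap<>();
        props.put("connector.type", "kafka");   // connector
        props.put("update-mode", "append");     // update mode
        props.put("format.type", "json");       // data format

        String ddl = createTable("MyTable", "  itemId STRING,\n  quantity INT", props);
        System.out.println(ddl);
        // The resulting string could then be passed to tableEnvironment.sqlUpdate(ddl).
    }
}
```

Keeping the properties in a map makes it easier to switch the connector or format per environment without editing the statement text.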
Computed column and watermark
A computed column is a virtual column that is generated using the syntax “column_name AS computed_column_expression”. Computed columns are commonly used in Flink for defining time attributes in CREATE TABLE statements.
The WATERMARK clause defines the event time attribute of a table, and allows computed columns to calculate the watermark in the following form: WATERMARK FOR rowtime_column_name AS watermark_strategy_expression. The expression return type must be TIMESTAMP(3).
CREATE TABLE ItemTransactions (
  transactionId BIGINT,
  ts BIGINT,
  itemId STRING,
  quantity INT,
  event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3)),
  WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'transaction.log.1',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.bootstrap.servers' = '<hostname>:<port>',
  'connector.properties.group.id' = 'test',
  'format.type' = 'json'
);
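To make the arithmetic in the ItemTransactions example concrete, the following sketch reproduces the two expressions in plain Java. It assumes that ts holds epoch milliseconds, as the division by 1000 suggests, and it ignores the time zone handling of from_unixtime. The WatermarkSketch class and its method names are hypothetical and only illustrate the calculation; Flink evaluates the SQL expressions itself.

```java
import java.time.Duration;
import java.time.Instant;

public class WatermarkSketch {

    // Mirrors: event_time AS CAST(from_unixtime(floor(ts/1000)) AS TIMESTAMP(3))
    // The epoch-millisecond value is truncated to whole seconds.
    public static Instant eventTime(long tsMillis) {
        return Instant.ofEpochSecond(Math.floorDiv(tsMillis, 1000L));
    }

    // Mirrors: WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    // The watermark trails the event time by five seconds.
    public static Instant watermark(Instant eventTime) {
        return eventTime.minus(Duration.ofSeconds(5));
    }

    public static void main(String[] args) {
        long ts = 1_600_000_000_123L; // sample epoch-millisecond value
        Instant eventTime = eventTime(ts);
        System.out.println("event_time = " + eventTime);
        System.out.println("watermark  = " + watermark(eventTime));
    }
}
```

With this strategy, a record can arrive up to five seconds behind the highest event time seen so far and still be ahead of the watermark.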
CREATE DATABASE
tableEnvironment.sqlUpdate("CREATE DATABASE sample_database");
tableEnvironment.useDatabase("sample_database");
CREATE FUNCTION
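package com.cloudera.udfs;

import org.apache.flink.table.functions.ScalarFunction;

// Scalar function that returns the Java hash code of its string argument.
public class HashCode extends ScalarFunction {
    public int eval(String s) {
        return s.hashCode();
    }
}

tableEnvironment.sqlUpdate("CREATE FUNCTION hashcode AS 'com.cloudera.udfs.HashCode'");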
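The eval logic of the function can be tried outside of Flink. The following sketch copies it into a plain Java class and adds a null-safe variant, on the assumption that NULL column values reach eval as Java null, in which case s.hashCode() would throw a NullPointerException. The HashCodeSketch class and the evalNullSafe method are hypothetical and are not part of the example above.

```java
public class HashCodeSketch {

    // Same logic as the eval method of the HashCode function.
    public static int eval(String s) {
        return s.hashCode();
    }

    // Hypothetical null-safe variant: returns null for a null input
    // instead of throwing a NullPointerException.
    public static Integer evalNullSafe(String s) {
        return s == null ? null : s.hashCode();
    }

    public static void main(String[] args) {
        System.out.println(eval("item-1"));
        System.out.println(evalNullSafe(null));
    }
}
```

Returning the boxed Integer type lets the function propagate NULL to the query result; whether that behavior is wanted depends on the application.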