INSERT Statements

Data can be inserted into sink tables in various ways.

You can use the INSERT clause with different source tables, or the VALUES clause. The following examples show how to use INSERT and VALUES at the same time:

tableEnv.sqlUpdate("CREATE TABLE source_table (c1 STRING)  WITH (...)");
tableEnv.sqlUpdate("CREATE TABLE sink_table (c1 STRING)  WITH (...)");

tableEnv.sqlUpdate("INSERT INTO source_table VALUES ('foo')");

Table elements = tableEnv.fromDataStream(env.fromElements("bar"));
tableEnv.sqlUpdate("INSERT INTO source_table SELECT f0 from " + elements);

tableEnv.sqlUpdate("INSERT INTO sink_table SELECT * from source_table");

Table table = tableEnv.sqlQuery("SELECT * FROM sink_table");
tableEnv.toAppendStream(table, Row.class).printToErr();

For more information about INSERT statements, see the Apache Flink documentation.