Flink SQL and Table API
In Cloudera Streaming Analytics, you can enhance your streaming application with analytical queries using the Table API or the SQL API. The two are integrated into a joint API and can also be embedded into regular DataStream applications. The central concept of the joint API is a Table that serves as the input and output of your streaming data queries.
Two planners translate Table/SQL queries to Flink jobs: the old planner and the Blink planner. Although the old planner is currently the default Table API planner, Cloudera Streaming Analytics supports only the Blink planner for Table/SQL applications.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge_2.11</artifactId>
    <version>1.10.0-csa1.2.0.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.10.0-csa1.2.0.0</version>
    <scope>provided</scope>
</dependency>
- Create a StreamTableEnvironment with the Blink planner.
- Register catalogs and tables.
- Execute the queries/updates.
- Execute the StreamTableEnvironment.
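import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Create a StreamTableEnvironment with the Blink planner.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tableSettings = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
    .create(streamEnv, tableSettings);
// Register catalogs and tables.
tableEnv.sqlUpdate("CREATE TABLE ...");
// Execute the queries/updates and convert the result back to a DataStream.
Table table = tableEnv.sqlQuery("SELECT ... FROM ...");
DataStream<Row> stream = tableEnv.toAppendStream(table, Row.class);
stream.print();
// Execute the StreamTableEnvironment, not the StreamExecutionEnvironment.
tableEnv.execute("Print");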
The Table API exposes different flavors of TableEnvironment to end users, each covering a different feature set. To ensure smooth interaction with other DataStream applications, CSA only supports using StreamTableEnvironment.
StreamTableEnvironment wraps a regular StreamExecutionEnvironment. This allows you to move seamlessly from streams to tables and back within the same pipeline.
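The round trip from a stream to a table and back could look roughly like the following sketch. It assumes the Flink 1.10 API that matches the dependencies above. The class name, the sample data, the view name purchases, and the field names fruit and amount are hypothetical and chosen only for illustration.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamTableRoundTrip {

    // Wraps the given stream environment in a Blink-planner table environment.
    static StreamTableEnvironment createTableEnv(StreamExecutionEnvironment streamEnv) {
        EnvironmentSettings tableSettings = EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .build();
        return StreamTableEnvironment.create(streamEnv, tableSettings);
    }

    // Stream -> table: exposes the stream as a temporary view and queries it with SQL.
    static Table selectLargePurchases(
            StreamTableEnvironment tableEnv,
            DataStream<Tuple2<String, Integer>> purchases) {
        // "purchases", "fruit" and "amount" are hypothetical names for this sketch.
        tableEnv.createTemporaryView("purchases", purchases, "fruit, amount");
        return tableEnv.sqlQuery(
            "SELECT fruit, amount FROM purchases WHERE amount > 10");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = createTableEnv(streamEnv);

        // Hypothetical sample input standing in for a real source.
        DataStream<Tuple2<String, Integer>> purchases = streamEnv.fromElements(
            Tuple2.of("apple", 3),
            Tuple2.of("pear", 12),
            Tuple2.of("plum", 25));

        Table large = selectLargePurchases(tableEnv, purchases);

        // Table -> stream: the filter only appends rows, so an append stream fits.
        DataStream<Row> result = tableEnv.toAppendStream(large, Row.class);
        result.print();

        // Execute through the table environment rather than the stream environment.
        tableEnv.execute("Stream-table round trip");
    }
}
```

Row is used as the target type here because it can hold any query schema. toAppendStream is suitable only for insert-only results such as this filter; a query that updates earlier results would need a different conversion.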
You can create a StreamTableEnvironment with the following code entry:
StreamExecutionEnvironment streamEnv = ...
EnvironmentSettings tableSettings = EnvironmentSettings
    .newInstance()
    .useBlinkPlanner()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
    .create(streamEnv, tableSettings);
Make sure that the environment settings select the Blink planner in streaming mode.
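If you prefer to state the streaming mode explicitly instead of relying on the builder's default, a minimal sketch could look like this. It assumes the Flink 1.10 EnvironmentSettings builder; the class and method names BlinkStreamingSettings and create are hypothetical.

```java
import org.apache.flink.table.api.EnvironmentSettings;

public class BlinkStreamingSettings {

    // Builds settings that name both the planner and the execution mode explicitly.
    static EnvironmentSettings create() {
        return EnvironmentSettings
            .newInstance()
            .useBlinkPlanner()
            .inStreamingMode()
            .build();
    }

    public static void main(String[] args) {
        EnvironmentSettings tableSettings = create();
        // Reports whether the settings were built for streaming mode.
        System.out.println("Streaming mode: " + tableSettings.isStreamingMode());
    }
}
```

Spelling out inStreamingMode() makes the intent visible in code review and guards against a later edit switching the settings to batch mode unnoticed.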
When combining regular DataStream and Table/SQL applications, always call the .execute command on the StreamTableEnvironment instead of the regular StreamExecutionEnvironment to ensure correct execution.