Flink SQL and Table API
In Cloudera Streaming Analytics, you can enhance your streaming application with analytical queries using Table API or SQL API. These are integrated in a joint API and can also be embedded into regular DataStream applications. The central concept of the joint API is a Table that serves as the input and output of your streaming data queries.
There are also two planners that translate Table/SQL queries to Flink jobs: the old planner and the Blink planner. Cloudera Streaming Analytics only supports the Blink planner for Table/SQL applications.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.19.1-csa1.14.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader-bundle</artifactId>
    <version>1.19.1-csa1.14.0.0</version>
</dependency>
- Create a StreamTableEnvironment with the Blink planner.
- Register catalogs and tables.
- Run the queries/updates.
- Run the StreamTableEnvironment.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tableSettings = EnvironmentSettings
    .newInstance()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
    .create(streamEnv, tableSettings);
tableEnv.executeSql("CREATE TABLE ...");
Table table = tableEnv.sqlQuery("SELECT ... FROM ...");
DataStream<Row> stream = tableEnv.toDataStream(table, Row.class);
stream.print();
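To make the steps above concrete, the following is a minimal sketch that fills in the elided SQL with a small bounded table. The table name `numbers`, its column `n`, and the class name `DatagenQuerySketch` are hypothetical and chosen only for illustration. The sketch assumes that the dependencies listed above and a local Flink runtime are on the classpath, and that the built-in `datagen` connector is available. It collects the result with `Table.execute().collect()` instead of converting to a DataStream, which is a simplification for a self-contained example and not necessarily how a production job would be run.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class DatagenQuerySketch {

    /** Creates the table environment, registers a table, runs a query, and collects the rows. */
    public static List<Long> run() throws Exception {
        // Step 1: create the StreamTableEnvironment on top of a StreamExecutionEnvironment.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1); // keep the output order deterministic for this sketch
        EnvironmentSettings tableSettings = EnvironmentSettings.newInstance().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);

        // Step 2: register a table. The hypothetical "numbers" table emits the sequence 1..5
        // and then finishes, so the query below terminates.
        tableEnv.executeSql(
            "CREATE TABLE numbers (n BIGINT) WITH ("
                + " 'connector' = 'datagen',"
                + " 'fields.n.kind' = 'sequence',"
                + " 'fields.n.start' = '1',"
                + " 'fields.n.end' = '5'"
                + ")");

        // Step 3: run a query against the registered table.
        Table evens = tableEnv.sqlQuery("SELECT n FROM numbers WHERE MOD(n, 2) = 0");

        // Step 4: execute the query and collect the resulting rows on the client.
        List<Long> result = new ArrayList<>();
        try (CloseableIterator<Row> rows = evens.execute().collect()) {
            while (rows.hasNext()) {
                result.add((Long) rows.next().getField(0));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Collecting rows on the client is only practical for small, bounded results such as this one. For an unbounded source, the query result would normally be written to a sink table or converted to a DataStream.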
The Table API exposes different flavors of TableEnvironment to end users, each covering a different feature set. To ensure smooth interaction with other DataStream applications, CSA only supports using StreamTableEnvironment.
StreamTableEnvironment wraps a regular StreamExecutionEnvironment. This allows you to seamlessly go from streams to tables and back within the same pipeline.
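The round trip from a stream to a table and back can be sketched as follows. This is an illustrative example, not a prescribed pattern: the class name `StreamTableRoundTrip`, the view name `cities`, and the column name `city` are hypothetical. It assumes the dependencies listed above and a local Flink runtime on the classpath. To stay short, the sketch collects the output with `DataStream.executeAndCollect()`, which triggers execution by itself.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class StreamTableRoundTrip {

    /** Converts a bounded stream to a table, filters it with SQL, and converts it back. */
    public static List<String> run() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1); // keep the output order deterministic for this sketch
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // A regular DataStream with three hypothetical elements.
        DataStream<String> cityStream = streamEnv.fromElements("berlin", "rome", "madrid");

        // Stream -> table: the single String column is renamed to "city"
        // and the table is registered as a temporary view for SQL access.
        Table cityTable = tableEnv.fromDataStream(cityStream).as("city");
        tableEnv.createTemporaryView("cities", cityTable);

        // Analytical query on the table.
        Table longNames = tableEnv.sqlQuery(
            "SELECT UPPER(city) AS city FROM cities WHERE CHAR_LENGTH(city) > 4");

        // Table -> stream: the result is available again as a DataStream of Row.
        DataStream<Row> resultStream = tableEnv.toDataStream(longNames);

        List<String> collected = new ArrayList<>();
        try (CloseableIterator<Row> rows = resultStream.executeAndCollect()) {
            while (rows.hasNext()) {
                collected.add((String) rows.next().getField(0));
            }
        }
        return collected;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Because the table environment shares the underlying StreamExecutionEnvironment, the source, the SQL query, and the converted output stream all become part of one job graph.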
You can create a StreamTableEnvironment with the following code entry:
StreamExecutionEnvironment streamEnv = ...
EnvironmentSettings tableSettings = EnvironmentSettings
    .newInstance()
    .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
    .create(streamEnv, tableSettings);
When combining regular DataStream and Table/SQL applications, make sure to always call the .execute command on the StreamTableEnvironment instead of the regular StreamExecutionEnvironment to ensure correct execution.