Flink SQL and Table API
In Cloudera Streaming Analytics, you can enhance your streaming application with analytical queries using Table API or SQL API. These are integrated in a joint API and can also be embedded into regular DataStream applications. The central concept of the joint API is a Table that serves as the input and output of your streaming data queries.
There are also two planners that translate Table/SQL queries to Flink jobs: the old planner and the Blink planner. Cloudera Streaming Analytics only supports the Blink planner for Table/SQL applications.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.19.2-csa1.13.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.19.2-csa1.13.3.0</version>
</dependency>
- Create a StreamTableEnvironment with the Blink planner.
- Register catalogs and tables.
- Run the queries/updates.
- Run the StreamTableEnvironment.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings tableSettings = EnvironmentSettings
    .newInstance()
    .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment
    .create(streamEnv, tableSettings);

tableEnv.executeSql("CREATE TABLE ...");

Table table = tableEnv.sqlQuery("SELECT ... FROM ...");

DataStream<Row> stream = tableEnv.toDataStream(table, Row.class);
stream.print();
The Table API exposes different flavors of TableEnvironment to end users, each covering a different feature set. To ensure smooth interaction with other DataStream applications, Cloudera Streaming Analytics only supports using StreamTableEnvironment. StreamTableEnvironment wraps a regular StreamExecutionEnvironment. This allows you to seamlessly go from streams to tables and back within the same pipeline.
You can create a StreamTableEnvironment with the following code entry:

StreamExecutionEnvironment streamEnv = ...

EnvironmentSettings tableSettings = EnvironmentSettings
    .newInstance()
    .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment
    .create(streamEnv, tableSettings);
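To illustrate going from a stream to a table and back within one pipeline, the following is a minimal sketch, not a reference implementation. It assumes the Flink streaming, client, and table runtime libraries are on the classpath in addition to the dependencies listed earlier. The class name StreamTableRoundTrip, the view name InputTable, the column alias upper_name, and the sample names are hypothetical. Collecting the result with executeAndCollect() is a choice made here to keep the output local and easy to inspect; it is not a recommendation from this page on how to submit jobs.

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

// Hypothetical example class: stream -> table -> SQL -> stream.
public class StreamTableRoundTrip {

    public static List<String> upperCaseNames() throws Exception {
        StreamExecutionEnvironment streamEnv =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel instance keeps this small local run simple (assumption of the sketch).
        streamEnv.setParallelism(1);

        EnvironmentSettings tableSettings = EnvironmentSettings
            .newInstance()
            .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment
            .create(streamEnv, tableSettings);

        // A regular DataStream with illustrative sample data.
        DataStream<String> names = streamEnv.fromElements("Alice", "Bob", "John");

        // Stream to table: a DataStream<String> becomes a table with a single column f0.
        Table inputTable = tableEnv.fromDataStream(names);
        tableEnv.createTemporaryView("InputTable", inputTable);

        // Run a SQL query on the registered view.
        Table resultTable =
            tableEnv.sqlQuery("SELECT UPPER(f0) AS upper_name FROM InputTable");

        // Table back to stream: each result record arrives as a Row.
        DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);

        // Run the pipeline and gather the rows on the client.
        List<String> result = new ArrayList<>();
        try (CloseableIterator<Row> rows = resultStream.executeAndCollect()) {
            while (rows.hasNext()) {
                result.add(String.valueOf(rows.next().getField(0)));
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        upperCaseNames().forEach(System.out::println);
    }
}
```

The same StreamTableEnvironment handles both conversions, which is why the table query can sit between two ordinary DataStream stages without a separate job.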
When combining regular DataStream and Table/SQL applications, make sure to always call the .execute command on the StreamTableEnvironment instead of the regular StreamExecutionEnvironment to ensure correct execution.