Flink SQL and Table API
In Cloudera Streaming Analytics, you can enhance your streaming application with analytical queries using the Table API or SQL. These are integrated in a joint API and can also be embedded into regular DataStream applications. The central concept of the joint API is a Table, which serves as the input and output of your streaming data queries.
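Flink originally provided two planners that translate Table/SQL queries into Flink jobs: the old planner and the Blink planner. Cloudera Streaming Analytics only supports the Blink planner for Table/SQL applications. The old planner was removed in Flink 1.14, so in Flink 1.18 the Blink planner is the only planner available.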
To use the Table API and SQL, add the following Maven dependencies to your project:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>1.18.0-csadh1.12.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.18.0-csadh1.12.0.0</version>
</dependency>
```
A Table/SQL application generally follows these steps:

- Create a StreamTableEnvironment with the Blink planner.
- Register catalogs and tables.
- Run the queries or updates.
- Execute the job.
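For example:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings tableSettings = EnvironmentSettings.newInstance().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);

tableEnv.executeSql("CREATE TABLE ...");
Table table = tableEnv.sqlQuery("SELECT ... FROM ...");

// Convert the result to a DataStream, then start the job
DataStream<Row> stream = tableEnv.toDataStream(table, Row.class);
stream.print();
streamEnv.execute();
```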
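The steps above can be filled in as a small self-contained program. This is a minimal sketch, assuming the Flink 1.18 APIs provided by the dependencies listed earlier (plus flink-clients when running locally from an IDE). The Orders view, its columns, and the OrdersQuery class are hypothetical: an inline VALUES clause stands in for a real connector table, and executeAndCollect() runs the job and gathers its bounded output instead of printing it.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class OrdersQuery {

    public static List<String> run() throws Exception {
        // Step 1: create a StreamTableEnvironment on top of a StreamExecutionEnvironment.
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1); // keeps the demo output in input order
        EnvironmentSettings tableSettings = EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableSettings);

        // Step 2: register a table. A hypothetical inline view replaces a real connector table.
        tableEnv.executeSql(
                "CREATE TEMPORARY VIEW Orders AS "
                        + "SELECT * FROM (VALUES (1, 'apple', 3), (2, 'mango', 1), (3, 'lemon', 5)) "
                        + "AS t(id, product, amount)");

        // Step 3: run a query over the registered view.
        Table bigOrders = tableEnv.sqlQuery("SELECT product FROM Orders WHERE amount > 2");

        // Step 4: convert the result to a DataStream, execute the job, and collect its output.
        DataStream<Row> stream = tableEnv.toDataStream(bigOrders);
        List<String> products = new ArrayList<>();
        try (CloseableIterator<Row> rows = stream.executeAndCollect()) {
            while (rows.hasNext()) {
                products.add((String) rows.next().getField(0));
            }
        }
        return products;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

The product names are deliberately the same length so that the string literals in VALUES share one CHAR type and no padding appears in the results.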
The Table API exposes different flavors of TableEnvironment to end users, each covering a different feature set. To ensure smooth interaction with DataStream applications, CSA only supports StreamTableEnvironment.
StreamTableEnvironment wraps a regular StreamExecutionEnvironment. This allows you to seamlessly go from streams to tables and back within the same pipeline.
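As a sketch of that round trip, the hypothetical StreamTableRoundTrip class below turns a bounded DataStream of strings into a Table, queries it with SQL, and converts the result back into a DataStream. It assumes Flink 1.18, where fromDataStream maps a stream of an atomic type such as String to a single column named f0; the input words and the Words view name are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class StreamTableRoundTrip {

    public static List<String> run() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1); // keeps the demo output in input order
        // The table environment wraps the same StreamExecutionEnvironment.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Hypothetical bounded input stream.
        DataStream<String> words = streamEnv.fromElements("flink", "sql", "table", "api");

        // Stream -> Table: the String elements become a single column f0.
        Table wordsTable = tableEnv.fromDataStream(words);
        tableEnv.createTemporaryView("Words", wordsTable);

        // Query the view with SQL: keep words longer than 3 characters, upper-cased.
        Table longWords = tableEnv.sqlQuery(
                "SELECT UPPER(f0) FROM Words WHERE CHAR_LENGTH(f0) > 3");

        // Table -> Stream: run the pipeline and collect the resulting rows.
        DataStream<Row> result = tableEnv.toDataStream(longWords);
        List<String> out = new ArrayList<>();
        try (CloseableIterator<Row> rows = result.executeAndCollect()) {
            while (rows.hasNext()) {
                out.add((String) rows.next().getField(0));
            }
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

Because the result is converted back with toDataStream, the query must produce insert-only rows; a filter and projection like this one qualifies.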
You can create a StreamTableEnvironment with the following code:

```java
StreamExecutionEnvironment streamEnv = ...;
EnvironmentSettings tableSettings = EnvironmentSettings
        .newInstance()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment
        .create(streamEnv, tableSettings);
```
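When combining regular DataStream and Table/SQL applications, make sure that the job is submitted through the correct API. Table/SQL statements that write to a sink table are submitted by executeSql, or by execute on a StatementSet, on the StreamTableEnvironment. A pipeline that converts a Table into a DataStream, for example with toDataStream, only runs when you call execute on the underlying StreamExecutionEnvironment.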
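For comparison, a query can also be submitted entirely from the Table side. In this minimal sketch (the TableSideExecution class and the inline VALUES data are hypothetical, and Flink 1.18 APIs are assumed), executeSql on a SELECT statement submits the job itself and returns a TableResult whose rows are read through collect().

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;

public class TableSideExecution {

    public static List<Integer> run() throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // executeSql on a SELECT submits the job immediately and returns a TableResult.
        TableResult result = tableEnv.executeSql(
                "SELECT x * 10 FROM (VALUES (1), (2), (3)) AS t(x)");

        // Read the bounded result; closing the iterator lets Flink clean up the job.
        List<Integer> values = new ArrayList<>();
        try (CloseableIterator<Row> rows = result.collect()) {
            while (rows.hasNext()) {
                values.add((Integer) rows.next().getField(0));
            }
        }
        return values;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

This sketch deliberately avoids aggregations: on a streaming query, an aggregate such as COUNT(*) produces an updating result, so collect() would return change rows rather than a single final value.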