Flink SQL and Table API

In Cloudera Streaming Analytics, you can enhance your streaming application with analytical queries using Table API or SQL API. These are integrated in a joint API and can also be embedded into regular DataStream applications. The central concept of the joint API is a Table that serves as the input and output of your streaming data queries.

There are also two planners that translate Table/SQL queries to Flink jobs: the old planner and the Blink planner. Cloudera Streaming Analytics only supports the Blink planner for Table/SQL applications.

Add the following Maven dependencies to your project's pom.xml file to use the Table API with the Blink planner:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge</artifactId>
  <version>1.19.1-csa1.13.0.0</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner_2.12</artifactId>
  <version>1.19.1-csa1.13.0.0</version>
</dependency>

SQL programs in Flink follow a structure similar to regular DataStream applications:
  1. Create a StreamTableEnvironment with the Blink planner.
  2. Register catalogs and tables.
  3. Run the queries or updates.
  4. Trigger execution of the pipeline.

The following example shows this structure:
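import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Create a StreamTableEnvironment on top of the regular streaming environment.
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

EnvironmentSettings tableSettings = EnvironmentSettings
     .newInstance()
     .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment
     .create(streamEnv, tableSettings);

// Register a table, then query it.
tableEnv.executeSql("CREATE TABLE ...");
Table table = tableEnv.sqlQuery("SELECT ... FROM ...");

// Convert the query result back to a DataStream and attach a print sink.
DataStream<Row> stream = tableEnv.toDataStream(table, Row.class);
stream.print();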

The Table API exposes several flavors of TableEnvironment to end users, each covering a different feature set. To ensure smooth interaction with other DataStream applications, CSA only supports StreamTableEnvironment.

StreamTableEnvironment wraps a regular StreamExecutionEnvironment. This allows you to seamlessly go from streams to tables and back within the same pipeline.
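
The round trip between streams and tables can be sketched as follows. This is a minimal illustration, not CSA reference code: the class name StreamTableRoundTrip, the view name InputTable, and the sample elements are hypothetical, and the sketch assumes that the dependencies listed above, together with the Flink streaming dependency, are on the classpath. It only builds the pipeline and returns the column names of the query result; it does not submit a job.

```java
import java.util.List;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical example class; names and data are for illustration only.
final class StreamTableRoundTrip {

    static List<String> buildPipeline() {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Start from a regular DataStream.
        DataStream<String> words = streamEnv.fromElements("flink", "sql", "table");

        // Stream -> table: an atomic type such as String becomes a single column named f0.
        Table input = tableEnv.fromDataStream(words);
        tableEnv.createTemporaryView("InputTable", input);

        // Query the table with SQL.
        Table upper = tableEnv.sqlQuery("SELECT UPPER(f0) AS word FROM InputTable");

        // Table -> stream: the result can be processed further with the DataStream API.
        DataStream<Row> result = tableEnv.toDataStream(upper);
        result.print();

        // Return the column names of the query result without executing the job.
        return upper.getResolvedSchema().getColumnNames();
    }
}
```

Because the same StreamExecutionEnvironment backs both the stream and the table, the converted DataStream can be combined with any other operators defined on streamEnv.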

You can create a StreamTableEnvironment as follows:
StreamExecutionEnvironment streamEnv = ...
EnvironmentSettings tableSettings = EnvironmentSettings
     .newInstance()
     .build();

StreamTableEnvironment tableEnv = StreamTableEnvironment
     .create(streamEnv, tableSettings);

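When combining regular DataStream and Table/SQL applications, make sure that the right call triggers execution. Statements submitted with executeSql(), such as INSERT INTO, are run by the StreamTableEnvironment itself. A pipeline that is converted back to a DataStream with toDataStream() only runs once execute() is called on the underlying StreamExecutionEnvironment; in the Flink 1.19 API, StreamTableEnvironment no longer has an execute() method of its own.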
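
As a rough illustration of a job that is submitted through the table environment, the sketch below runs an INSERT INTO statement with executeSql() and waits for it to finish. The class name SqlOnlyPipeline and the tables demo_source and demo_sink are hypothetical. The sketch assumes that the built-in datagen and blackhole connectors are available and that the Flink client and runtime dependencies are on the classpath for local execution.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Hypothetical example class; table names and options are for illustration only.
final class SqlOnlyPipeline {

    static boolean run() throws Exception {
        StreamExecutionEnvironment streamEnv =
                StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        // Bounded test source: generates five rows and then finishes.
        tableEnv.executeSql(
                "CREATE TABLE demo_source (id BIGINT) WITH ("
                        + " 'connector' = 'datagen',"
                        + " 'number-of-rows' = '5')");

        // Sink that discards every row it receives.
        tableEnv.executeSql(
                "CREATE TABLE demo_sink (id BIGINT) WITH ('connector' = 'blackhole')");

        // executeSql() submits the INSERT job; no separate execute() call is made.
        TableResult result =
                tableEnv.executeSql("INSERT INTO demo_sink SELECT id FROM demo_source");

        // Block until the bounded job has finished.
        result.await();

        // An INSERT statement exposes a client for the submitted job.
        return result.getJobClient().isPresent();
    }
}
```

The source is bounded here only so that the example terminates; with an unbounded source, await() would block until the job is cancelled.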