Converting DataStreams to Tables

When converting DataStreams to Tables, you need to define the StreamTableEnvironment used for the conversion. Cloudera recommends creating the tables with names, as named tables are easier to refer to in SQL. You should also take processing time and event time into consideration, as they are crucial elements of Flink streaming applications.

StreamTableEnvironment is used to convert a DataStream into a Table. You can use the fromDataStream and createTemporaryView methods for the conversion. Cloudera recommends that you use the createTemporaryView method as it provides a way to assign a name to the created table. Named tables can be referenced directly in SQL afterwards.

Both of these methods take an optional, but recommended, string parameter that defines the field name mappings. The string must contain a comma-separated list of the desired column names. If the string is not specified, the column names default to f0, f1, ..., fn.

DataStream<Tuple2<Integer, String>> stream = ...

Table table = tableEnv.fromDataStream(stream, "col_1, col_2");

tableEnv.createTemporaryView("MyTableName", stream, "col_1, col_2");
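
Once the view is registered under a name, it can be referenced directly in SQL. The following is a minimal sketch that assumes the MyTableName view registered above; the query itself is illustrative only.

// Query the named view with regular SQL; col_1 is the Integer field of the tuple.
Table filtered = tableEnv.sqlQuery("SELECT col_2 FROM MyTableName WHERE col_1 > 10");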

You also need to take event timestamps and watermarks into consideration when converting DataStreams.

The processing time attribute must be defined as an additional (logical) column marked with the .proctime property during schema definition.
DataStream<Row> stream = ...

Table table = tableEnv.fromDataStream(stream, "col_1, col_2, ts_col.proctime");
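
The resulting proctime attribute behaves like a regular time attribute and can be used in time-based operations such as group windows. The following is a minimal sketch that assumes the table created above; the view name, window size, and aggregation are illustrative only.

// Register the table under a name and count records per one-minute processing time window.
tableEnv.createTemporaryView("ProcTimeTable", table);
Table counts = tableEnv.sqlQuery(
    "SELECT TUMBLE_START(ts_col, INTERVAL '1' MINUTE) AS window_start, COUNT(*) AS cnt " +
    "FROM ProcTimeTable " +
    "GROUP BY TUMBLE(ts_col, INTERVAL '1' MINUTE)");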
Event time attributes are defined by the .rowtime property during schema definition. The attribute can either replace an existing field or create a new one; in either case, the field holds the event timestamp of the current record.
DataStream<Tuple2<Timestamp, String>> stream = ...

// assignTimestampsAndWatermarks returns a new stream; convert that stream so the timestamps and watermarks are carried over.
DataStream<Tuple2<Timestamp, String>> withTimestamps = stream.assignTimestampsAndWatermarks(...);

Table table = tableEnv.fromDataStream(withTimestamps, "event_ts.rowtime, col_2");
For more information on time handling in SQL, see the Apache Flink documentation.
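
Putting the event time pieces together, the following sketch uses Flink's WatermarkStrategy API to assign timestamps and watermarks before the conversion. It assumes the stream and tableEnv from the examples above; the out-of-orderness bound, the names, and the window query are illustrative only.

import java.sql.Timestamp;
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Extract the event timestamp from the first tuple field and allow events
// to arrive up to five seconds out of order (illustrative bound).
DataStream<Tuple2<Timestamp, String>> withWatermarks = stream
    .assignTimestampsAndWatermarks(
        WatermarkStrategy
            .<Tuple2<Timestamp, String>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, timestamp) -> event.f0.getTime()));

// The first tuple field is replaced by the event_ts rowtime attribute,
// which can then drive event time windows in SQL.
Table table = tableEnv.fromDataStream(withWatermarks, "event_ts.rowtime, col_2");
tableEnv.createTemporaryView("EventTimeTable", table);
Table counts = tableEnv.sqlQuery(
    "SELECT TUMBLE_START(event_ts, INTERVAL '1' MINUTE) AS window_start, COUNT(*) AS cnt " +
    "FROM EventTimeTable " +
    "GROUP BY TUMBLE(event_ts, INTERVAL '1' MINUTE)");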