Converting DataStreams to Tables
When converting DataStreams to Tables, you need to define the StreamTableEnvironment for the conversion. Cloudera recommends creating named tables, because they are easier to refer to in SQL. You should also take processing time and event time into consideration, as they are crucial elements of Flink streaming applications.
StreamTableEnvironment is used to convert a DataStream into a Table. You can use the fromDataStream and createTemporaryView methods for the conversion. Cloudera recommends that you use the createTemporaryView method as it provides a way to assign a name to the created table. Named tables can be referenced directly in SQL afterwards.
Both of these methods take an optional, but recommended, string parameter to define field name mappings. The string must contain a comma-separated list of the desired column names. If the string is not specified, the column names are set to f0, f1, ... fn.
DataStream<Tuple2<Integer, String>> stream = ...
Table table = tableEnv.fromDataStream(stream, "col_1, col_2");
tableEnv.createTemporaryView("MyTableName", stream, "col_1, col_2");
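To make the naming rule concrete, the following plain-Java sketch models it outside of Flink: with no mapping string the columns get positional names, otherwise the comma-separated names are used. The class ColumnNameSketch and its resolve method are hypothetical names introduced for illustration only. This is not Flink code and does not claim to mirror how Flink implements the mapping.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the column-naming rule described above; not part of Flink. */
final class ColumnNameSketch {

    /**
     * Returns the column names for a stream type with {@code arity} fields.
     * {@code fieldSpec} is the optional comma-separated mapping string,
     * or null when no mapping is given.
     */
    static List<String> resolve(int arity, String fieldSpec) {
        List<String> names = new ArrayList<>();
        if (fieldSpec == null) {
            // No mapping given: fall back to positional names f0, f1, ...
            for (int i = 0; i < arity; i++) {
                names.add("f" + i);
            }
            return names;
        }
        // Mapping given: split on commas and trim surrounding whitespace.
        for (String part : fieldSpec.split(",")) {
            names.add(part.trim());
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(resolve(2, null));           // prints [f0, f1]
        System.out.println(resolve(2, "col_1, col_2")); // prints [col_1, col_2]
    }
}
```

In a query, these are the names you would use after the table is registered, which is why supplying explicit names tends to make the SQL easier to read than f0 and f1.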
You need to take the event timestamps and watermarks into consideration when converting DataStreams.
- The processing time attribute must be defined as an additional (logical) column marked with the .proctime property during schema definition.
For more information on time handling in SQL, see the Apache Flink documentation.
DataStream<Row> stream = ...
Table table = tableEnv.fromDataStream(stream, "col_1, col_2, ts_col.proctime");
- Event time attributes are defined by the .rowtime property during schema definition. This can either replace an existing field or create a new one, but in either case, the field holds the event timestamp of the current record.
For more information on time handling in SQL, see the Apache Flink documentation.
DataStream<Tuple2<Timestamp, String>> stream = ...
// assignTimestampsAndWatermarks returns a new stream, so keep the result
stream = stream.assignTimestampsAndWatermarks(...);
Table table = tableEnv.fromDataStream(stream, "event_ts.rowtime, col_2");
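The argument to assignTimestampsAndWatermarks(...) is elided in the example above and is left that way here. As a rough, Flink-independent illustration of what a watermark represents, the following sketch assumes a bounded out-of-orderness approach: the watermark trails the highest event timestamp seen so far by a fixed delay. The class WatermarkSketch, its methods, and the two-second delay are hypothetical choices made for this example and are not part of the Flink API.

```java
/** Toy bounded-out-of-orderness watermark tracker; not the Flink API. */
final class WatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampMs = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the event timestamp (in milliseconds) of one record. */
    void onEvent(long eventTimestampMs) {
        maxTimestampMs = Math.max(maxTimestampMs, eventTimestampMs);
    }

    /** Current watermark: records older than this are no longer expected. */
    long currentWatermark() {
        if (maxTimestampMs == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet
        }
        return maxTimestampMs - maxOutOfOrdernessMs;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(2_000L); // assumed 2 s delay
        for (long ts : new long[] {10_000L, 12_000L, 11_000L}) {
            wm.onEvent(ts);
            System.out.println("event=" + ts + " watermark=" + wm.currentWatermark());
        }
    }
}
```

Note that the late record with timestamp 11000 does not move the watermark backwards: in this model the watermark only advances. The column marked with .rowtime carries the per-record event timestamp, while the watermark tells time-based operations how far event time has progressed.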