Converting DataStreams to Tables
When converting DataStreams to Tables, you need to define a StreamTableEnvironment for the conversion. Cloudera recommends creating named tables because they are easier to reference in SQL. You should also take processing time and event time into consideration, as they are crucial elements of Flink streaming applications.
StreamTableEnvironment is used to convert a DataStream into a Table. You can use the fromDataStream and createTemporaryView methods for the conversion. Cloudera recommends that you use the createTemporaryView method as it provides a way to assign a name to the created table. Named tables can be referenced directly in SQL afterwards.
Both of these methods take an optional, but recommended, string parameter to define field name mappings. The string must contain a comma-separated list of the desired column names. If the string is not specified, the column names are set to f0, f1, ...fn.
DataStream<Tuple2<Integer, String>> stream = ...
Table table = tableEnv.fromDataStream(stream, "col_1, col_2");
tableEnv.createTemporaryView("MyTableName", stream, "col_1, col_2");
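The field name mapping is an ordinary string, so it can also be assembled from a list of column names. The following is a minimal sketch in plain Java with no Flink dependency. The FieldMapping class and its methods are hypothetical helpers introduced only for illustration; they are not part of the Flink API. The defaultNames method simply mirrors the f0, f1, ...fn naming described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Flink) for building field name mapping strings.
final class FieldMapping {

    // Joins the desired column names into the comma-separated form
    // expected by the string parameter described above.
    public static String of(String... columns) {
        return String.join(", ", columns);
    }

    // Reproduces the default naming (f0, f1, ...) that applies
    // when no mapping string is specified.
    public static String defaultNames(int arity) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < arity; i++) {
            names.add("f" + i);
        }
        return String.join(", ", names);
    }

    public static void main(String[] args) {
        System.out.println(of("col_1", "col_2")); // prints "col_1, col_2"
        System.out.println(defaultNames(2));      // prints "f0, f1"
    }
}
```

Under these assumptions, the result of FieldMapping.of("col_1", "col_2") could be passed as the last argument of fromDataStream or createTemporaryView in place of the literal string.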
You need to take into consideration the event timestamps and watermarks when converting DataStreams.
To define a processing time attribute, use the .proctime property during schema definition.
DataStream<Row> stream = ...
Table table = tableEnv.fromDataStream(stream, "col_1, col_2, ts_col.proctime");
To define an event time attribute, use the .rowtime property during schema definition. This can either replace an existing field or create a new one, but in either case, the field holds the event timestamp of the current record.
DataStream<Tuple2<Timestamp, String>> stream = ...
// assignTimestampsAndWatermarks returns a new stream, so keep the result
stream = stream.assignTimestampsAndWatermarks(...);
Table table = tableEnv.fromDataStream(stream, "event_ts.rowtime, col_2");
For more information on time handling in SQL, see the Apache Flink documentation.
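To make the effect of the .proctime and .rowtime properties on the schema string concrete, the following plain Java sketch builds the mapping strings used in the examples above. It has no Flink dependency, and the TimeAttributeMapping class and its methods are hypothetical helpers, not Flink API. It assumes that a processing time column is appended at the end, as in the example above, and that an event time column replaces the listed column of the same name if there is one and is appended otherwise.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Flink) that only manipulates strings.
final class TimeAttributeMapping {

    // Processing time: append the attribute as an additional column.
    public static String withProctime(List<String> columns, String timeColumn) {
        List<String> fields = new ArrayList<>(columns);
        fields.add(timeColumn + ".proctime");
        return String.join(", ", fields);
    }

    // Event time: replace the column with the same name if it is listed,
    // otherwise append the attribute as a new column.
    public static String withRowtime(List<String> columns, String timeColumn) {
        List<String> fields = new ArrayList<>(columns);
        int index = fields.indexOf(timeColumn);
        if (index >= 0) {
            fields.set(index, timeColumn + ".rowtime");
        } else {
            fields.add(timeColumn + ".rowtime");
        }
        return String.join(", ", fields);
    }

    public static void main(String[] args) {
        // prints "col_1, col_2, ts_col.proctime"
        System.out.println(withProctime(Arrays.asList("col_1", "col_2"), "ts_col"));
        // prints "event_ts.rowtime, col_2"
        System.out.println(withRowtime(Arrays.asList("event_ts", "col_2"), "event_ts"));
    }
}
```

The two printed strings match the schema definitions passed to fromDataStream in the processing time and event time examples.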