SQL Queries in Flink
A Table can be used for subsequent SQL and Table API queries, to be converted into a DataSet or DataStream, and to be written to a TableSink. You need to specify the SELECT queries with the sqlQuery() method of the TableEnvironment to return the result of the SELECT query as a Table.
SQL and Table API queries can be seamlessly mixed, and are holistically optimized and translated into a single program.
In order to access a
Table
in a SQL query, it must be registered in the
TableEnvironment
. A Table
can be registered from
the following ways:TableSource
- Table
CREATE TABLE
statementDataStream
DataSet
Alternatively, users can also register catalogs in a TableEnvironment
to
specify the location of the data sources.
The following is an example of SQL query in
Java:
DataStream<Tuple2<String, Integer>> transactionStream = ...
tEnv.createTemporaryView("Transactions", transactionStream, "account, amount");
Table balance = tEnv.sqlQuery(
"SELECT account, sum(amount) as balance FROM Transactions GROUP BY account"
);
DataStream<Tuple2<Boolean, Row>> balanceStream = tEnv.toRetractStream(balance, Row.class);
For the detailed documentation and the example code for the different query types, see the Apache Flink documentation.