SQL Queries in Flink

SELECT queries are specified with the sqlQuery() method of the TableEnvironment, which returns the query result as a Table. This Table can be used in subsequent SQL and Table API queries, converted into a DataSet or DataStream, or written to a TableSink.

SQL and Table API queries can be seamlessly mixed; they are holistically optimized and translated into a single program.

A Table must be registered in the TableEnvironment before it can be accessed in a SQL query. A Table can be registered from any of the following:
  • TableSource
  • Table
  • CREATE TABLE statement
  • DataStream
  • DataSet

Alternatively, users can register catalogs in a TableEnvironment to specify the location of the data sources.

The following example shows a SQL query in Java:
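// tEnv is a StreamTableEnvironment; the source of the stream is elided
DataStream<Tuple2<String, Integer>> transactionStream = ...

// register the DataStream as the view "Transactions" with the fields account and amount
tEnv.createTemporaryView("Transactions", transactionStream, "account, amount");

// continuous aggregation over the view; the result is returned as a Table
Table balance = tEnv.sqlQuery(
  "SELECT account, SUM(amount) AS balance FROM Transactions GROUP BY account"
);

// balances are updated as new transactions arrive, so convert to a retract stream:
// the Boolean flag is true for an added row and false for a retracted row
DataStream<Tuple2<Boolean, Row>> balanceStream = tEnv.toRetractStream(balance, Row.class);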
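To make the retract-stream semantics of the example more concrete, the following plain-Java sketch (no Flink dependency) replays a few transactions through a hand-written running SUM per account and records the change messages that balanceStream would carry: a true flag adds a result row, a false flag retracts a previously emitted one. It is a simplified model under stated assumptions, not Flink's implementation; the names RetractSketch, Change and process are hypothetical, the sample accounts and amounts are made up, and the real runtime (for example with mini-batch aggregation enabled) may emit fewer intermediate updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: RetractSketch, Change and process are NOT Flink APIs.
// Models the messages of a retract stream for
// "SELECT account, SUM(amount) AS balance FROM Transactions GROUP BY account".
final class RetractSketch {

    // One change message, mirroring a (Boolean, Row) pair of the retract stream.
    static final class Change {
        final boolean add;     // true: add this row, false: retract (delete) it
        final String account;
        final int balance;

        Change(boolean add, String account, int balance) {
            this.add = add;
            this.account = account;
            this.balance = balance;
        }

        @Override
        public String toString() {
            return "(" + add + ", " + account + ", " + balance + ")";
        }
    }

    // Processes transactions in arrival order and returns the emitted change messages.
    static List<Change> process(String[] accounts, int[] amounts) {
        Map<String, Integer> balances = new HashMap<>(); // running SUM(amount) per account
        List<Change> out = new ArrayList<>();
        for (int i = 0; i < accounts.length; i++) {
            String account = accounts[i];
            Integer old = balances.get(account);
            if (old != null) {
                // The previously emitted row for this account is now stale: retract it.
                out.add(new Change(false, account, old));
            }
            int updated = (old == null ? 0 : old) + amounts[i];
            balances.put(account, updated);
            // Emit the updated result row for this account.
            out.add(new Change(true, account, updated));
        }
        return out;
    }

    public static void main(String[] args) {
        String[] accounts = {"alice", "bob", "alice"};
        int[] amounts = {10, 5, 7};
        for (Change c : process(accounts, amounts)) {
            System.out.println(c);
        }
    }
}
```

For the three sample transactions the sketch prints (true, alice, 10), (true, bob, 5), (false, alice, 10) and (true, alice, 17): the second transaction for alice first retracts the stale balance row and then adds the updated one, which is why a continuously updated GROUP BY result cannot be emitted as a simple append-only stream.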

For detailed documentation and example code for the different query types, see the Apache Flink documentation.