Iceberg with Flink
Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data. In Cloudera Streaming Analytics, the DataStream API enables you to use Iceberg tables in your Flink jobs.
Iceberg is integrated with Flink as a connector, and you can use the DataStream API to read and
write data from a data source to an Iceberg table. Both the V1 and V2 version specifics are
supported by the Flink connector. For more information, about the version changes, see the Apache Iceberg documentation.
When using Iceberg with the DataStream API, you need to create the Flink job that
includes referencing the Iceberg table at the
Feature | Flink |
---|---|
Read | Supported |
Append | Supported |
Overwrite | Supported |
Upsert | Technical Preview1 |
TableLoader
. The following example
shows how to create your Flink job that reads or writes data to or from an Iceberg table using
HDFS.StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.append();
env.execute("Test Iceberg DataStream");
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.overwrite(true)
.append();
env.execute("Test Iceberg DataStream");
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.upsert(true)
.append();
env.execute("Test Iceberg DataStream");
You can add the options to write when configuring the Flink Sink as shown in the following
example:
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.set("write-format", "orc")
.set(FlinkWriteOptions.OVERWRITE_MODE, "true");
1 The upsert feature of the Iceberg and Flink
integration is in Technical Preview and not ready for production
deployment. Cloudera encourages you to explore these features in
non-production environments and provide feedback on your experiences
through the Cloudera Community Forums.