Iceberg with Flink
You can use Iceberg as a connector in Cloudera Streaming Analytics for your Flink jobs. Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data.
Iceberg is integrated with Flink as a connector, which lets you read data from and write data to an Iceberg table. Both the V1 and V2 format versions are supported by the Flink connector. For more information about the version changes, see the Apache Iceberg documentation.
The following table summarizes the Iceberg features that are supported by the Flink connector:
Feature | Flink |
---|---|
Read | Supported |
Append | Supported |
Overwrite | Supported |
Upsert | Technical Preview1 |
When using Iceberg with the DataStream API, you need to create the Flink job so that it references the Iceberg table through a TableLoader. The following examples show how to create a Flink job that reads data from or writes data to an Iceberg table using HDFS. The first example is a streaming read:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
The following example shows how to append data from a DataStream to an Iceberg table:
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.append();
env.execute("Test Iceberg DataStream");
The following example shows how to overwrite the data in an Iceberg table with the contents of a DataStream:
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.overwrite(true)
.append();
env.execute("Test Iceberg DataStream");
The following example shows how to upsert data into an Iceberg table:
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.upsert(true)
.append();
env.execute("Test Iceberg DataStream");
You can add write options when configuring the Flink sink, as shown in the following example:
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.set("write-format", "orc")
.set(FlinkWriteOptions.OVERWRITE_MODE, "true");
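Multiple write options can also be applied at once through a properties map with the setAll method. The following is a minimal sketch, assuming the Iceberg runtime in use supports the write-format and target-file-size-bytes options:
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put("write-format", "orc");
writeOptions.put("target-file-size-bytes", "536870912"); // hypothetical 512 MB target file size

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .setAll(writeOptions)
    .append();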
1 The Upsert feature of the Iceberg and Flink integration is in Technical Preview and is not ready for production deployment. Cloudera encourages you to explore this feature in non-production environments and provide feedback on your experiences through the Cloudera Community Forums.