Iceberg with Flink
You can use Iceberg as a connector in Cloudera Streaming Analytics for your Flink jobs. Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data.
Iceberg is integrated with Flink as a connector that lets you read data from and write data to
Iceberg tables. The following table summarizes the Iceberg features that are
supported in Flink:
| Feature | Flink |
|---|---|
| Read | Supported |
| Append | Supported |
| Overwrite | Supported |
| Upsert | Technical preview [1] |

When using Iceberg with the DataStream API, you create a Flink job that references the Iceberg table through a TableLoader. The following examples show how to create Flink jobs that read data from or write data to an Iceberg table stored on HDFS.

Read:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.source.FlinkSource;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

    // Build a streaming source that reads from the Iceberg table,
    // starting at the given snapshot.
    DataStream<RowData> stream = FlinkSource.forRowData()
        .env(env)
        .tableLoader(tableLoader)
        .streaming(true)
        .startSnapshotId(3821550127947089987L)
        .build();

    // Print all records to stdout.
    stream.print();

    // Submit and execute this streaming read job.
    env.execute("Test Iceberg Streaming Read");
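The effect of the start snapshot on a streaming read can be pictured with a small in-memory model. The sketch below is plain Java and does not call the Iceberg or Flink API. The IncrementalReadModel class and its method are hypothetical, and the sketch assumes that the start snapshot itself is excluded and only later commits are read, which should be checked against the Iceberg version you use. Snapshot IDs are not ordered numerically, so the model works on the commit order of the table history.

```java
import java.util.List;

// Hypothetical in-memory model; this is NOT the Iceberg or Flink API.
final class IncrementalReadModel {

    // Returns the snapshots that an incremental read would pick up, given the
    // table's snapshot IDs in commit order.
    // Assumption: the start snapshot itself is excluded.
    static List<Long> snapshotsAfter(List<Long> history, long startSnapshotId) {
        int start = history.indexOf(startSnapshotId);
        if (start < 0) {
            throw new IllegalArgumentException("Unknown snapshot: " + startSnapshotId);
        }
        // Everything committed after the start snapshot is read, in commit order.
        return history.subList(start + 1, history.size());
    }
}
```

In this model, a history of four snapshots with the start snapshot in second position yields the last two snapshots.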
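Append:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.data.RowData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    StreamExecutionEnvironment env = ...;
    DataStream<RowData> input = ... ;
    Configuration hadoopConf = new Configuration();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

    // Append the input records to the data already in the table.
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .append();

    env.execute("Test Iceberg DataStream");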
Overwrite:

    StreamExecutionEnvironment env = ...;
    DataStream<RowData> input = ... ;
    Configuration hadoopConf = new Configuration();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

    // Overwrite existing data in the table with the input records.
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .overwrite(true)
        .append();

    env.execute("Test Iceberg DataStream");
Upsert:

    StreamExecutionEnvironment env = ...;
    DataStream<RowData> input = ... ;
    Configuration hadoopConf = new Configuration();
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

    // Enable upsert mode so that incoming rows update existing rows with the same key.
    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .upsert(true)
        .append();

    env.execute("Test Iceberg DataStream");
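The difference between the append, overwrite, and upsert write modes can be illustrated with a small in-memory model. The sketch below is plain Java and does not use the Iceberg or Flink API. The WriteModeModel class, the Row record, and the assumption of an unpartitioned table keyed by an id field are hypothetical and serve only the illustration. The behavior of a real table also depends on its partitioning and key definition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model; this is NOT the Iceberg or Flink API.
final class WriteModeModel {

    // A row with an id that stands in for the table's key field.
    record Row(int id, String value) {}

    // Append: existing rows stay, incoming rows are added after them.
    static List<Row> append(List<Row> table, List<Row> incoming) {
        List<Row> result = new ArrayList<>(table);
        result.addAll(incoming);
        return result;
    }

    // Overwrite (unpartitioned table assumed): incoming rows replace the existing contents.
    static List<Row> overwrite(List<Row> table, List<Row> incoming) {
        return new ArrayList<>(incoming);
    }

    // Upsert: an incoming row replaces the existing row with the same id;
    // rows with new ids are inserted.
    static List<Row> upsert(List<Row> table, List<Row> incoming) {
        Map<Integer, Row> byId = new LinkedHashMap<>();
        for (Row row : table) {
            byId.put(row.id(), row);
        }
        for (Row row : incoming) {
            byId.put(row.id(), row);
        }
        return new ArrayList<>(byId.values());
    }
}
```

With an existing table of rows (1, a) and (2, b) and incoming rows (2, B) and (3, c), append keeps all four rows, overwrite keeps only the two incoming rows, and upsert keeps three rows with the row for id 2 updated.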
You can set write options when configuring the Flink sink, as shown in the following
example:

    FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
        .table(table)
        .tableLoader(tableLoader)
        // Write data files in ORC format.
        .set("write-format", "orc")
        // set() takes the option name as a string, so pass the key of the option constant.
        .set(FlinkWriteOptions.OVERWRITE_MODE.key(), "true");
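Because write options are passed as string key and value pairs, it can help to validate them before they reach the sink builder. The following is a minimal sketch in plain Java that does not depend on Iceberg or Flink. The WriteOptionsSketch class and its method are hypothetical, and the list of accepted formats (parquet, avro, orc) is an assumption that you should confirm for your Iceberg version.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; not part of the Iceberg or Flink API.
final class WriteOptionsSketch {

    // File formats assumed to be valid values for the "write-format" option.
    private static final Set<String> FORMATS = Set.of("parquet", "avro", "orc");

    // Builds a string-to-string option map that contains a validated write format.
    static Map<String, String> withWriteFormat(String format) {
        String normalized = format.toLowerCase(Locale.ROOT);
        if (!FORMATS.contains(normalized)) {
            throw new IllegalArgumentException("Unsupported write format: " + format);
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("write-format", normalized);
        return options;
    }
}
```

Each entry of the returned map could then be handed to the sink builder with a set(key, value) call like the ones in the example above.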
[1] The Upsert feature of the Iceberg and Flink integration is in Technical Preview and is not ready for production deployment. Cloudera encourages you to explore this feature in non-production environments and to provide feedback on your experiences through the Cloudera Community Forums.