Iceberg with Flink

Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data. In Cloudera Streaming Analytics, the DataStream API enables you to use Iceberg tables in your Flink jobs.

Iceberg is integrated with Flink as a connector, and you can use the DataStream API to read and write data from a data source to an Iceberg table. Both the V1 and V2 version specifics are supported by the Flink connector. For more information, about the version changes, see the Apache Iceberg documentation.
Table 1. DataStream feature support for the Iceberg and Flink integration
Feature Flink
Read Supported
Append Supported
Overwrite Supported
Upsert Technical Preview1
When using Iceberg with the DataStream API, you need to create the Flink job that includes referencing the Iceberg table at the TableLoader. The following example shows how to create your Flink job that reads or writes data to or from an Iceberg table using HDFS.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
   .tableLoader(tableLoader)
   .append();

env.execute("Test Iceberg DataStream");
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
   .tableLoader(tableLoader)
   .overwrite(true)
   .append();
env.execute("Test Iceberg DataStream");
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
   .tableLoader(tableLoader)
   .upsert(true)
   .append();
env.execute("Test Iceberg DataStream");
You can add the options to write when configuring the Flink Sink as shown in the following example:
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
   .table(table)
   .tableLoader(tableLoader)
   .set("write-format", "orc")
   .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
1 The upsert feature of the Iceberg and Flink integration is in Technical Preview and not ready for production deployment. Cloudera encourages you to explore these features in non-production environments and provide feedback on your experiences through the Cloudera Community Forums.