Iceberg with Flink

You can use Iceberg as a connector in Cloudera Streaming Analytics for your Flink jobs. Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data.

Iceberg is integrated with Flink as a connector that lets your jobs read data from and write data to Iceberg tables. The following table summarizes the Iceberg features that are supported with the Flink DataStream API:

Table 1. DataStream feature support for the Iceberg and Flink integration

Feature      Flink
Read         Supported
Append       Supported
Overwrite    Supported
Upsert       Technical preview [1]

When using Iceberg with the DataStream API, you create a Flink job that references the Iceberg table through a TableLoader. The following examples show Flink jobs that read data from or write data to an Iceberg table stored on HDFS, with one example for each of the Read, Append, Overwrite, and Upsert features.
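
Read:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

// Load the Iceberg table from its HDFS location.
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

// Read the table as a continuous stream, starting from the given snapshot ID.
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");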

Append:

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
// Hadoop configuration (org.apache.hadoop.conf.Configuration), not Flink's Configuration class.
Configuration hadoopConf = new Configuration();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

// Append the incoming rows to the existing table data.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();

env.execute("Test Iceberg DataStream");

Overwrite:

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

// Overwrite existing data in the table instead of adding to it.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .append();

env.execute("Test Iceberg DataStream");

Upsert:

StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ...;
Configuration hadoopConf = new Configuration();

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

// Upsert (Technical Preview): a row that matches an existing key replaces the old row; other rows are inserted.
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .upsert(true)
    .append();

env.execute("Test Iceberg DataStream");
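
To make the difference between the append, overwrite, and upsert write modes concrete, the following sketch models them on an in-memory list using only the Java standard library. It is a toy model under stated assumptions, not Iceberg or Flink code: the WriteModesSketch class and its methods are hypothetical, the table is assumed to be unpartitioned, and each row is an (id, value) pair in which id stands in for the key column used to match rows during an upsert.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Toy model only: none of these names are Iceberg or Flink APIs.
final class WriteModesSketch {

    // A row is an (id, value) pair; "id" plays the role of the key column.
    static Map.Entry<Integer, String> row(int id, String value) {
        return new SimpleImmutableEntry<>(id, value);
    }

    // Append: keep every existing row and add the incoming rows after them.
    static List<Map.Entry<Integer, String>> append(
            List<Map.Entry<Integer, String>> table,
            List<Map.Entry<Integer, String>> incoming) {
        List<Map.Entry<Integer, String>> result = new ArrayList<>(table);
        result.addAll(incoming);
        return result;
    }

    // Overwrite (unpartitioned table assumed): discard the existing rows
    // and keep only the incoming ones.
    static List<Map.Entry<Integer, String>> overwrite(
            List<Map.Entry<Integer, String>> table,
            List<Map.Entry<Integer, String>> incoming) {
        return new ArrayList<>(incoming);
    }

    // Upsert: for each incoming row, drop existing rows with the same id,
    // then add the incoming row.
    static List<Map.Entry<Integer, String>> upsert(
            List<Map.Entry<Integer, String>> table,
            List<Map.Entry<Integer, String>> incoming) {
        List<Map.Entry<Integer, String>> result = new ArrayList<>(table);
        for (Map.Entry<Integer, String> r : incoming) {
            result.removeIf(existing -> existing.getKey().equals(r.getKey()));
            result.add(r);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, String>> table = Arrays.asList(row(1, "a"), row(2, "b"));
        List<Map.Entry<Integer, String>> incoming = Arrays.asList(row(2, "B"), row(3, "c"));
        System.out.println("append:    " + append(table, incoming));
        System.out.println("overwrite: " + overwrite(table, incoming));
        System.out.println("upsert:    " + upsert(table, incoming));
    }
}
```

With a table that holds (1, a) and (2, b) and incoming rows (2, B) and (3, c), the sketch prints [1=a, 2=b, 2=B, 3=c] for append, [2=B, 3=c] for overwrite, and [1=a, 2=B, 3=c] for upsert. Note that append does not check keys, so id 2 appears twice.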

You can set write options when configuring the Flink sink, as shown in the following example:
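// SimpleDataUtil.FLINK_SCHEMA comes from Iceberg's test utilities; substitute your own TableSchema.
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    // Options are passed as string key/value pairs.
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE.key(), "true");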
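
Because write options are plain string pairs, they can be assembled and checked before the sink is built. The following is a minimal sketch of that idea using only the Java standard library. The WriteOptionsSketch class and its writeOptions method are hypothetical helpers, not part of Iceberg or Flink. Only the "write-format" key is taken from the example above; the "overwrite-enabled" and "upsert-enabled" key names, and the rule that overwrite and upsert are not enabled together, are assumptions that you should verify against the Iceberg version you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: not part of Iceberg or Flink.
final class WriteOptionsSketch {

    // Collects write options as string pairs, the same shape that
    // the sink builder's set(key, value) call accepts.
    static Map<String, String> writeOptions(String format, boolean overwrite, boolean upsert) {
        // Assumption: overwrite and upsert are mutually exclusive write modes.
        if (overwrite && upsert) {
            throw new IllegalArgumentException("overwrite and upsert cannot both be enabled");
        }
        Map<String, String> options = new LinkedHashMap<>();
        options.put("write-format", format);
        options.put("overwrite-enabled", String.valueOf(overwrite)); // assumed key name
        options.put("upsert-enabled", String.valueOf(upsert));       // assumed key name
        return options;
    }

    public static void main(String[] args) {
        System.out.println(writeOptions("orc", true, false));
    }
}
```

Each entry of the returned map could then be applied to the sink builder with one set(key, value) call per entry.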

[1] The Upsert feature of the Iceberg and Flink integration is in Technical Preview and not ready for production deployment. Cloudera encourages you to explore this feature in non-production environments and provide feedback on your experiences through the Cloudera Community Forums.