Iceberg with Flink
You can use Iceberg as a connector in Cloudera Streaming Analytics for your Flink jobs. Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data.
Iceberg is integrated with Flink as a connector, where you can read and write data from a data
source to an Iceberg table. The following table summarizes the Iceberg features that are
supported in Flink:
When using
Iceberg with the DataStream API, you need to create the Flink job that includes referencing
the Iceberg table at the
| Feature | Flink |
|---|---|
| Read | Supported |
| Append | Supported |
| Overwrite | Supported |
| Upsert | Technical preview1 |
1 The Upsert feature of the Iceberg and Flink
integration is in Technical Preview and not ready for production
deployment. Cloudera encourages you to explore these features in
non-production environments and provide feedback on your experiences
through the Cloudera Community Forums.
TableLoader. The following example shows how to
create your Flink job that reads or writes data to or from an Iceberg table using HDFS.StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.append();
env.execute("Test Iceberg DataStream");StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.overwrite(true)
.append();
env.execute("Test Iceberg DataStream");StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.upsert(true)
.append();
env.execute("Test Iceberg DataStream");You can add the options to write when configuring the Flink Sink as shown in the following
example:
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
.table(table)
.tableLoader(tableLoader)
.set("write-format", "orc")
.set(FlinkWriteOptions.OVERWRITE_MODE, "true");