Iceberg with Flink
You can use Iceberg as a connector in Cloudera Streaming Analytics for your Flink jobs. Apache Iceberg is an open, high-performance table format for organizing datasets that can contain petabytes of data. Iceberg is integrated with Flink as a connector, so your Flink jobs can read data from and write data to Iceberg tables. The following table summarizes the Iceberg features that are supported in Flink:
| Feature | Flink |
| --- | --- |
| Read | Supported |
| Append | Supported |
| Overwrite | Supported |
| Upsert | Technical Preview¹ |

When using Iceberg with the DataStream API, you need to create the Flink job so that it references the Iceberg table through a TableLoader. The following example shows how to create a Flink job that reads data from an Iceberg table stored on HDFS:

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

// Read the table as an unbounded stream, starting from the given snapshot.
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .build();

// Print all records to stdout.
stream.print();

// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
```
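The example above loads the table directly from a Hadoop file system path. If the table is registered in a catalog, it can instead be loaded through TableLoader.fromCatalog. The following sketch assumes a Hive catalog; the metastore URI, catalog name, database name, and table name are placeholders:

```java
// Sketch: loading an Iceberg table from a Hive catalog instead of a Hadoop path.
// The metastore URI, database name, and table name below are placeholders.
Map<String, String> properties = new HashMap<>();
properties.put("uri", "thrift://metastore-host:9083");

CatalogLoader catalogLoader = CatalogLoader.hive("hive_catalog", new Configuration(), properties);
TableLoader tableLoader = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of("db", "sample_table"));
```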
You can add write options when configuring the Flink sink, as shown in the following example:

```java
FlinkSink.Builder builder = FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
    .table(table)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");
```
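The builder on its own does not emit any records; the sink must be finalized with append(). A minimal sketch of a complete write job, assuming the stream and tableLoader variables from the read example above, could look like this:

```java
// Minimal sketch: append a DataStream<RowData> to an Iceberg table.
// Assumes `stream` and `tableLoader` are set up as in the read example above.
FlinkSink.forRowData(stream)
    .tableLoader(tableLoader)
    .append();

// Submit and execute this streaming write job.
env.execute("Test Iceberg Streaming Write");
```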
¹ The Upsert feature of the Iceberg and Flink integration is in Technical Preview and not ready for production deployment. Cloudera encourages you to explore these features in non-production environments and provide feedback on your experiences through the Cloudera Community Forums.
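Because upsert mode is in Technical Preview, the following is only an illustrative sketch rather than a production recipe. It assumes a target table with a hypothetical id column that can serve as the equality (key) field:

```java
// Illustrative sketch only: upsert mode is in Technical Preview.
// Assumes the target table has an `id` column usable as the equality key.
FlinkSink.forRowData(stream)
    .tableLoader(tableLoader)
    .upsert(true)                                            // update rows in place instead of appending
    .equalityFieldColumns(Collections.singletonList("id"))   // rows with the same `id` are treated as the same record
    .append();
```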