PutIcebergCDC

Description

Iceberg is a high-performance format for huge analytic tables.
The PutIcebergCDC processor can apply CDC (Change Data Capture) operations to Iceberg tables using a Hive Iceberg catalog.

CDC (Change Data Capture) support

Note: The processor requires Iceberg format version 2 tables, because the solution depends on the availability of equality delete files.

The PutIcebergCDC processor accepts CDC records as input.
The "Operation RecordPath" property specifies where the operation is found in the record, while the "Before Data RecordPath" and "After Data RecordPath" properties point to the state of the record before and after the operation.
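
To illustrate what the three RecordPath settings select, the sketch below resolves simple slash-separated paths such as "/payload/op" against a record modelled as nested Java maps. This is a hypothetical helper for illustration only: it is not the NiFi RecordPath API, and the class and method names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper: resolves simple paths like "/payload/op" against nested maps.
 * NOT the NiFi RecordPath API; it only mimics how the operation, before and after
 * values are located inside a CDC record.
 */
class CdcFieldExtractor {

    static Object resolve(Map<String, Object> record, String path) {
        Object current = record;
        for (String segment : path.split("/")) {
            if (segment.isEmpty()) {
                continue; // split("/") yields an empty first segment for the leading "/"
            }
            if (!(current instanceof Map)) {
                return null; // the path descends into a value that is not a nested record
            }
            current = ((Map<?, ?>) current).get(segment);
        }
        return current;
    }

    public static void main(String[] args) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("op", "c");
        payload.put("before", null);
        Map<String, Object> record = new HashMap<>();
        record.put("payload", payload);
        System.out.println(resolve(record, "/payload/op"));     // prints c
        System.out.println(resolve(record, "/payload/before")); // prints null
    }
}
```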

Supported operation types

Note: For delete and update operations, position or equality delete files are generated to remove records from the Iceberg table.
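
The difference between the two kinds of delete files can be shown with a conceptual sketch of how a reader filters rows. A position delete names a data file and a row position within it; an equality delete names column values, and every row whose values match is treated as deleted. The `Row` type and method names are hypothetical, and the sketch deliberately ignores data sequence numbers, which Iceberg uses to decide which data files a delete file applies to.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * Conceptual sketch of read-time delete application in Iceberg format v2.
 * Simplification: data sequence numbers are ignored. Names are hypothetical.
 */
class DeleteFileSketch {

    /** A row read from a data file, with the file path and its position in that file. */
    static final class Row {
        final String filePath;
        final long position;
        final Map<String, Object> values;

        Row(String filePath, long position, Map<String, Object> values) {
            this.filePath = filePath;
            this.position = position;
            this.values = values;
        }
    }

    /**
     * Returns the rows that survive both kinds of deletes.
     *
     * @param positionDeletes entries of the form "filePath#position"
     * @param equalityDeletes column-to-value maps; a row matching every column of any entry is removed
     */
    static List<Row> applyDeletes(List<Row> rows, Set<String> positionDeletes,
                                  List<Map<String, Object>> equalityDeletes) {
        List<Row> live = new ArrayList<>();
        for (Row row : rows) {
            if (positionDeletes.contains(row.filePath + "#" + row.position)) {
                continue; // removed by a position delete
            }
            boolean matched = false;
            for (Map<String, Object> delete : equalityDeletes) {
                boolean allEqual = true;
                for (Map.Entry<String, Object> e : delete.entrySet()) {
                    if (!Objects.equals(row.values.get(e.getKey()), e.getValue())) {
                        allEqual = false;
                        break;
                    }
                }
                if (allEqual) {
                    matched = true; // removed by an equality delete
                    break;
                }
            }
            if (!matched) {
                live.add(row);
            }
        }
        return live;
    }
}
```

Equality deletes let a writer remove a row by key without knowing which data file holds it, which is why they suit CDC streams where only the key of the changed record is known.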

Debezium record support

The processor accepts CDC records generated by Debezium when "Record Type" is set to "Debezium" (this is the default value). Each Debezium record contains a schema and a payload field.
The payload field contains the operation code (op field), the state of the record before the operation (before field), and the state of the record after the operation (after field).

Example record:
                {
                  "schema": {...},
                  "payload": {
                    "before": null,
                    "after": {
                      "id": 1,
                      "first_name": "Anne",
                      "last_name": "Kretchmar",
                      "email": "annek@noanswer.org"
                    },
                    "source": { ...},
                    "op": "c",
                    "ts_ms": 1559033904863
                  }
                }
            
For more information about Debezium records, please refer to the Debezium documentation.
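
Debezium marks operations with the op codes "c" (create), "u" (update), "d" (delete) and "r" (read, emitted during snapshots). The sketch below shows one plausible way to turn a payload into row-level actions: an insert writes the after state, a delete removes the before state, and an update removes the before state and writes the after state. The class, enum and method names are hypothetical, and rejecting "r" is a choice of this sketch, not a statement about how the processor handles snapshot records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical decomposition of a Debezium payload into delete/insert actions.
 * Only "c", "u" and "d" are handled; any other op code is rejected by this sketch.
 */
class DebeziumOps {

    enum Kind { DELETE, INSERT }

    static final class Action {
        final Kind kind;
        final Map<String, Object> row;

        Action(Kind kind, Map<String, Object> row) {
            this.kind = kind;
            this.row = row;
        }
    }

    @SuppressWarnings("unchecked")
    static List<Action> toActions(Map<String, Object> payload) {
        String op = String.valueOf(payload.get("op")); // a missing op becomes "null" and is rejected
        Map<String, Object> before = (Map<String, Object>) payload.get("before");
        Map<String, Object> after = (Map<String, Object>) payload.get("after");
        List<Action> actions = new ArrayList<>();
        switch (op) {
            case "c": // create: write the new state
                actions.add(new Action(Kind.INSERT, after));
                break;
            case "u": // update: remove the old state, then write the new one
                actions.add(new Action(Kind.DELETE, before));
                actions.add(new Action(Kind.INSERT, after));
                break;
            case "d": // delete: remove the old state
                actions.add(new Action(Kind.DELETE, before));
                break;
            default:
                throw new IllegalArgumentException("Unsupported Debezium op: " + op);
        }
        return actions;
    }
}
```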

GoldenGate record support

The processor accepts CDC records generated by GoldenGate when "Record Type" is set to "GoldenGate".
The PutIcebergCDC processor supports the GoldenGate record format when the before and after states are included in the incoming record.
The "op_type" field holds the operation type, the "before" field stores the record before the operation, and the "after" field stores the record after the operation.

Example record:
                {
                  "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM",
                  "op_type": "I",
                  "op_ts": "2019-05-31 04:24:34.000327",
                  "current_ts": "2019-05-31 04:24:39.650000",
                  "pos": "00000000020000004074",
                  "primary_keys": [
                    "ID"
                  ],
                  "tokens": {
                    "txid": "9.32.6726",
                    "csn": "13906131"
                  },
                  "before": null,
                  "after": {
                    "ID": 11,
                    "ID_CUSTOMER_ORDER": 11,
                    "DESCRIPTION": "Cars 3",
                    "QUANTITY": 2
                  }
                }
            
For more information about GoldenGate records, please refer to the Oracle GoldenGate documentation.
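
Because the processor relies on the before and after states being present, a GoldenGate record is only usable when the states its operation needs are included. The sketch below expresses that precondition for the op_type values "I" (insert), "U" (update) and "D" (delete): an insert needs "after", a delete needs "before", and an update needs both. The class and method names, and the exact checks, are assumptions for illustration; other op_type values, such as "T" for truncate, are simply reported as unsupported here.

```java
import java.util.Map;

/**
 * Hypothetical precondition check for GoldenGate records:
 * I needs "after", D needs "before", U needs both.
 */
class GoldenGateCheck {

    static boolean hasRequiredStates(Map<String, Object> record) {
        Object opType = record.get("op_type");
        boolean hasBefore = record.get("before") != null;
        boolean hasAfter = record.get("after") != null;
        if ("I".equals(opType)) {
            return hasAfter;
        } else if ("D".equals(opType)) {
            return hasBefore;
        } else if ("U".equals(opType)) {
            return hasBefore && hasAfter;
        }
        return false; // other op_type values (e.g. "T" for truncate) are not handled in this sketch
    }
}
```

For an update, the example configuration of GoldenGate must therefore emit the full before image; an update record carrying only "after" fails this check.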

Custom record support

When "Record Type" is set to "Custom", it is possible to define how the values of the operation type are interpreted, that is, which value represents an insert, a delete, or an update operation.
In this case the "Insert Operation Type", "Delete Operation Type" and "Update Operation Type" properties must be set.
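
A minimal sketch of the idea, assuming the three configured values are compared to the operation value by exact, case-sensitive string equality (an assumption; the processor's matching rules are not described here). The class name, enum and the example values "INS", "DEL" and "UPD" are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical mapper for the "Custom" record type: the configured insert, delete
 * and update values classify the value found at "Operation RecordPath".
 * Matching is exact and case-sensitive in this sketch.
 */
class CustomOperationMapper {

    enum Operation { INSERT, DELETE, UPDATE }

    private final Map<String, Operation> byValue = new HashMap<>();

    CustomOperationMapper(String insertValue, String deleteValue, String updateValue) {
        byValue.put(insertValue, Operation.INSERT);
        byValue.put(deleteValue, Operation.DELETE);
        byValue.put(updateValue, Operation.UPDATE);
        if (byValue.size() != 3) {
            // Two properties sharing a value would make the operation ambiguous.
            throw new IllegalArgumentException("Operation type values must be distinct");
        }
    }

    Operation classify(String operationValue) {
        Operation op = byValue.get(operationValue);
        if (op == null) {
            throw new IllegalArgumentException("Unknown operation type: " + operationValue);
        }
        return op;
    }
}
```

For example, `new CustomOperationMapper("INS", "DEL", "UPD").classify("UPD")` yields `UPDATE`, while an unmapped value such as "UPSERT" is rejected.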

Commit retry properties

Iceberg supports multiple concurrent writes using optimistic concurrency. The processor's commit retry implementation uses exponential backoff with jitter and a scale factor of 2, and provides properties to configure this behaviour for a given use case.
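
The backoff schedule can be sketched as follows: the base wait before retry n is minWaitMs * 2^(n - 1), capped at maxWaitMs, and a random jitter is added so that concurrent writers do not retry in lockstep. The parameter names and the jitter size (up to 10% of the base delay) are assumptions for illustration, not the processor's actual property names or values.

```java
import java.util.Random;

/**
 * Sketch of exponential backoff with jitter and a scale factor of 2.
 * Parameter names and the 10% jitter bound are assumptions.
 */
class CommitBackoff {

    static final double SCALE_FACTOR = 2.0;

    /** Base delay before the given retry attempt (1-based), capped at maxWaitMs. */
    static long baseDelayMs(int attempt, long minWaitMs, long maxWaitMs) {
        double delay = minWaitMs * Math.pow(SCALE_FACTOR, attempt - 1);
        return (long) Math.min(delay, (double) maxWaitMs);
    }

    /** Base delay plus a random jitter in [0, 10%) of the base delay (at least a 1 ms bound). */
    static long delayWithJitterMs(int attempt, long minWaitMs, long maxWaitMs, Random random) {
        long base = baseDelayMs(attempt, minWaitMs, maxWaitMs);
        long jitterBound = Math.max(1, base / 10);
        return base + (long) (random.nextDouble() * jitterBound);
    }
}
```

With a minimum wait of 100 ms and a maximum of 2000 ms, the base delays for retries 1 to 6 are 100, 200, 400, 800, 1600 and 2000 ms; the cap keeps later retries from waiting arbitrarily long.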

The NiFi-side retry logic is built on top of the Iceberg commit retry logic, which can be configured through table properties. For details, see the "Table behavior properties" section of the Iceberg configuration documentation.