PutIcebergCDC

Description

Iceberg is a high-performance format for huge analytic tables.
The PutIcebergCDC processor applies CDC (Change Data Capture) operations to Iceberg tables using the Hive Iceberg catalog.

CDC (Change Data Capture) support

Note: The processor requires Iceberg format version 2 tables, because the solution depends on the availability of equality delete files.


The PutIcebergCDC processor accepts CDC records as input.
The "Operation RecordPath" property specifies where the operation type is found in the record. The "Before Data RecordPath" and "After Data RecordPath" properties specify where the state of the record before and after the operation is found.

Supported operation types

Note: For delete and update operations, position or equality delete files are generated to remove records from the Iceberg table.

Debezium record support

The processor accepts CDC records generated by Debezium when "Record Type" is set to "Debezium" (the default value). Each Debezium record contains a schema field and a payload field.
The payload field contains the operation code (op field), the record state before the operation (before field),
and the record state after the operation (after field).

Example record:
                {
                  "schema": {...},
                  "payload": {
                    "before": null,
                    "after": {
                      "id": 1,
                      "first_name": "Anne",
                      "last_name": "Kretchmar",
                      "email": "annek@noanswer.org"
                    },
                    "source": { ...},
                    "op": "c",
                    "ts_ms": 1559033904863
                  }
                }
            
For more information about Debezium records, see the Debezium documentation.
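
To make the payload layout concrete, the following is a minimal Python sketch of how the op, before and after fields of a Debezium record could be read. It is not the processor's implementation. The function name parse_debezium and the DEBEZIUM_OPS table are hypothetical, and treating the snapshot read code "r" as an insert is an assumption made only for this illustration.

```python
import json

# Debezium operation codes: c = create, u = update, d = delete, r = snapshot read.
# Mapping "r" to an insert is an assumption of this sketch.
DEBEZIUM_OPS = {"c": "insert", "r": "insert", "u": "update", "d": "delete"}


def parse_debezium(raw):
    """Return (operation, before, after) for one Debezium JSON record."""
    payload = json.loads(raw)["payload"]
    operation = DEBEZIUM_OPS.get(payload["op"])
    if operation is None:
        raise ValueError(f"unsupported Debezium op: {payload['op']!r}")
    return operation, payload.get("before"), payload.get("after")


# A trimmed-down version of the example record above.
record = json.dumps({
    "payload": {
        "before": None,
        "after": {"id": 1, "first_name": "Anne"},
        "op": "c",
    }
})
operation, before, after = parse_debezium(record)
print(operation, before, after)
```

A create record has no before state, so before is None and only the after state needs to be written.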

GoldenGate record support

The processor accepts CDC records generated by GoldenGate when "Record Type" is set to "GoldenGate".
The PutIcebergCDC processor supports the GoldenGate record format provided that both the before and after states are included in the incoming record.
The "op_type" field holds the operation type, the "before" field stores the record before the operation, and the "after" field stores the record after the operation.

Example record:
                {
                  "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM",
                  "op_type": "I",
                  "op_ts": "2019-05-31 04:24:34.000327",
                  "current_ts": "2019-05-31 04:24:39.650000",
                  "pos": "00000000020000004074",
                  "primary_keys": [
                    "ID"
                  ],
                  "tokens": {
                    "txid": "9.32.6726",
                    "csn": "13906131"
                  },
                  "before": null,
                  "after": {
                    "ID": 11,
                    "ID_CUSTOMER_ORDER": 11,
                    "DESCRIPTION": "Cars 3",
                    "QUANTITY": 2
                  }
                }
            
For more information about GoldenGate records, see the Oracle GoldenGate documentation.

Custom record support

When "Record Type" is set to "Custom", it is possible to define how operation type values are interpreted, that is, which value represents an insert, a delete or an update operation.
In this case the "Insert Operation Type", "Delete Operation Type" and "Update Operation Type" properties must be set.
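
The idea behind the three properties can be sketched in Python as a lookup from configured values to logical operations. This is only an illustration: the function names, the "operation" field name and the values "INS", "DEL" and "UPD" are hypothetical and stand in for whatever the upstream system emits.

```python
def build_operation_lookup(insert_value, delete_value, update_value):
    """Map the three configured operation type values to logical operations."""
    lookup = {
        insert_value: "insert",
        delete_value: "delete",
        update_value: "update",
    }
    if len(lookup) != 3:
        raise ValueError("operation type values must be distinct")
    return lookup


def classify(record, lookup, operation_field="operation"):
    """Return the logical operation of a record, or fail on an unknown value."""
    value = record[operation_field]
    if value not in lookup:
        raise ValueError(f"unknown operation type: {value!r}")
    return lookup[value]


# Hypothetical values standing in for the three processor properties.
lookup = build_operation_lookup("INS", "DEL", "UPD")
print(classify({"operation": "UPD", "id": 7}, lookup))
```

Rejecting duplicate values up front avoids a configuration in which one value would silently mean two different operations.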

Commit retry properties

Iceberg supports multiple concurrent writes using optimistic concurrency. The processor's commit retry implementation uses exponential backoff with jitter and a scale factor of 2, and provides properties to tune this behaviour for the use case.

The NiFi-side retry logic is built on top of the Iceberg commit retry logic, which can be configured through table properties. For details, see "Table behavior properties" in the Iceberg documentation.
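
As a rough illustration of exponential backoff with jitter and a scale factor of 2, the Python sketch below computes the wait before each retry. The parameter names (retries, min_wait_ms, max_wait_ms, jitter) are hypothetical and are not the processor's property names, and the 10% additive jitter is an assumption, since the exact jitter formula is not described here.

```python
import random


def backoff_delays(retries, min_wait_ms, max_wait_ms, scale=2.0, jitter=0.1, rng=random):
    """Yield one wait time in milliseconds per retry attempt."""
    for attempt in range(retries):
        # The base delay grows by `scale` on every attempt and is capped at max_wait_ms.
        base = min(min_wait_ms * scale ** attempt, max_wait_ms)
        # Add a random extra of up to `jitter` times the base delay (assumed jitter form).
        yield base + rng.uniform(0, jitter * base)


for attempt, delay in enumerate(backoff_delays(5, min_wait_ms=100, max_wait_ms=1000), start=1):
    print(f"retry {attempt}: wait about {delay:.0f} ms")
```

Without jitter the delays would be 100, 200, 400, 800 and 1000 ms. The random component spreads out writers that failed at the same moment so that they do not all retry their commits simultaneously.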

Equality delete files

Equality delete files contain the column values which are used to identify the records to be deleted.

Note: Float and double fields cannot be used in equality delete files.

An example equality delete file looks like this. It denotes that records with size = 2 and color = 'red' will be deleted.
            ┌─────────┬────────────┐
            │  size   │  color     │
            │ int32   │  varchar   │
            ├─────────┼────────────┤
            │     2   │ red        │
            └─────────┴────────────┘
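
The effect of such a file can be sketched in Python as a filter over rows. This is a simplified illustration with hypothetical names and sample rows. It ignores details of the Iceberg specification such as sequence numbers, which restrict a delete file to data written before it.

```python
def apply_equality_deletes(rows, delete_rows):
    """Drop every row that matches all column values of at least one delete row."""
    def matches(row, delete):
        return all(row.get(column) == value for column, value in delete.items())

    return [row for row in rows if not any(matches(row, d) for d in delete_rows)]


# Hypothetical table content and the delete row from the example above.
rows = [
    {"id": 1, "size": 2, "color": "red"},
    {"id": 2, "size": 2, "color": "blue"},
    {"id": 3, "size": 3, "color": "red"},
]
deletes = [{"size": 2, "color": "red"}]

remaining = apply_equality_deletes(rows, deletes)
print([row["id"] for row in remaining])
```

Only the row with id 1 matches both size = 2 and color = 'red', so the rows with ids 2 and 3 remain.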
        

Equality Delete Field Strategy

When a primary key is defined on the table, it is more efficient to include only the primary (and partition) keys in the equality delete files,
because those fields are sufficient to identify the records that need to be deleted.

It is also possible to include all fields in the equality delete file, although this might cause performance issues when the table contains many fields.
Storing many fields in equality delete files also results in huge delete files.

The "Equality Delete Field Strategy" processor property controls which strategy is used, that is, whether only "Primary and Partition Keys" or "All Field" are stored in equality delete files.
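
The difference between the two strategies can be sketched in Python as a choice of which columns go into a delete row. The function name, the strategy labels "keys" and "all", and the sample schema are hypothetical and chosen only for this illustration.

```python
def equality_delete_fields(schema_fields, primary_keys, partition_keys, strategy):
    """Return the columns to store in an equality delete file, in schema order."""
    if strategy == "all":
        return list(schema_fields)
    if strategy == "keys":
        wanted = set(primary_keys) | set(partition_keys)
        return [field for field in schema_fields if field in wanted]
    raise ValueError(f"unknown strategy: {strategy!r}")


# Hypothetical table: "id" is the primary key and "region" the partition key.
schema = ["id", "region", "first_name", "last_name", "email"]
before = {"id": 1, "region": "EU", "first_name": "Anne",
          "last_name": "Kretchmar", "email": "annek@noanswer.org"}

fields = equality_delete_fields(schema, ["id"], ["region"], "keys")
delete_row = {field: before[field] for field in fields}
print(delete_row)
```

With the key-based strategy the delete row holds two values instead of five, which is why it keeps delete files smaller on wide tables.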