PutIcebergCDC
Description
Iceberg is a high-performance format for huge analytic tables.
The PutIcebergCDC processor applies CDC (Change Data Capture) operations to Iceberg tables using the Hive Iceberg catalog.
CDC (Change Data Capture) support
Note: The processor requires Iceberg format version 2 tables, because the solution depends on the availability of equality delete files.
PutIcebergCDC processor accepts CDC records as input.
The "Operation RecordPath" property points to the field that holds the operation type. The "Before Data RecordPath" and "After Data RecordPath" properties point to the state of the record before and after the operation.
Supported operation types
- c (Debezium create) or I (GoldenGate insert) - The record specified in the after field is inserted into Iceberg.
- d (Debezium delete) or D (GoldenGate delete) - The record specified in the before field is deleted from Iceberg.
- u (Debezium update) or U (GoldenGate update) - The record specified in the before field is replaced with the new content specified in the after field.
- r (Debezium read) - Read records are handled as create records and are inserted into Iceberg.
Note: For delete and update operations, position or equality delete files are generated to remove records from Iceberg.
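The operation types listed above can be read as a small dispatch table. The following is a minimal sketch of that mapping, not the processor's actual implementation: the function name plan_changes and the action labels are hypothetical, and the Debezium and GoldenGate codes are merged into one lookup purely for illustration.

```python
# Operation codes are taken from the list above; everything else is illustrative.
INSERT_CODES = {"c", "r", "I"}  # Debezium create/read, GoldenGate insert
DELETE_CODES = {"d", "D"}       # Debezium delete, GoldenGate delete
UPDATE_CODES = {"u", "U"}       # Debezium update, GoldenGate update


def plan_changes(op, before, after):
    """Return the (action, row) pairs implied by one CDC record."""
    if op in INSERT_CODES:
        return [("insert", after)]
    if op in DELETE_CODES:
        return [("delete", before)]
    if op in UPDATE_CODES:
        # An update removes the old row and writes the new one.
        return [("delete", before), ("insert", after)]
    raise ValueError(f"unsupported operation type: {op!r}")


changes = plan_changes("u", {"id": 1, "name": "old"}, {"id": 1, "name": "new"})
print(changes)
```

In this sketch an update is modelled as a delete of the before state followed by an insert of the after state, which matches the description of delete files being generated for both delete and update operations.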
Debezium record support
The processor accepts CDC records generated by Debezium when "Record Type" is set to "Debezium" (this is the default value).
Each Debezium record contains a schema and a payload field.
The payload field contains the name of the operation (op field), the record status before the operation (before field)
and the record status after the operation (after field).
Example record:
{
  "schema": {...},
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": { ...},
    "op": "c",
    "ts_ms": 1559033904863
  }
}
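To make the envelope layout concrete, here is a minimal Python sketch that pulls the op, before and after values out of a record shaped like the example. The helper name parse_debezium is hypothetical, and the elided schema and source sections are replaced with empty objects only so that the sample is valid JSON.

```python
import json


def parse_debezium(raw):
    """Extract (op, before, after) from a Debezium-style JSON envelope."""
    payload = json.loads(raw)["payload"]
    return payload["op"], payload.get("before"), payload.get("after")


# Sample based on the example record; "schema" and "source" are placeholders.
record = """
{
  "schema": {},
  "payload": {
    "before": null,
    "after": {
      "id": 1,
      "first_name": "Anne",
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {},
    "op": "c",
    "ts_ms": 1559033904863
  }
}
"""

op, before, after = parse_debezium(record)
print(op, before, after["first_name"])
```

For a create record the before state is null, so only the after state carries data.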
For more information about Debezium records, please check the following documentation:
GoldenGate record support
The processor accepts CDC records generated by GoldenGate when "Record Type" is set to "GoldenGate".
The PutIcebergCDC processor supports the GoldenGate record format when the before and after states are included in the incoming record.
The "op_type" field holds the operation type, the "before" field stores the record before the operation, and the "after" field stores the record after the operation.
Example record:
{
  "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM",
  "op_type": "I",
  "op_ts": "2019-05-31 04:24:34.000327",
  "current_ts": "2019-05-31 04:24:39.650000",
  "pos": "00000000020000004074",
  "primary_keys": [
    "ID"
  ],
  "tokens": {
    "txid": "9.32.6726",
    "csn": "13906131"
  },
  "before": null,
  "after": {
    "ID": 11,
    "ID_CUSTOMER_ORDER": 11,
    "DESCRIPTION": "Cars 3",
    "QUANTITY": 2
  }
}
For more information about GoldenGate records, please check the following documentation:
Custom record support
When "Record Type" is set to "Custom", you can define how operation type values are interpreted, that is, which value represents an insert,
delete or update operation.
In this case the "Insert Operation Type", "Delete Operation Type" and "Update Operation Type" properties must be set.
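As a rough illustration of the custom mapping, the sketch below builds a lookup from three user-supplied values. The function name build_custom_mapping and the sample values "INS", "DEL" and "UPD" are hypothetical, and the check that the three values are distinct is an assumption of this sketch rather than documented processor behaviour.

```python
def build_custom_mapping(insert_value, delete_value, update_value):
    """Map user-defined operation type values to generic actions."""
    mapping = {
        insert_value: "insert",
        delete_value: "delete",
        update_value: "update",
    }
    # Assumption of this sketch: ambiguous (duplicate) values are rejected.
    if len(mapping) != 3:
        raise ValueError("operation type values must be distinct")
    return mapping


# Hypothetical values, standing in for the three "... Operation Type" properties.
mapping = build_custom_mapping("INS", "DEL", "UPD")
print(mapping["DEL"])
```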
Commit retry properties
Iceberg supports multiple concurrent writes using optimistic concurrency.
The processor's commit retry implementation uses exponential backoff with jitter and a scale factor of 2. The following properties configure its behaviour.
- Number Of Commit Retries (default: 10) - Number of times the processor retries committing the new data files.
- Minimum Commit Wait Time (default: 100 ms) - Minimum time the processor waits before each commit attempt.
- Maximum Commit Wait Time (default: 2 sec) - Maximum time the processor waits before each commit attempt.
- Maximum Commit Duration (default: 30 sec) - Maximum time the processor keeps retrying before it fails the current processor event's commit.
The NiFi-side retry logic is built on top of the Iceberg commit retry logic, which can be configured through table properties. See more: Table behavior properties
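To show how the wait time grows under these defaults, here is a minimal sketch of exponential backoff with a scale factor of 2, clamped between the minimum and maximum wait times. It is not the processor's code: the function name commit_wait_times, the 10% jitter fraction and the injectable rng parameter are assumptions made for illustration, and the Maximum Commit Duration limit is not modelled.

```python
import random


def commit_wait_times(retries=10, min_wait_ms=100, max_wait_ms=2000,
                      scale_factor=2.0, jitter_fraction=0.1, rng=random.random):
    """Return the wait (in ms) before each retry, with additive jitter."""
    waits = []
    for attempt in range(retries):
        # Grow exponentially from the minimum, but never exceed the maximum.
        base = min(min_wait_ms * scale_factor ** attempt, max_wait_ms)
        # Jitter fraction is an assumption of this sketch.
        waits.append(base + rng() * base * jitter_fraction)
    return waits


# With jitter disabled the schedule is deterministic.
no_jitter = commit_wait_times(retries=6, rng=lambda: 0.0)
print(no_jitter)  # [100.0, 200.0, 400.0, 800.0, 1600.0, 2000.0]
```

With the default values the wait reaches the 2 second ceiling on the sixth retry and stays there for the remaining attempts.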
Equality delete files
Equality delete files contain the column values which are used to identify the records to be deleted.
Note: Float and double fields cannot be used in equality delete files.
An example equality delete file looks like this, denoting that records with size = 2 and color = 'red'
will be deleted.
┌─────────┬────────────┐
│ size    │ color      │
│ int32   │ varchar    │
├─────────┼────────────┤
│ 2       │ red        │
└─────────┴────────────┘
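The effect of such a file on reads can be sketched in a few lines of Python. This is a simplified model, not Iceberg's reader: the function name apply_equality_deletes and the sample rows are hypothetical, and details such as which data files a delete file applies to are ignored.

```python
def apply_equality_deletes(rows, delete_rows):
    """Drop every row whose values match all columns of some delete row."""
    def matches(row, delete_row):
        return all(row.get(col) == val for col, val in delete_row.items())

    return [r for r in rows if not any(matches(r, d) for d in delete_rows)]


# Hypothetical table content.
table = [
    {"id": 1, "size": 2, "color": "red"},
    {"id": 2, "size": 2, "color": "blue"},
    {"id": 3, "size": 3, "color": "red"},
]
# One delete row, mirroring the example file: size = 2 and color = 'red'.
deletes = [{"size": 2, "color": "red"}]

remaining = apply_equality_deletes(table, deletes)
print([r["id"] for r in remaining])  # [2, 3]
```

Only the row that matches every listed column is removed; rows that match on size or color alone are kept.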
Equality Delete Field Strategy
If a primary key is defined on the table, it is more efficient to include only the primary (and partition) keys in the equality delete files,
because those fields are sufficient to identify the records that need to be deleted.
It is also possible to include all fields in the equality delete file, although this might cause performance issues when the table contains many fields. Storing many fields in equality delete files also results in huge delete files.
The "Equality Delete Field Strategy" processor property controls which strategy is used, whether "Primary and Partition Keys" or "All Field" are stored in equality delete files.
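The difference between the two strategies comes down to which columns of the before state end up in the delete row. The sketch below illustrates this under stated assumptions: the function name equality_delete_row and the strategy labels "keys" and "all" are hypothetical stand-ins for the property values, and the sample row reuses the values from the GoldenGate example.

```python
def equality_delete_row(before, strategy, key_fields):
    """Build the row written to an equality delete file for one record."""
    if strategy == "keys":
        # Only primary (and partition) key columns identify the record.
        return {name: before[name] for name in key_fields}
    if strategy == "all":
        # Every column of the before state is written.
        return dict(before)
    raise ValueError(f"unknown strategy: {strategy!r}")


before = {"ID": 11, "ID_CUSTOMER_ORDER": 11, "DESCRIPTION": "Cars 3", "QUANTITY": 2}

keys_only = equality_delete_row(before, "keys", ["ID"])
all_fields = equality_delete_row(before, "all", ["ID"])
print(keys_only)
print(len(all_fields))
```

With the key-based strategy the delete row holds a single column, while the all-fields variant grows with the width of the table.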