PutIcebergCDC 2.3.0.4.10.0.0-147

Bundle
com.cloudera | nifi-cdf-iceberg-nar
Description
This processor uses the Iceberg API to parse and load CDC (Change Data Capture) records into Iceberg tables. The incoming data sets are parsed with the Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and the provided table information. Each incoming CDC record must specify a CDC operation type and before/after record states, which identify the change to apply to the Iceberg table. The processor requires Iceberg 2.0 tables. The target Iceberg table must already exist, and its schema must match the schema of the before/after records in the incoming CDC records. To avoid the 'small file problem', it is recommended to place a MergeRecord processor in front of this one.
Tags
avro, iceberg, orc, parquet, parse, put, record, store, table
Input Requirement
Supports Sensitive Dynamic Properties
false
  • Additional Details for PutIcebergCDC 2.3.0.4.10.0.0-147

    PutIcebergCDC

    Description

    Iceberg is a high-performance format for huge analytic tables.

    The PutIcebergCDC processor applies CDC (Change Data Capture) operations to Iceberg tables through the Hive Iceberg catalog.

    CDC (Change Data Capture) support

    Note: The processor requires Iceberg 2.0 tables, since the solution depends on the availability of equality delete files.

    The PutIcebergCDC processor accepts CDC records as input.

    The “Operation RecordPath” property specifies where the operation type is located in the record. The “Before Data RecordPath” and “After Data RecordPath” properties point to the record state before and after the operation.

    Supported operation types

    • c (Debezium create) or I (GoldenGate insert) - The record specified in the after field is inserted into the Iceberg table.
    • d (Debezium delete) or D (GoldenGate delete) - The record specified in the before field is deleted from the Iceberg table.
    • u (Debezium update) or U (GoldenGate update) - The record specified in the before field is replaced with the new content specified in the after field.
    • r (Debezium read) - Read records are handled as create records: they are inserted into the Iceberg table.

    Note: In case of delete and update operations, position or equality delete files are generated to remove the affected records from Iceberg.

    Debezium record support

    The processor accepts CDC records generated by Debezium when “Record Type” is set to “Debezium” (this is the default value).

    Each Debezium record contains a schema and a payload field. The payload field contains the name of the operation (op field), the record state before the operation (before field) and the record state after the operation (after field).

    Example record:

    
                    {
                      "schema": {...},
                      "payload": {
                        "before": null,
                        "after": {
                          "id": 1,
                          "first_name": "Anne",
                          "last_name": "Kretchmar",
                          "email": "annek@noanswer.org"
                        },
                        "source": { ...},
                        "op": "c",
                        "ts_ms": 1559033904863
                      }
                    }
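
    For a record with this layout, the RecordPath properties would plausibly be configured as follows (illustrative; the exact paths depend on how the configured Record Reader presents the record):

                    Operation RecordPath:    /payload/op
                    Before Data RecordPath:  /payload/before
                    After Data RecordPath:   /payload/after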
                
    

    For more information about Debezium records, please refer to the Debezium documentation.

    GoldenGate record support

    The processor accepts CDC records generated by GoldenGate when “Record Type” is set to “GoldenGate”.

    The PutIcebergCDC processor supports the GoldenGate record format when the before and after states are included in the incoming record.

    The “op_type” field holds the operation type, the “before” field stores the record state before the operation, and the “after” field stores the record state after the operation.

    Example record:

    
                    {
                      "table": "ORCL.ESHOP.CUSTOMER_ORDER_ITEM",
                      "op_type": "I",
                      "op_ts": "2019-05-31 04:24:34.000327",
                      "current_ts": "2019-05-31 04:24:39.650000",
                      "pos": "00000000020000004074",
                      "primary_keys": [
                        "ID"
                      ],
                      "tokens": {
                        "txid": "9.32.6726",
                        "csn": "13906131"
                      },
                      "before": null,
                      "after": {
                        "ID": 11,
                        "ID_CUSTOMER_ORDER": 11,
                        "DESCRIPTION": "Cars 3",
                        "QUANTITY": 2
                      }
                    }
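
    For a record with this layout, the RecordPath properties would plausibly be configured as follows (illustrative; the exact paths depend on how the configured Record Reader presents the record):

                    Operation RecordPath:    /op_type
                    Before Data RecordPath:  /before
                    After Data RecordPath:   /after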
                
    

    For more information about GoldenGate records, please refer to the GoldenGate documentation.

    Custom record support

    When “Record Type” is set to “Custom”, it is possible to define how the operation type values are interpreted, that is, which value represents an insert, delete or update operation.
    In this case the “Insert Operation Type”, “Delete Operation Type” and “Update Operation Type” properties need to be set, as shown in the example below.
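
    For example, assuming a hypothetical source system that emits records with the layout below (the field name “operation” and its values are illustrative, not processor defaults), “Operation RecordPath” could be set to /operation, and “Insert Operation Type”, “Delete Operation Type” and “Update Operation Type” to INSERT, DELETE and UPDATE respectively:

                    {
                      "operation": "INSERT",
                      "before": null,
                      "after": {
                        "id": 1,
                        "name": "Anne"
                      }
                    }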

    Commit retry properties

    Iceberg supports multiple concurrent writes using optimistic concurrency. The processor’s commit retry implementation uses exponential backoff with jitter and a scale factor of 2, and provides the following properties to configure the retry behaviour.

    • Number Of Commit Retries (default: 10) - The number of times the processor attempts to commit the new data files.
    • Minimum Commit Wait Time (default: 100 ms) - The minimum time the processor waits before a commit attempt.
    • Maximum Commit Wait Time (default: 2 sec) - The maximum time the processor waits before a commit attempt.
    • Maximum Commit Duration (default: 30 sec) - The maximum overall duration the processor waits before failing the commit of the current processor event.

    The NiFi-side retry logic is built on top of the Iceberg commit retry logic, which can be configured through table properties. See more: Table behavior properties
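
    As an illustration of this schedule, the following minimal Java sketch computes the wait times using the default property values. It is not the processor’s actual implementation, and the exact jitter strategy (full jitter below) is an assumption, since the documentation only states that jitter is applied.

                import java.util.concurrent.ThreadLocalRandom;

                public class CommitBackoffSketch {

                    // Defaults taken from the property list above.
                    static final int NUMBER_OF_COMMIT_RETRIES = 10;
                    static final long MINIMUM_COMMIT_WAIT_MS = 100;
                    static final long MAXIMUM_COMMIT_WAIT_MS = 2_000;

                    // Wait before a given attempt: the minimum wait scaled by 2^attempt
                    // (scale factor 2), capped at the maximum wait, then randomized.
                    static long waitMillis(int attempt) {
                        long exponential = MINIMUM_COMMIT_WAIT_MS * (1L << attempt);
                        long capped = Math.min(exponential, MAXIMUM_COMMIT_WAIT_MS);
                        return ThreadLocalRandom.current().nextLong(capped + 1);
                    }

                    public static void main(String[] args) {
                        // Upper bounds per attempt: 100, 200, 400, 800, 1600, 2000, 2000, ... ms
                        for (int attempt = 0; attempt < NUMBER_OF_COMMIT_RETRIES; attempt++) {
                            System.out.printf("attempt %d: sampled wait %d ms%n",
                                    attempt + 1, waitMillis(attempt));
                        }
                    }
                }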

    Equality delete files

    Equality delete files contain the column values that are used to identify the records to be deleted. Note: Float and double fields cannot be used in equality delete files.

    An example equality delete file looks like this, denoting that records with size = 2 and color = ‘red’ will be deleted.

    
                ┌─────────┬────────────┐
                │  size   │   color    │
                │  int32  │  varchar   │
                ├─────────┼────────────┤
                │    2    │    red     │
                └─────────┴────────────┘
            
    

    Equality Delete Field Strategy

    If a primary key is defined on the table, it is more efficient to include only the primary key (and partition key) fields in the equality delete files, since those fields are sufficient to identify the records to be deleted.

    It is also possible to include all fields in the equality delete file, although this might cause performance issues when the table contains many fields. Storing lots of fields in equality delete files also results in huge delete files.

    The “Equality Delete Field Strategy” processor property controls which strategy is used, i.e. whether the “Primary and Partition Keys” or “All Fields” are stored in equality delete files.
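
    As an illustration, consider a hypothetical table with primary key id, partitioned by region, and additional columns name (varchar) and quantity (int32). Deleting the record with id = 42 in region ‘EU’ could produce equality delete content along these lines (illustrative, not actual file contents):

                “Primary and Partition Keys”:

                ┌─────────┬────────────┐
                │   id    │   region   │
                │  int64  │  varchar   │
                ├─────────┼────────────┤
                │   42    │     EU     │
                └─────────┴────────────┘

                “All Fields”:

                ┌─────────┬────────────┬────────────┬────────────┐
                │   id    │   region   │    name    │  quantity  │
                │  int64  │  varchar   │  varchar   │   int32    │
                ├─────────┼────────────┼────────────┼────────────┤
                │   42    │     EU     │   Widget   │     3      │
                └─────────┴────────────┴────────────┴────────────┘

    The first variant identifies the record by its key fields alone, while the second must store and match every field value, so both the delete file size and the matching cost grow with the number of columns.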

Properties
Relationships
Name Description
success A FlowFile is routed to this relationship after all data operations have completed successfully.
failure A FlowFile is routed to this relationship when a data operation fails.
Writes Attributes
Name Description
iceberg.cdc.record.count The number of CDC records in the FlowFile.