PutIcebergCDC

Description:

This processor uses the Iceberg API to parse and load CDC (Change Data Capture) records into Iceberg tables. Incoming data is parsed with the configured Record Reader Controller Service and ingested into an Iceberg table using the configured catalog service and the provided table information. Each incoming record must specify a CDC operation type and the before/after records, which together identify the change to apply to the Iceberg table. The processor requires Iceberg 2.0 tables. The target Iceberg table must already exist, and its schema must match the schema of the before/after records in the incoming CDC record. To avoid the 'small file problem', it is recommended to place a MergeRecord processor before this one.
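To make the role of the operation type concrete, the following is a minimal Python sketch of how the operation codes listed for the Record Type property could be translated into insert, update, and delete operations. It is not the processor's implementation. The function name `resolve_operation`, the `custom_codes` parameter, and the case-sensitive matching are assumptions made for illustration; treating Debezium's `c` (create) as an insert is also an assumption, while `r` is documented as being interpreted as an insert.

```python
# Operation codes as documented for the Debezium and GoldenGate record types.
# Mapping "c" (create) to "insert" is an assumption of this sketch.
OPERATION_CODES = {
    "Debezium": {"c": "insert", "r": "insert", "u": "update", "d": "delete"},
    "GoldenGate": {"I": "insert", "U": "update", "D": "delete"},
}


def resolve_operation(record_type, code, custom_codes=None):
    """Translate a CDC operation code into 'insert', 'update' or 'delete'.

    custom_codes is a hypothetical dict standing in for the Insert, Update
    and Delete Operation Type properties used with the Custom record type.
    """
    if record_type == "Custom":
        if not custom_codes:
            raise ValueError("Custom record type needs operation type values")
        mapping = custom_codes
    elif record_type in OPERATION_CODES:
        mapping = OPERATION_CODES[record_type]
    else:
        raise ValueError(f"Unknown record type: {record_type!r}")

    if code not in mapping:
        raise ValueError(f"Unsupported operation code {code!r} for {record_type}")
    return mapping[code]


# A Debezium-style change event carries the operation code and both row states.
event = {
    "op": "u",
    "before": {"id": 1, "name": "old"},
    "after": {"id": 1, "name": "new"},
}
operation = resolve_operation("Debezium", event["op"])
```

For the sample event above, `operation` is `"update"`; the `before` and `after` values are the record states that must match the target table's schema.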


Tags:

iceberg, put, table, store, record, parse, orc, parquet, avro

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Catalog Service
  API Name: catalog-service
  Allowable Values: Controller Service API: IcebergCatalogService. Implementations: HadoopCatalogService, HiveCatalogService
  Description: Specifies the Controller Service to use for handling references to the table's metadata files.
Catalog Namespace
  API Name: catalog-namespace
  Description: The namespace of the catalog.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Table Name
  API Name: table-name
  Description: The name of the Iceberg table to write to.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Equality Delete Field Strategy
  API Name: Equality Delete Field Strategy
  Default Value: All Fields
  Allowable Values:
    • Primary and Partition Keys: Primary and partition keys will be used in equality delete files.
    • All Fields: All fields will be used in equality delete files. Note: this may hurt performance if the table contains a large number of columns, all of which are written to equality delete files.
  Description: Columns for equality delete files are selected based on this strategy. Note: columns of type double or float cannot be used in equality delete files.
Unmatched Column Behavior
  API Name: unmatched-column-behavior
  Default Value: Fail on Unmatched Columns
  Allowable Values:
    • Ignore Unmatched Columns: Any table column that has no matching field in the incoming record is assumed not to be required. No notification is logged.
    • Warn on Unmatched Columns: Any table column that has no matching field in the incoming record is assumed not to be required. A warning is logged.
    • Fail on Unmatched Columns: The flow fails if any table column has no matching field in the incoming record. An error is logged.
  Description: If an incoming record does not have a field mapping for all of the table's columns, this property specifies how to handle the situation.
File Format
  API Name: file-format
  Allowable Values:
    • AVRO
    • PARQUET
    • ORC
  Description: File format to use when writing Iceberg data files. If not set, the 'write.format.default' table property is used, which defaults to parquet.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Record Type
  API Name: Record Type
  Default Value: Debezium
  Allowable Values:
    • Debezium: The incoming record is in Debezium format. Supported operation types are: c (create), r (read, interpreted as insert), d (delete), u (update)
    • GoldenGate: The incoming record is in GoldenGate format. Supported operation types are: I (insert), D (delete), U (update)
    • Custom: The incoming record is in a custom format. The supported operation type values for insert, update and delete need to be specified.
  Description: Specifies the type of the incoming CDC record. If the Custom record type is chosen, the operation type values used for insert, delete and update need to be specified.
Record Reader
  API Name: record-reader
  Allowable Values: Controller Service API: RecordReaderFactory. Implementations: JASN1Reader, JsonTreeReader, GrokReader, Syslog5424Reader, CiscoEmblemSyslogMessageReader, AvroReader, JsonPathReader, CEFReader, IPFIXReader, WindowsEventLogReader, XMLReader, ScriptedReader, ReaderLookup, YamlTreeReader, ParquetReader, CSVReader, EBCDICRecordReader, ExcelReader, SyslogReader
  Description: Specifies the Controller Service to use for parsing incoming data and determining the data's schema.
Insert Operation Type
  API Name: Insert Operation Type
  Description: Specifies the operation type values that denote an insert operation in the incoming record.
  This property is only considered if the [Record Type] property has a value of "Custom".
Delete Operation Type
  API Name: Delete Operation Type
  Description: Specifies the operation type values that denote a delete operation in the incoming record.
  This property is only considered if the [Record Type] property has a value of "Custom".
Update Operation Type
  API Name: Update Operation Type
  Description: Specifies the operation type values that denote an update operation in the incoming record.
  This property is only considered if the [Record Type] property has a value of "Custom".
Operation RecordPath
  API Name: Operation RecordPath
  Description: This property denotes a RecordPath that will be evaluated against each incoming Record in order to determine the operation type. The RecordPath must evaluate to one of the valid Iceberg Operation Types, or the incoming FlowFile will be routed to failure.
  This property is only considered if the [Record Type] property has a value of "Custom".
Before Data RecordPath
  API Name: Before Data RecordPath
  Description: This property denotes a RecordPath that will be evaluated against each incoming Record and marks the record state before the operation.
  This property is only considered if the [Record Type] property has a value of "Custom".
After Data RecordPath
  API Name: After Data RecordPath
  Description: This property denotes a RecordPath that will be evaluated against each incoming Record and marks the record state after the operation.
  This property is only considered if the [Record Type] property has a value of "Custom".
Failure Strategy
  API Name: Failure Strategy
  Default Value: Route to Failure
  Allowable Values:
    • Route to Failure: The FlowFile containing the Records that failed to insert will be routed to the 'failure' relationship.
    • Rollback Session: If any operation fails, all FlowFiles in the session will be rolled back to their input queue. This means that if data cannot be pushed, it will block any subsequent data from being pushed to Iceberg as well until the issue is resolved.
  Description: Specifies how to handle the failure if one or more Records cannot be processed or the operation cannot be applied to the Iceberg table.
Maximum File Size
  API Name: maximum-file-size
  Description: The maximum size that a file can be. If the file size is exceeded, a new file will be generated with the remaining data. If not set, the 'write.target-file-size-bytes' table property is used, which defaults to 512 MB.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Kerberos User Service
  API Name: kerberos-user-service
  Allowable Values: Controller Service API: KerberosUserService. Implementations: KerberosTicketCacheUserService, KerberosKeytabUserService, KerberosPasswordUserService
  Description: Specifies the Kerberos User Controller Service that should be used for authenticating with Kerberos.
Number of Commit Retries
  API Name: number-of-commit-retries
  Default Value: 10
  Description: Number of times to retry a commit before failing.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Minimum Commit Wait Time
  API Name: minimum-commit-wait-time
  Default Value: 100 ms
  Description: Minimum time to wait before retrying a commit.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Maximum Commit Wait Time
  API Name: maximum-commit-wait-time
  Default Value: 2 sec
  Description: Maximum time to wait before retrying a commit.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
Maximum Commit Duration
  API Name: maximum-commit-duration
  Default Value: 30 sec
  Description: Total retry timeout period for a commit.
  Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)
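
The four commit properties above interact: the retry count, the minimum and maximum wait, and the total duration together bound how long a commit is retried. The sketch below illustrates one plausible reading in Python. It assumes the wait doubles after each attempt (exponential backoff), which this page does not state, and it ignores the time spent in the commit attempts themselves. The function name `commit_wait_schedule` and its parameters are hypothetical.

```python
def commit_wait_schedule(retries=10, min_wait_ms=100, max_wait_ms=2000,
                         total_timeout_ms=30000):
    """Return the waits (in ms) before each commit retry.

    Assumption of this sketch: the wait starts at min_wait_ms, doubles
    after every retry, is capped at max_wait_ms, and retrying stops once
    the accumulated wait would exceed total_timeout_ms.
    """
    waits = []
    elapsed = 0
    wait = min_wait_ms
    for _ in range(retries):
        if elapsed + wait > total_timeout_ms:
            break  # the total retry timeout would be exceeded
        waits.append(wait)
        elapsed += wait
        wait = min(wait * 2, max_wait_ms)  # doubling capped at the maximum
    return waits


default_schedule = commit_wait_schedule()
```

Under these assumptions and the default values (10 retries, 100 ms, 2 sec, 30 sec), the waits add up to about 13 seconds, so the retry count is exhausted before the 30-second limit is reached. Lowering the Maximum Commit Duration makes the timeout the limiting factor instead.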

Relationships:

Name | Description
success | A FlowFile is routed to this relationship after all data operations have succeeded.
failure | A FlowFile is routed to this relationship when a data operation fails.
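
One documented cause of routing to failure is an Operation RecordPath that does not evaluate to a valid operation type when the Custom record type is used. The Python sketch below illustrates that decision for a single record. It is a simplified stand-in, not NiFi's RecordPath engine: `resolve_path` only follows plain `/child/field` paths through nested dictionaries, and the record layout, the field names (`meta`, `old`, `new`) and the `custom_codes` mapping are hypothetical. In the processor, routing applies to the whole FlowFile rather than to individual records.

```python
VALID_OPERATIONS = {"insert", "update", "delete"}


def resolve_path(record, path):
    """Follow a simple '/a/b' style path through nested dicts.

    Returns None when any segment is missing. This is only a stand-in
    for RecordPath evaluation and supports none of its functions.
    """
    current = record
    for part in path.strip("/").split("/"):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


def route_custom_record(record, op_path, before_path, after_path, custom_codes):
    """Return (relationship, payload) for one custom-format CDC record."""
    code = resolve_path(record, op_path)
    if not isinstance(code, str):
        return "failure", None  # path is missing or is not a scalar code
    operation = custom_codes.get(code)
    if operation not in VALID_OPERATIONS:
        return "failure", None  # code is not a configured operation type
    before = resolve_path(record, before_path)
    after = resolve_path(record, after_path)
    return "success", (operation, before, after)


# Hypothetical custom record layout and operation type values.
codes = {"INS": "insert", "UPD": "update", "DEL": "delete"}
record = {
    "meta": {"op": "UPD"},
    "old": {"id": 1, "name": "a"},
    "new": {"id": 1, "name": "b"},
}
result = route_custom_record(record, "/meta/op", "/old", "/new", codes)
```

With the sample record, `result` carries the `success` relationship together with the update operation and both record states; pointing `op_path` at a field that does not exist yields `("failure", None)`.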

Reads Attributes:

None specified.

Writes Attributes:

Name | Description
iceberg.cdc.record.count | The number of CDC records in the FlowFile.

State management:

This component does not store state.

Restricted:

This component is not restricted.

System Resource Considerations:

None specified.