JoinEnrichment

Description:

Joins Records from two different FlowFiles: the 'original' FlowFile, which contains arbitrary records, and the 'enrichment' FlowFile, which contains additional data that should be used to enrich the first. See Additional Details for more information on how to configure this processor and the use cases it aims to address.

Tags:

fork, join, enrichment, record, sql, wrap, recordpath, merge, combine, streams

Properties:

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

Display Name | API Name | Default Value | Allowable Values | Description
Original Record Reader | Original Record Reader | | Controller Service API: RecordReaderFactory. Implementations: JASN1Reader, JsonTreeReader, GrokReader, Syslog5424Reader, CiscoEmblemSyslogMessageReader, AvroReader, JsonPathReader, CEFReader, IPFIXReader, WindowsEventLogReader, XMLReader, ScriptedReader, ReaderLookup, YamlTreeReader, ParquetReader, CSVReader, EBCDICRecordReader, ExcelReader, SyslogReader | The Record Reader for reading the 'original' FlowFile
Enrichment Record Reader | Enrichment Record Reader | | Controller Service API: RecordReaderFactory. Implementations: JASN1Reader, JsonTreeReader, GrokReader, Syslog5424Reader, CiscoEmblemSyslogMessageReader, AvroReader, JsonPathReader, CEFReader, IPFIXReader, WindowsEventLogReader, XMLReader, ScriptedReader, ReaderLookup, YamlTreeReader, ParquetReader, CSVReader, EBCDICRecordReader, ExcelReader, SyslogReader | The Record Reader for reading the 'enrichment' FlowFile
Record Writer | Record Writer | | Controller Service API: RecordSetWriterFactory. Implementations: FreeFormTextRecordSetWriter, ParquetRecordSetWriter, AvroRecordSetWriter, JsonRecordSetWriter, ScriptedRecordSetWriter, XMLRecordSetWriter, CSVRecordSetWriter, RecordSetWriterLookup | The Record Writer to use for writing the results. If the Record Writer is configured to inherit the schema from the Record, the schema that it will inherit will be the result of merging both the 'original' record schema and the 'enrichment' record schema.
Join Strategy | Join Strategy | Wrapper | Wrapper, SQL, Insert Enrichment Fields | Specifies how to join the two FlowFiles into a single FlowFile. The allowable values behave as follows:
  • Wrapper: The output is a Record that contains two fields: (1) 'original', containing the Record from the original FlowFile, and (2) 'enrichment', containing the corresponding Record from the enrichment FlowFile. Records will be correlated based on their index in the FlowFile. If one FlowFile has more Records than the other, a null value will be used in place of the missing Record.
  • SQL: The output is derived by evaluating a SQL SELECT statement that allows for two tables: 'original' and 'enrichment'. This allows SQL JOIN statements to be used to correlate the Records of the two FlowFiles, so the index at which a Record is encountered in the FlowFile does not matter.
  • Insert Enrichment Fields: The enrichment is joined with the original FlowFile by placing all fields of the enrichment Record into the corresponding Record from the original FlowFile. Records will be correlated based on their index in the FlowFile.
SQL | SQL | SELECT original.*, enrichment.* FROM original LEFT OUTER JOIN enrichment ON original.id = enrichment.id | | The SQL SELECT statement to evaluate. Expression Language may be provided, but doing so may result in poorer performance. Because this Processor is dealing with two FlowFiles at a time, it's also important to understand how attributes will be referenced. If both FlowFiles have an attribute with the same name but different values, the Expression Language will resolve to the value provided by the 'enrichment' FlowFile.
Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)

This Property is only considered if the [Join Strategy] Property has a value of "SQL".
Default Decimal Precision | dbf-default-precision | 10 | | When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'precision' denoting the number of available digits is required. Generally, precision is defined by the column data type definition or the database engine's default. However, an undefined precision (0) can be returned by some database engines. 'Default Decimal Precision' is used when writing those undefined-precision numbers.
Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)

This Property is only considered if the [Join Strategy] Property has a value of "SQL".
Default Decimal Scale | dbf-default-scale | 0 | | When a DECIMAL/NUMBER value is written as a 'decimal' Avro logical type, a specific 'scale' denoting the number of available decimal digits is required. Generally, scale is defined by the column data type definition or the database engine's default. However, when an undefined precision (0) is returned, the scale can also be uncertain with some database engines. 'Default Decimal Scale' is used when writing those undefined numbers. If a value has more decimal digits than the specified scale, the value will be rounded to that scale, e.g. 1.53 becomes 2 with scale 0, and 1.5 with scale 1.
Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)

This Property is only considered if the [Join Strategy] Property has a value of "SQL".
Insertion Record Path | Insertion Record Path | / | | Specifies where in the 'original' Record the 'enrichment' Record's fields should be inserted. Note that if the RecordPath does not point to any existing field in the original Record, the enrichment will not be inserted.
Supports Expression Language: true (will be evaluated using flow file attributes and Environment variables)

This Property is only considered if the [Join Strategy] Property has a value of "Insert Enrichment Fields".
Maximum number of Bins | Maximum number of Bins | 10000 | | Specifies the maximum number of bins that can be held in memory at any one time
Timeout | Timeout | 10 min | | Specifies the maximum amount of time to wait for the second FlowFile once the first arrives at the processor, after which point the first FlowFile will be routed to the 'timeout' relationship.
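
The three Join Strategy values can be contrasted with a small sketch. This is an illustration only, not the processor's implementation: records are modeled as plain Python dicts, the helper names `wrapper_join`, `insert_enrichment_fields`, and `sql_join` are hypothetical, an in-memory SQLite database stands in for the processor's SQL engine, and the sample fields (`id`, `name`, `city`) are made up. Only the default Insertion Record Path of `/` is modeled.

```python
import sqlite3
from itertools import zip_longest

# Made-up sample data; the enrichment records are deliberately out of order.
original = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Grace"}]
enrichment = [{"id": 2, "city": "Arlington"}, {"id": 1, "city": "London"}]


def wrapper_join(original, enrichment):
    """Wrapper: pair records by index; the shorter side is padded with None."""
    return [
        {"original": o, "enrichment": e}
        for o, e in zip_longest(original, enrichment, fillvalue=None)
    ]


def insert_enrichment_fields(original, enrichment):
    """Insert Enrichment Fields at '/': copy the fields of the enrichment
    record into the original record that has the same index."""
    joined = []
    for o, e in zip_longest(original, enrichment, fillvalue=None):
        if o is None:
            continue  # assumption: with no original record there is nothing to enrich
        # Assumption: on a field-name clash the enrichment value wins here;
        # the source text does not specify this.
        joined.append({**o, **(e or {})})
    return joined


def sql_join(original, enrichment):
    """SQL: correlate by key rather than by index (SQLite used as a stand-in)."""
    conn = sqlite3.connect(":memory:")
    conn.row_factory = sqlite3.Row
    conn.execute("CREATE TABLE original (id INTEGER, name TEXT)")
    conn.execute("CREATE TABLE enrichment (id INTEGER, city TEXT)")
    conn.executemany("INSERT INTO original VALUES (:id, :name)", original)
    conn.executemany("INSERT INTO enrichment VALUES (:id, :city)", enrichment)
    rows = conn.execute(
        "SELECT original.id AS id, original.name AS name, enrichment.city AS city "
        "FROM original LEFT OUTER JOIN enrichment "
        "ON original.id = enrichment.id ORDER BY original.id"
    ).fetchall()
    conn.close()
    return [dict(row) for row in rows]
```

With this sample data, `wrapper_join` and `insert_enrichment_fields` pair Ada with Arlington because they correlate by index, whereas `sql_join` pairs Ada with London because it correlates on `id`.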

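The rounding example given for Default Decimal Scale (1.53 becomes 2 with scale 0, and 1.5 with scale 1) can be reproduced with Python's `decimal` module. This is a sketch only: the helper name `apply_scale` is hypothetical, and half-up rounding is an assumption chosen because it matches both documented examples; the processor's actual rounding mode is not stated here.

```python
from decimal import Decimal, ROUND_HALF_UP


def apply_scale(value: str, scale: int) -> Decimal:
    """Round a decimal string to `scale` fractional digits (half-up assumed)."""
    quantum = Decimal(1).scaleb(-scale)  # scale 0 -> 1, scale 1 -> 0.1
    return Decimal(value).quantize(quantum, rounding=ROUND_HALF_UP)


print(apply_scale("1.53", 0))  # 2
print(apply_scale("1.53", 1))  # 1.5
```
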
Relationships:

Name | Description
timeout | If one of the incoming FlowFiles (i.e., the 'original' FlowFile or the 'enrichment' FlowFile) arrives at this Processor but the other does not arrive within the configured Timeout period, the FlowFile that did arrive is routed to this relationship.
joined | The resultant FlowFile with Records joined together from both the original and enrichment FlowFiles will be routed to this relationship.
failure | If both the 'original' and 'enrichment' FlowFiles arrive at the processor but there was a failure in joining the records, both of those FlowFiles will be routed to this relationship.
original | Both of the incoming FlowFiles ('original' and 'enrichment') will be routed to this Relationship. I.e., this is the 'original' version of both of these FlowFiles.
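
The routing described in the table above can be summarized in a short sketch. It is a reading of the relationship descriptions, not the processor's code: the function `route` and its arguments are hypothetical, and it assumes the 'original' relationship receives the incoming FlowFiles only when the join succeeds, which the table does not state explicitly.

```python
from typing import Dict, List, Optional


def route(original: Optional[str], enrichment: Optional[str],
          join_succeeded: bool = True) -> Dict[str, List[str]]:
    """Map relationship names to the FlowFiles routed there.

    `original` and `enrichment` are FlowFile labels, or None if that
    FlowFile did not arrive within the configured Timeout.
    """
    arrived = [ff for ff in (original, enrichment) if ff is not None]
    if len(arrived) < 2:
        # Only one side showed up before the Timeout elapsed.
        return {"timeout": arrived}
    if not join_succeeded:
        # Both arrived, but joining the records failed.
        return {"failure": arrived}
    # Both arrived and were joined (assumption: 'original' applies only here).
    return {"joined": ["joined-result"], "original": arrived}
```

For example, `route("o", None)` yields `{"timeout": ["o"]}`, while `route("o", "e")` yields the joined result plus both inputs on 'original'.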

Reads Attributes:

None specified.

Writes Attributes:

Name | Description
mime.type | Sets the mime.type attribute to the MIME Type specified by the Record Writer
record.count | The number of records in the FlowFile

State management:

This component does not store state.

Restricted:

This component is not restricted.

Input requirement:

This component requires an incoming relationship.

System Resource Considerations:

Resource | Description
MEMORY | This Processor will load into heap all FlowFiles that are on its incoming queues. While it loads the FlowFiles themselves, and not their content, the FlowFile attributes can be very memory intensive. Additionally, if the Join Strategy is set to SQL, the SQL engine may require buffering the entire contents of the enrichment FlowFile for each concurrent task. See Processor's Additional Details for more details and for steps on how to mitigate these concerns.

See Also:

ForkEnrichment