Adding Functionality to Apache NiFi

Split Content (One-to-Many)

This Processor generally requires no user configuration, with the exception of the size of each Split to create. The onTrigger method obtains a FlowFile from its input queues, and a List of type FlowFile is created to hold the splits. The original FlowFile is read via the ProcessSession's read method, using an InputStreamCallback. Within the InputStreamCallback, the content is read until a point is reached at which the FlowFile should be split.

If no split is needed, the Callback returns, and the original FlowFile is routed to success. In this case, a Provenance ROUTE event is emitted. Typically, ROUTE events are not emitted when routing a FlowFile to success because this generates a very verbose lineage that becomes difficult to navigate. However, in this case, the event is useful because we would otherwise expect a FORK event, and the absence of any event is likely to cause confusion. The fact that the FlowFile was not split but was instead transferred to success is logged, and the method returns.

If a point is reached at which a FlowFile needs to be split, a new FlowFile is created via the ProcessSession's create(FlowFile) method or the clone(FlowFile, long, long) method. The code that follows depends on which of the two methods is used. Both are described below. Which one is appropriate must be determined on a case-by-case basis.

The Create Method is most appropriate when the data will not be directly copied from the original FlowFile to the new FlowFile. For example, if only some of the data will be copied, or if the data will be modified in some way before being copied to the new FlowFile, this method is necessary. However, if the content of the new FlowFile will be an exact copy of a portion of the original FlowFile, the Clone Method is much preferred.

Create Method

If using the create method, the method is called with the original FlowFile as the argument so that the newly created FlowFile will inherit the attributes of the original FlowFile and a Provenance FORK event will be created by the framework.

The code then enters a try/finally block. Within the finally block, the newly created FlowFile is added to the List of FlowFiles that have been created. This is done within a finally block so that if an Exception is thrown, the newly created FlowFile will be appropriately cleaned up. Within the try block, the callback initiates a new callback by calling the ProcessSession's write method with an OutputStreamCallback. The appropriate data is then copied from the InputStream of the original FlowFile to the OutputStream for the new FlowFile.
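The try/finally pattern described above can be sketched in plain Java. This is only an illustration under stated assumptions, not NiFi code: a ByteArrayOutputStream stands in for the FlowFile returned by create(FlowFile), the hypothetical copyBounded helper stands in for the work done inside the OutputStreamCallback, and splits are assumed to be fixed-size.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PushbackInputStream;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in classes; none of these names come from the NiFi API.
final class CreateMethodSketch {

    // Copies at most maxBytes from in to out and returns the number of bytes copied.
    static long copyBounded(InputStream in, OutputStream out, long maxBytes) throws IOException {
        byte[] buffer = new byte[8192];
        long copied = 0;
        while (copied < maxBytes) {
            int toRead = (int) Math.min(buffer.length, maxBytes - copied);
            int n = in.read(buffer, 0, toRead);
            if (n < 0) {
                break; // end of the original content
            }
            out.write(buffer, 0, n);
            copied += n;
        }
        return copied;
    }

    // Splits the original content into chunks of at most splitSize bytes.
    static List<ByteArrayOutputStream> split(InputStream original, int splitSize) throws IOException {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<ByteArrayOutputStream> splits = new ArrayList<>();
        PushbackInputStream in = new PushbackInputStream(original);
        int next;
        // Peek one byte so that a new split is only created when data remains.
        while ((next = in.read()) >= 0) {
            in.unread(next);
            // Stand-in for session.create(original).
            ByteArrayOutputStream child = new ByteArrayOutputStream();
            try {
                // Stand-in for session.write(child, OutputStreamCallback).
                copyBounded(in, child, splitSize);
            } finally {
                // Added in finally so the split is tracked (and can be removed) even on failure.
                splits.add(child);
            }
        }
        return splits;
    }
}
```

Because the new split is added to the list in the finally block, a caller that catches an exception still holds a reference to every split created so far and can clean them all up.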

Clone Method

If the content of the newly created FlowFile is to be only a contiguous subset of the bytes of the original FlowFile, it is preferred to use the clone(FlowFile, long, long) method instead of the create(FlowFile) method of the ProcessSession. In this case, the offset in the original FlowFile at which the new FlowFile's content should begin is passed as the second argument to the clone method. The length of the new FlowFile is passed as the third argument to the clone method. For example, if the original FlowFile was 10,000 bytes and we called clone(flowFile, 500, 100), the FlowFile that would be returned to us would be identical to flowFile with respect to its attributes. However, the content of the newly created FlowFile would be 100 bytes in length and would start at offset 500 of the original FlowFile. That is, the contents of the newly created FlowFile would be the same as if you had copied bytes 500 through 599 of the original FlowFile.

After the clone has been created, it is added to the List of FlowFiles.

When applicable, this method is strongly preferred over the Create Method because no disk I/O is required. The framework is able to simply create a new FlowFile that references a subset of the original FlowFile's content, rather than actually copying the data. However, this is not always possible. For example, if header information must be copied from the beginning of the original FlowFile and added to the beginning of each Split, then this method cannot be used.
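To make the offset and length arguments concrete, the following sketch computes the (offset, length) pairs that a Processor might pass as the second and third arguments to clone(FlowFile, long, long). It assumes fixed-size splits, and the CloneRanges and Range names are hypothetical; nothing here calls the NiFi API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the NiFi API.
final class CloneRanges {

    // One (offset, length) pair, matching the second and third clone arguments.
    static final class Range {
        final long offset;
        final long length;

        Range(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }

        // Index of the last byte of the original content covered by this range.
        long lastByteIndex() {
            return offset + length - 1;
        }
    }

    // Computes consecutive ranges of at most splitSize bytes covering totalSize bytes.
    static List<Range> fixedSizeRanges(long totalSize, long splitSize) {
        if (splitSize <= 0) {
            throw new IllegalArgumentException("splitSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        for (long offset = 0; offset < totalSize; offset += splitSize) {
            // The final range may be shorter than splitSize.
            ranges.add(new Range(offset, Math.min(splitSize, totalSize - offset)));
        }
        return ranges;
    }
}
```

For a 10,000-byte FlowFile and a split size of 4,000, this yields the ranges (0, 4000), (4000, 4000), and (8000, 2000). A range of (500, 100) covers bytes 500 through 599, which matches the clone(flowFile, 500, 100) example.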

Both Methods

Regardless of whether the Clone Method or the Create Method is used, the following is applicable:

If at any point in the InputStreamCallback, a condition is reached in which processing cannot continue (for example, the input is malformed), a ProcessException should be thrown. The call to the ProcessSession's read method is wrapped in a try/catch block where ProcessException is caught. If an Exception is caught, a log message is generated explaining the error. The List of newly created FlowFiles is removed via the ProcessSession's remove method. The original FlowFile is routed to failure.
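The failure path can be modelled in plain Java as follows. This is a simplified sketch, not NiFi code: SketchProcessException is a hypothetical stand-in for ProcessException, strings stand in for FlowFiles, the returned relationship name stands in for the transfer of the original FlowFile, and "an empty line is malformed" is an assumed rule chosen only to trigger the error.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the error handling; none of these types come from the NiFi API.
final class SplitFailureSketch {

    // Stand-in for ProcessException, thrown when processing cannot continue.
    static final class SketchProcessException extends RuntimeException {
        SketchProcessException(String message) {
            super(message);
        }
    }

    // Outcome of one onTrigger-style invocation.
    static final class Result {
        final String relationship; // where the original would be routed
        final List<String> splits; // the splits that survive

        Result(String relationship, List<String> splits) {
            this.relationship = relationship;
            this.splits = splits;
        }
    }

    // Splits content on newlines; an empty line is treated as malformed input.
    static Result splitLines(String content) {
        List<String> created = new ArrayList<>();
        try {
            for (String line : content.split("\n")) {
                if (line.isEmpty()) {
                    throw new SketchProcessException("malformed input: empty record");
                }
                created.add(line);
            }
        } catch (SketchProcessException e) {
            // A real Processor would log the error here, remove every split created
            // so far via the ProcessSession, and route the original to failure.
            created.clear();
            return new Result("failure", created);
        }
        return new Result("original", created);
    }
}
```

The important property is that no partially created splits leave the method when the input turns out to be malformed.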

If no problems arise, the original FlowFile is routed to original and all newly created FlowFiles are updated to include the following attributes:

Attribute Name       Description
split.parent.uuid    The UUID of the original FlowFile
split.index          A one-up number indicating which FlowFile in the list this is (the first FlowFile created will have a value 0, the second will have a value 1, etc.)
split.count          The total number of split FlowFiles that were created

The newly created FlowFiles are routed to success, this event is logged, and the method returns.
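As a rough illustration of the three split attributes, the sketch below builds one attribute map per split. The SplitAttributes class and forSplits method are hypothetical names; in a real Processor each map would be applied to its FlowFile through the ProcessSession (for example with its putAllAttributes method) rather than returned in a list.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper; not part of the NiFi API.
final class SplitAttributes {

    // Builds the attribute map for each of splitCount splits of the given parent.
    static List<Map<String, String>> forSplits(String parentUuid, int splitCount) {
        List<Map<String, String>> all = new ArrayList<>();
        for (int i = 0; i < splitCount; i++) {
            Map<String, String> attrs = new LinkedHashMap<>();
            attrs.put("split.parent.uuid", parentUuid);
            attrs.put("split.index", String.valueOf(i)); // zero-based position in the list
            attrs.put("split.count", String.valueOf(splitCount));
            all.add(attrs);
        }
        return all;
    }
}
```

Note that split.count is only known once all splits have been created, which is why the attributes are added after the read callback has completed rather than as each split is created.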