Route Streams Based on Content (One-to-Many)
The previous description of Route Based on Content (One-to-Many) provides an abstraction for creating a very powerful Processor. However, it assumes that each FlowFile will be routed in its entirety to zero or more Relationships. What if the incoming data format is a "stream" of many different pieces of information - and we want to send different pieces of this stream to different Relationships? For example, imagine that we want to have a RouteCSV Processor such that it is configured with multiple Regular Expressions. If a line in the CSV file matches a Regular Expression, that line should be included in the outbound FlowFile to the associated relationship. If a Regular Expression is associated with the Relationship "has-apples" and that Regular Expression matches 1,000 of the lines in the FlowFile, there should be one outbound FlowFile for the "has-apples" relationship that has 1,000 lines in it. If a different Regular Expression is associated with the Relationship "has-oranges" and that Regular Expression matches 50 lines in the FlowFile, there should be one outbound FlowFile for the "has-oranges" relationship that has 50 lines in it. I.e., one FlowFile comes in and two FlowFiles come out. The two FlowFiles may contain some of the same lines of text from the original FlowFile, or they may be entirely different. This is the type of Processor that we will discuss in this section.
This Processor's name starts with "Route" and ends with the name of the data type that it routes. In our example here, we are routing CSV data, so the Processor is named RouteCSV. This Processor supports dynamic properties. Each user-defined property has a name that maps to the name of a Relationship. The value of the Property is in the format necessary for the "Match Criteria." In our example, the value of the property must be a valid Regular Expression.
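Since each user-supplied property value must be a valid Regular Expression, the Processor needs a way to check that value. The following is a standalone sketch of that check using only the JDK (no NiFi dependency); the class and method names are illustrative, not part of any NiFi API:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Sketch: validating that a user-defined property value is a compilable
// Regular Expression, as the RouteCSV example's Match Criteria require.
public class RegexPropertyCheck {

    // Returns null if the value is a valid Regular Expression,
    // otherwise an explanation suitable for a validation message.
    public static String validateMatchCriteria(String value) {
        try {
            Pattern.compile(value);
            return null;
        } catch (PatternSyntaxException e) {
            return "Not a valid Regular Expression: " + e.getMessage();
        }
    }
}
```

In a real Processor this logic would live in a Validator attached to the dynamic PropertyDescriptor, so invalid expressions are rejected before the Processor is ever scheduled.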
This Processor maintains an internal ConcurrentMap where the key is a Relationship and the value is of a type dependent on the format of the Match Criteria. In our example, we would maintain a ConcurrentMap<Relationship, Pattern>. This Processor overrides the onPropertyModified method. If the new value supplied to this method (the third argument) is null, the Relationship whose name is defined by the property name (the first argument) is removed from the ConcurrentMap. Otherwise, the new value is processed (in our example, by calling Pattern.compile(newValue)) and this value is added to the ConcurrentMap with the key again being the Relationship whose name is specified by the property name.
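This map maintenance can be sketched with plain JDK types, keying by the Relationship's name (a String) rather than NiFi's Relationship type; the three arguments mirror the shape of the Processor API's onPropertyModified:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;

// Sketch (plain JDK, no NiFi dependency) of the ConcurrentMap maintenance
// described above, with Relationships stood in by their names.
public class MatchCriteriaMap {
    private final ConcurrentMap<String, Pattern> patterns = new ConcurrentHashMap<>();

    public void onPropertyModified(String propertyName, String oldValue, String newValue) {
        if (newValue == null) {
            // Property removed: drop the corresponding Relationship's criteria.
            patterns.remove(propertyName);
        } else {
            // Property added or changed: compile and store the new criteria.
            patterns.put(propertyName, Pattern.compile(newValue));
        }
    }

    public Pattern get(String relationshipName) {
        return patterns.get(relationshipName);
    }

    public int size() {
        return patterns.size();
    }
}
```

A ConcurrentMap is used because onPropertyModified is invoked by a framework thread, while onTrigger may be reading the map concurrently from other threads.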
This Processor also overrides the customValidate method. In this method, it retrieves all Properties from the ValidationContext and counts the number of PropertyDescriptors that are dynamic (by calling isDynamic() on the PropertyDescriptor). If the number of dynamic PropertyDescriptors is 0, the user has not added any Relationships, so the Processor returns a ValidationResult indicating that the Processor is not valid because no Relationships have been added.
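The rule itself is simple and can be modeled without the NiFi API. In this sketch, PropertyDescriptor is stood in by a minimal record carrying only the isDynamic flag that the rule inspects, and the ValidationResult is modeled as a nullable explanation (null meaning valid); all names here are illustrative:

```java
import java.util.Collection;
import java.util.List;

// Sketch (plain JDK) of the customValidate rule described above.
public class RelationshipValidation {
    public record Descriptor(String name, boolean isDynamic) { }

    // Returns null when valid, otherwise the validation failure explanation.
    public static String customValidate(Collection<Descriptor> descriptors) {
        long dynamicCount = descriptors.stream().filter(Descriptor::isDynamic).count();
        if (dynamicCount == 0) {
            return "Processor is not valid because no Relationships have been added";
        }
        return null;
    }
}
```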
The Processor returns all of the Relationships specified by the user when its getRelationships method is called, and it also returns an unmatched Relationship. Because this Processor must read from and write to the Content Repository (which can be relatively expensive), if it is expected to handle very high data volumes, it may be advantageous to add a Property that allows the user to specify whether or not they care about the data that does not match any of the Match Criteria.
When the onTrigger method is called, the Processor obtains a FlowFile via ProcessSession.get. If no data is available, the Processor returns. Otherwise, the Processor creates a Map<Relationship, FlowFile>. We will refer to this Map as flowFileMap. The Processor reads the incoming FlowFile by calling ProcessSession.read and provides an InputStreamCallback. From within the Callback, the Processor reads the first piece of data from the FlowFile. The Processor then evaluates each of the Match Criteria against this piece of data. If a particular criterion (in our example, a Regular Expression) matches, the Processor obtains the FlowFile from flowFileMap that belongs to the appropriate Relationship. If no FlowFile yet exists in the Map for this Relationship, the Processor creates a new FlowFile by calling session.create(incomingFlowFile) and then adds the new FlowFile to flowFileMap. The Processor then writes this piece of data to the FlowFile by calling session.append with an OutputStreamCallback. From within this OutputStreamCallback, we have access to the new FlowFile's OutputStream, so we are able to write the data to the new FlowFile. We then return from the OutputStreamCallback. After iterating over each of the Match Criteria, if none of them match, we perform the same routines as above for the unmatched Relationship (unless the user configures us not to write out unmatched data). Because session.append returns a new version of the FlowFile, we must then update flowFileMap to associate the Relationship with the new FlowFile.
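The core of this loop can be sketched with plain JDK types. Here FlowFiles are stood in by StringBuilders (so session.append becomes a plain append), Relationships by their names, and the "piece of data" is one CSV line; the real Processor would perform the same bookkeeping against the ProcessSession, but the shape of flowFileMap and the match loop is the same:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Sketch (plain JDK, no NiFi dependency) of the onTrigger routing loop.
public class StreamRouter {

    public static Map<String, StringBuilder> route(List<String> lines,
                                                   Map<String, Pattern> criteria,
                                                   boolean keepUnmatched) {
        // Stand-in for Map<Relationship, FlowFile>.
        Map<String, StringBuilder> flowFileMap = new LinkedHashMap<>();

        for (String line : lines) {
            boolean matched = false;
            for (Map.Entry<String, Pattern> entry : criteria.entrySet()) {
                if (entry.getValue().matcher(line).matches()) {
                    matched = true;
                    // Create the outbound "FlowFile" lazily, on first match.
                    flowFileMap.computeIfAbsent(entry.getKey(), k -> new StringBuilder())
                               .append(line).append('\n');
                }
            }
            if (!matched && keepUnmatched) {
                // No criteria matched: route the line to "unmatched".
                flowFileMap.computeIfAbsent("unmatched", k -> new StringBuilder())
                           .append(line).append('\n');
            }
        }
        return flowFileMap;
    }
}
```

Note that a single line may match several criteria, in which case it is appended to several outbound FlowFiles, just as the RouteCSV description allows.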
If at any point an Exception is thrown, we will need to route the incoming FlowFile to failure. We will also need to remove each of the newly created FlowFiles, as we won't be transferring them anywhere. We can accomplish this by calling session.remove(flowFileMap.values()). At this point, we will log the error and return.
Otherwise, if all is successful, we can iterate through the flowFileMap and transfer each FlowFile to the corresponding Relationship. The original FlowFile is then either removed or routed to an original relationship. For each of the newly created FlowFiles, we also emit a Provenance ROUTE event indicating which Relationship the FlowFile went to. It is also helpful to include in the details of the ROUTE event how many pieces of information were included in the FlowFile. This allows DataFlow Managers, when looking at the Provenance Lineage view, to easily see how many pieces of information went to each of the Relationships for a given input FlowFile.
Additionally, some Processors may need to "group" the data that is sent to each Relationship so that each FlowFile sent to a Relationship has the same value. In our example, we may want to allow the Regular Expression to have a Capturing Group, and if two different lines in the CSV match the Regular Expression but have different values for the Capturing Group, we want them to be added to two different FlowFiles. The matching value could then be added to each FlowFile as an Attribute. This can be accomplished by modifying flowFileMap so that it is defined as Map<Relationship, Map<T, FlowFile>>, where T is the type of the Grouping Function (in our example, the Group would be a String because it is the result of evaluating a Regular Expression's Capturing Group).
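The grouping variant can be sketched the same way, with flowFileMap becoming a two-level map, here Map<String, Map<String, StringBuilder>> with T = String, the value of the Regular Expression's first Capturing Group (again, StringBuilders and relationship names stand in for NiFi's FlowFile and Relationship types):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch (plain JDK) of grouping routed data by a Capturing Group value.
public class GroupingRouter {

    public static Map<String, Map<String, StringBuilder>> route(List<String> lines,
                                                                Map<String, Pattern> criteria) {
        // Stand-in for Map<Relationship, Map<T, FlowFile>>.
        Map<String, Map<String, StringBuilder>> flowFileMap = new LinkedHashMap<>();

        for (String line : lines) {
            for (Map.Entry<String, Pattern> entry : criteria.entrySet()) {
                Matcher matcher = entry.getValue().matcher(line);
                if (matcher.matches()) {
                    // Lines with different Capturing Group values go to
                    // different "FlowFiles" within the same Relationship.
                    String group = matcher.group(1);
                    flowFileMap.computeIfAbsent(entry.getKey(), k -> new LinkedHashMap<>())
                               .computeIfAbsent(group, g -> new StringBuilder())
                               .append(line).append('\n');
                }
            }
        }
        return flowFileMap;
    }
}
```

In the real Processor, the group value would also be written to each outbound FlowFile as an Attribute before the FlowFile is transferred.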