Route Streams Based on Content (One-to-Many)
The previous description of Route Based on Content (One-to-Many) provides an abstraction for creating a very powerful Processor. However, it assumes that each FlowFile will be routed in its entirety to zero or more Relationships. What if the incoming data format is a "stream" of many different pieces of information - and we want to send different pieces of this stream to different Relationships? For example, imagine that we want to have a RouteCSV Processor such that it is configured with multiple Regular Expressions. If a line in the CSV file matches a Regular Expression, that line should be included in the outbound FlowFile to the associated relationship. If a Regular Expression is associated with the Relationship "has-apples" and that Regular Expression matches 1,000 of the lines in the FlowFile, there should be one outbound FlowFile for the "has-apples" relationship that has 1,000 lines in it. If a different Regular Expression is associated with the Relationship "has-oranges" and that Regular Expression matches 50 lines in the FlowFile, there should be one outbound FlowFile for the "has-oranges" relationship that has 50 lines in it. I.e., one FlowFile comes in and two FlowFiles come out. The two FlowFiles may contain some of the same lines of text from the original FlowFile, or they may be entirely different. This is the type of Processor that we will discuss in this section.
This Processor's name starts with "Route" and ends with the name of the data type that it routes. In our example here, we are routing CSV data, so the Processor is named RouteCSV. This Processor supports dynamic properties. Each user-defined property has a name that maps to the name of a Relationship. The value of the Property is in the format necessary for the "Match Criteria." In our example, the value of the property must be a valid Regular Expression.
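The idea of treating each dynamic property value as a compilable Regular Expression can be sketched in isolation. The class and method names below are hypothetical, and in a real Processor this logic would live in a Validator attached to the dynamic PropertyDescriptor; this is only a minimal illustration of the validation step.

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Standalone sketch (hypothetical names): validate a dynamic property value
// as a Regular Expression, as a NiFi Validator for such a property would.
public class MatchCriteriaValidator {

    /** Returns null when the value is a valid regex, otherwise the parse error. */
    public static String validate(String propertyValue) {
        try {
            Pattern.compile(propertyValue);
            return null;
        } catch (PatternSyntaxException e) {
            return e.getDescription();
        }
    }
}
```

A value such as `apples,.*` would pass, while an unbalanced expression such as `(unclosed` would yield an error description that the Processor could surface in its validation results.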
This Processor maintains an internal ConcurrentMap where the key is a Relationship and the value is of a type dependent on the format of the Match Criteria. In our example, we would maintain a ConcurrentMap<Relationship, Pattern>. This Processor overrides the onPropertyModified method. If the new value supplied to this method (the third argument) is null, the Relationship whose name is defined by the property name (the first argument) is removed from the ConcurrentMap. Otherwise, the new value is processed (in our example, by calling Pattern.compile(newValue)) and this value is added to the ConcurrentMap, with the key again being the Relationship whose name is specified by the property name.
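The bookkeeping described above can be sketched as follows. String keys stand in for NiFi Relationship objects, and the class name is hypothetical; the real onPropertyModified method receives a PropertyDescriptor plus old and new values rather than a bare property name.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;

// Standalone sketch (hypothetical name) of the onPropertyModified bookkeeping.
public class RouteMap {
    private final ConcurrentMap<String, Pattern> patterns = new ConcurrentHashMap<>();

    // A null new value removes the Relationship; otherwise the value is
    // compiled and stored under the Relationship named by the property.
    public void onPropertyModified(String propertyName, String newValue) {
        if (newValue == null) {
            patterns.remove(propertyName);
        } else {
            patterns.put(propertyName, Pattern.compile(newValue));
        }
    }

    public int size() {
        return patterns.size();
    }
}
```

Because onTrigger may run concurrently with property modifications, a ConcurrentMap (rather than a plain HashMap) keeps the lookup safe without explicit locking.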
This Processor will override the customValidate method. In this method, it will retrieve all Properties from the ValidationContext and count the number of PropertyDescriptors that are dynamic (by calling isDynamic() on the PropertyDescriptor). If the number of dynamic PropertyDescriptors is 0, this indicates that the user has not added any Relationships, so the Processor returns a ValidationResult indicating that the Processor is not valid because it has no Relationships added.
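The heart of that check can be sketched independently of the NiFi API. In a real Processor this would inspect ValidationContext.getProperties() and PropertyDescriptor.isDynamic() and build a ValidationResult; here the dynamic property names are simply passed in, and the class name is hypothetical.

```java
import java.util.Set;

// Standalone sketch (hypothetical name) of the customValidate check: with no
// dynamic (user-added) properties there are no Relationships to route to.
public class RouteValidation {

    /** Returns an error message, or null when the configuration is valid. */
    public static String validateRelationships(Set<String> dynamicPropertyNames) {
        if (dynamicPropertyNames.isEmpty()) {
            return "At least one user-defined property (Relationship) is required";
        }
        return null;
    }
}
```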
The Processor returns all of the Relationships specified by the user when its getRelationships method is called and will also return an unmatched Relationship. Because this Processor will have to read and write to the Content Repository (which can be relatively expensive), if this Processor is expected to be used for very high data volumes, it may be advantageous to add a Property that allows the user to specify whether or not they care about the data that does not match any of the Match Criteria.
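A minimal sketch of that getRelationships contract, again using String names in place of NiFi Relationship objects and a hypothetical class name:

```java
import java.util.HashSet;
import java.util.Set;

// Standalone sketch (hypothetical name): the Relationships exposed by the
// Processor are the user-defined ones plus the built-in "unmatched".
public class RouteRelationships {

    public static Set<String> getRelationships(Set<String> userDefined) {
        Set<String> all = new HashSet<>(userDefined);
        all.add("unmatched");
        return all;
    }
}
```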
When the onTrigger method is called, the Processor obtains a FlowFile via ProcessSession.get. If no data is available, the Processor returns. Otherwise, the Processor creates a Map<Relationship, FlowFile>. We will refer to this Map as flowFileMap. The Processor reads the incoming FlowFile by calling ProcessSession.read and provides an InputStreamCallback. From within the Callback, the Processor reads the first piece of data from the FlowFile. The Processor then evaluates each of the Match Criteria against this piece of data. If a particular criterion (in our example, a Regular Expression) matches, the Processor obtains the FlowFile from flowFileMap that belongs to the appropriate Relationship. If no FlowFile yet exists in the Map for this Relationship, the Processor creates a new FlowFile by calling session.create(incomingFlowFile) and then adds the new FlowFile to flowFileMap. The Processor then writes this piece of data to the FlowFile by calling session.append with an OutputStreamCallback. From within the OutputStreamCallback, we have access to the new FlowFile's OutputStream, so we are able to write the data to the new FlowFile. We then return from the OutputStreamCallback. If none of the Match Criteria match, we perform the same routine as above for the unmatched Relationship (unless the user has configured us not to write out unmatched data). Now that we have called session.append, we have a new version of the FlowFile. As a result, we need to update our flowFileMap to associate the Relationship with the new FlowFile.
If at any point an Exception is thrown, we will need to route the incoming FlowFile to failure. We will also need to remove each of the newly created FlowFiles, as we won't be transferring them anywhere. We can accomplish this by calling session.remove(flowFileMap.values()). At this point, we will log the error and return.
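The routing loop at the core of onTrigger can be sketched without the NiFi API. Lines of text stand in for "pieces of data", String names for Relationship objects, and StringBuilders for the FlowFiles built up via session.append; the class name is hypothetical. In the real Processor, a thrown Exception would route the input to failure and session.remove(flowFileMap.values()) would discard the partially built FlowFiles.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Standalone sketch (hypothetical name) of the per-line routing inside onTrigger.
public class StreamRouter {

    public static Map<String, StringBuilder> route(List<String> lines,
                                                   Map<String, Pattern> criteria,
                                                   boolean keepUnmatched) {
        // Stands in for Map<Relationship, FlowFile> flowFileMap.
        Map<String, StringBuilder> flowFileMap = new HashMap<>();
        for (String line : lines) {
            boolean matched = false;
            for (Map.Entry<String, Pattern> entry : criteria.entrySet()) {
                if (entry.getValue().matcher(line).find()) {
                    matched = true;
                    // In NiFi: session.create(incoming) on first use of the
                    // Relationship, then session.append, then put the new
                    // FlowFile version back into flowFileMap.
                    flowFileMap.computeIfAbsent(entry.getKey(), k -> new StringBuilder())
                               .append(line).append('\n');
                }
            }
            if (!matched && keepUnmatched) {
                flowFileMap.computeIfAbsent("unmatched", k -> new StringBuilder())
                           .append(line).append('\n');
            }
        }
        return flowFileMap;
    }
}
```

Note that a single line may match several criteria and therefore be appended to several buffers, matching the behavior described above where the outbound FlowFiles may share lines from the original.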
Otherwise, if all is successful, we can now iterate through the flowFileMap and transfer each FlowFile to the corresponding Relationship. The original FlowFile is then either removed or routed to an original relationship. For each of the newly created FlowFiles, we also emit a Provenance ROUTE event indicating which Relationship the FlowFile went to. It is also helpful to include in the details of the ROUTE event how many pieces of information were included in the FlowFile. This allows DataFlow Managers, when looking at the Provenance Lineage view, to easily see how many pieces of information went to each of the Relationships for a given input FlowFile.
Additionally, some Processors may need to "group" the data that is sent to each Relationship so that each FlowFile that is sent to a Relationship has the same value. In our example, we may want to allow the Regular Expression to have a Capturing Group; if two different lines in the CSV match the Regular Expression but have different values for the Capturing Group, we want them to be added to two different FlowFiles. The matching value could then be added to each FlowFile as an Attribute. This can be accomplished by modifying the flowFileMap such that it is defined as a Map<Relationship, Map<T, FlowFile>>, where T is the type of the Grouping Function (in our example, the Group would be a String because it is the result of evaluating a Regular Expression's Capturing Group).
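The grouped variant can be sketched the same way as before: a nested map keyed first by Relationship name and then by the String captured by the Regular Expression's first Capturing Group, with StringBuilders again standing in for FlowFiles and a hypothetical class name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch (hypothetical name) of the grouped routing:
// models Map<Relationship, Map<T, FlowFile>> where T is the captured group.
public class GroupedRouter {

    public static Map<String, Map<String, StringBuilder>> route(List<String> lines,
            Map<String, Pattern> criteria) {
        Map<String, Map<String, StringBuilder>> flowFileMap = new HashMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Pattern> entry : criteria.entrySet()) {
                Matcher m = entry.getValue().matcher(line);
                if (m.find() && m.groupCount() >= 1) {
                    // The captured value is the grouping key; in NiFi it would
                    // also be added to the resulting FlowFile as an Attribute.
                    String group = m.group(1);
                    flowFileMap.computeIfAbsent(entry.getKey(), k -> new HashMap<>())
                               .computeIfAbsent(group, g -> new StringBuilder())
                               .append(line).append('\n');
                }
            }
        }
        return flowFileMap;
    }
}
```

With the criterion `apples,(\w+)` bound to "has-apples", the lines `apples,red` and `apples,green` would end up in two different FlowFiles under the same Relationship, one per captured color.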