Adding Functionality to Apache NiFi
Also available as:
PDF
loading table of contents...

Data Egress

A Processor that publishes data to an external source has two Relationships: success and failure. The Processor name starts with "Put" followed by the protocol that is used for data transmission. Processors that follow this pattern include PutEmail, PutSFTP, and PostHTTP (note that the name does not begin with "Put" because this would lead to confusion, since PUT and POST have special meanings when dealing with HTTP).

This Processor may create or initialize a Connection Pool in a method that uses the @OnScheduled annotation. However, because communications problems may prevent connections from being established or cause connections to be terminated, connections themselves are not created at this point. Rather, the connections are created or leased from the pool in the onTrigger method.

The onTrigger method first obtains a FlowFile from the ProcessSession via the get method. If no FlowFile is available, the method returns without obtaining a connection to the remote resource.

If at least one FlowFile is available, the Processor obtains a connection from the Connection Pool, if possible, or otherwise creates a new connection. If the Processor is neither able to lease a connection from the Connection Pool nor create a new connection, the FlowFile is routed to failure, the event is logged, and the method returns.

If a connection was obtained, the Processor obtains an InputStream to the FlowFile's content by invoking the read method on the ProcessSession and passing an InputStreamCallback (which is often an anonymous inner class) and from within that callback transmits the contents of the FlowFile to the destination. The event is logged along with the amount of time taken to transfer the file and the data rate at which the file was transferred. A SEND event is reported to the ProvenanceReporter by obtaining the reporter from the ProcessSession via the getProvenanceReporter method and calling the send method on the reporter. The connection is returned or added to the Connection Pool, depending on whether the connection was leased from the pool or newly created by the onTrigger method.

If there is a communications problem, the connection is typically terminated and not returned (or added) to the Connection Pool. If there is an issue sending the data to the remote resource, the desired approach for handling the error depends on a few considerations. If the issue is related to a network condition, the FlowFile is generally routed to failure. The FlowFile is not penalized because there is not necessary a problem with the data. Unlike the case of the Data Ingress Processor, we typically do not call yield on the ProcessContext. This is because in the case of ingest, the FlowFile does not exist until the Processor is able to perform its function. However, in the case of a Put Processor, the DataFlow Manager may choose to route failure to a different Processor. This can allow for a "backup" system to be used in the case of problems with one system or can be used for load distribution across many systems.

If a problem occurs that is data-related, one of two approaches should be taken. First, if the problem is likely to sort itself out, the FlowFile is penalized and then routed to failure. This is the case, for instance, with PutFTP, when a FlowFile cannot be transferred because of a file naming conflict. The presumption is that the file will eventually be removed from the directory so that the new file can be transferred. As a result, we penalize the FlowFile and route to failure so that we can try again later. In the other case, if there is an actual problem with the data (such as the data does not conform to some required specification), a different approach may be taken. In this case, it may be advantageous to break apart the failure relationship into a failure and a communications failure relationship. This allows the DataFlow Manager to determine how to handle each of these cases individually. It is important in these situations to document well the differences between the two Relationships by clarifying it in the "description" when creating the Relationship.

Connections to remote systems are torn down and the Connection Pool shutdown in a method annotated with @OnStopped so that resources can be reclaimed.