Data Ingress
A Processor that ingests data into NiFi has a single Relationship named success. This
Processor generates new FlowFiles via the ProcessSession create method and does not pull
FlowFiles from incoming Connections. The Processor name starts with "Get" or "Listen,"
depending on whether it polls an external source or exposes some interface to which
external sources can connect. The name ends with the protocol used for communications.
Processors that follow this pattern include GetFile, GetSFTP, ListenHTTP, and GetHTTP.
This Processor may create or initialize a Connection Pool in a method that uses the
@OnScheduled annotation. However, because communications problems may prevent connections
from being established or cause connections to be terminated, connections themselves are
not created at this point. Rather, the connections are created or leased from the pool in
the onTrigger method.
The onTrigger method of this Processor begins by leasing a connection from the Connection
Pool, if possible, or otherwise creates a connection to the external service. When no data
is available from the external source, the Processor calls the yield method of the
ProcessContext and onTrigger returns, so that this Processor avoids continually running
and depleting resources without benefit. Otherwise, this Processor creates a FlowFile via
the ProcessSession's create method and assigns an appropriate filename and path to the
FlowFile (by adding the filename and path attributes), as well as any other attributes
that may be appropriate. An OutputStream to the FlowFile's content is obtained via the
ProcessSession's write method, passing a new OutputStreamCallback (which is usually an
anonymous inner class). From within this callback, the Processor streams the content from
the external resource to the FlowFile's OutputStream. If the desire is to write the entire
contents of an InputStream to the FlowFile, the importFrom method of ProcessSession may be
more convenient to use than the write method.
When this Processor expects to receive many small files, it may be advisable to create several FlowFiles from a single session before committing the session. Typically, this allows the Framework to treat the content of the newly created FlowFiles much more efficiently.
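The batching idea can be sketched with plain JDK types. This is a simplified model, not
NiFi code: BatchIngestSketch, MAX_BATCH, and trigger are hypothetical names, the Deque
stands in for the external source, and each inner list stands in for one committed
session. In a real Processor the empty case is where the yield method of the
ProcessContext would be called.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Simplified model (assumption): strings stand in for small files and FlowFiles.
class BatchIngestSketch {
    // Hypothetical cap on how many items are gathered into one session.
    static final int MAX_BATCH = 3;

    // Models one onTrigger call. Returns the number of items committed;
    // 0 means no data was available and a real Processor would yield.
    static int trigger(Deque<String> source, List<List<String>> commits) {
        List<String> batch = new ArrayList<>();
        // Read without removing, so nothing leaves the source before the commit.
        Iterator<String> it = source.iterator();
        while (batch.size() < MAX_BATCH && it.hasNext()) {
            batch.add(it.next());
        }
        if (batch.isEmpty()) {
            return 0;
        }
        // A single commit covers every item gathered in this call.
        commits.add(batch);
        // Only after the commit are the items removed from the source.
        for (int i = 0; i < batch.size(); i++) {
            source.pollFirst();
        }
        return batch.size();
    }

    public static void main(String[] args) {
        Deque<String> source = new ArrayDeque<>(List.of("f1", "f2", "f3", "f4", "f5"));
        List<List<String>> commits = new ArrayList<>();
        while (trigger(source, commits) > 0) {
            // keep triggering until the source is drained
        }
        System.out.println(commits); // prints [[f1, f2, f3], [f4, f5]]
    }
}
```

Five small items end up in two commits rather than five, which is the effect the
paragraph above describes.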
This Processor generates a Provenance event indicating that it has received data and specifies from where the data came. This Processor should log the creation of the FlowFile so that the FlowFile's origin can be determined by analyzing logs, if necessary.
This Processor acknowledges receipt of the data and/or removes the data from the
external source in order to prevent receipt of duplicate files. This
is done only after the ProcessSession by which the FlowFile was created has been
committed! Failure to adhere to this principle may result in data loss, as
restarting NiFi before the session has been committed will result in the temporary file
being deleted. Note, however, that it is possible using this approach to receive duplicate
data because the application could be restarted after committing the session and before
acknowledging or removing the data from the external source. In general, though, potential
data duplication is preferred over potential data loss. The connection is finally returned
or added to the Connection Pool, depending on whether the connection was leased from the
Connection Pool to begin with or was created in the onTrigger method.
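The ordering rule above (commit first, acknowledge second) can be illustrated with a small
model. Everything here is a hypothetical stand-in built on JDK collections: DemoSource,
DemoSession, and CommitThenAckSketch are not NiFi classes, and the crashAfterCommit flag
merely simulates a restart between the two steps.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical external source: an item stays available until it is acknowledged.
class DemoSource {
    private final Deque<String> pending = new ArrayDeque<>();
    DemoSource(List<String> items) { pending.addAll(items); }
    String peek() { return pending.peekFirst(); } // null when no data is available
    void acknowledge() { pending.pollFirst(); }
}

// Hypothetical stand-in for a session: created items become durable only on commit.
class DemoSession {
    private final List<String> uncommitted = new ArrayList<>();
    private final List<String> store;
    DemoSession(List<String> store) { this.store = store; }
    void create(String content) { uncommitted.add(content); }
    void commit() { store.addAll(uncommitted); uncommitted.clear(); }
}

class CommitThenAckSketch {
    // One ingest attempt. When crashAfterCommit is true, the method returns
    // after the commit but before the acknowledgement, modeling a restart.
    static void ingestOnce(DemoSource source, List<String> store, boolean crashAfterCommit) {
        String item = source.peek();
        if (item == null) {
            return; // nothing to ingest
        }
        DemoSession session = new DemoSession(store);
        session.create(item);
        session.commit();         // step 1: make the data durable
        if (crashAfterCommit) {
            return;               // simulated restart: the source was never told
        }
        source.acknowledge();     // step 2: only now remove it from the source
    }

    public static void main(String[] args) {
        DemoSource source = new DemoSource(List.of("a"));
        List<String> store = new ArrayList<>();
        ingestOnce(source, store, true);   // interrupted attempt
        ingestOnce(source, store, false);  // retry after the simulated restart
        System.out.println(store);         // prints [a, a]
    }
}
```

The interrupted attempt produces a duplicate on retry, but the item is never lost.
Reversing the two steps would instead allow an acknowledged item to vanish if the restart
happened before the commit.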
If there is a communications problem, the connection is typically terminated and not
returned (or added) to the Connection Pool. Connections to remote systems are torn down
and the Connection Pool is shut down in a method annotated with @OnStopped so that
resources can be reclaimed.
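The connection lifecycle described in this section (pool set up when the Processor is
scheduled, connections leased or created on demand, returned on success, discarded on
failure, and closed when the Processor stops) might look roughly like the following. This
is a sketch using only JDK types: DemoConnection and DemoConnectionPool are hypothetical
names, and the comments mark where the @OnScheduled, onTrigger, and @OnStopped methods of
a real Processor would come into play.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a connection to an external service.
class DemoConnection implements AutoCloseable {
    private boolean open = true;
    boolean isOpen() { return open; }
    @Override public void close() { open = false; }
}

// Hypothetical pool; a real Processor would build it in its @OnScheduled method.
class DemoConnectionPool {
    private final BlockingQueue<DemoConnection> idle = new LinkedBlockingQueue<>();
    final AtomicInteger created = new AtomicInteger();

    // Lease an idle connection if one exists, otherwise create one on demand.
    DemoConnection leaseOrCreate() {
        DemoConnection conn = idle.poll();
        if (conn == null) {
            conn = new DemoConnection();
            created.incrementAndGet();
        }
        return conn;
    }

    // Healthy connections go into the pool, whether they were leased or newly created.
    void giveBack(DemoConnection conn) { idle.offer(conn); }

    // After a communications problem the connection is closed and not pooled.
    void discard(DemoConnection conn) { conn.close(); }

    int idleCount() { return idle.size(); }

    // Counterpart of the @OnStopped method: close everything still in the pool.
    void shutdown() {
        DemoConnection conn;
        while ((conn = idle.poll()) != null) {
            conn.close();
        }
    }
}

class PoolLifecycleSketch {
    public static void main(String[] args) {
        DemoConnectionPool pool = new DemoConnectionPool(); // "@OnScheduled": pool only
        DemoConnection first = pool.leaseOrCreate();        // "onTrigger": pool empty, create
        pool.giveBack(first);                               // success: add to the pool
        DemoConnection second = pool.leaseOrCreate();       // next trigger: leased from pool
        pool.discard(second);                               // simulated communications problem
        pool.shutdown();                                    // "@OnStopped"
        System.out.println("connections created: " + pool.created.get());
    }
}
```

Creating connections lazily in leaseOrCreate, rather than when the pool is constructed,
mirrors the point made earlier: a failure to connect should surface in onTrigger, where it
can be handled per invocation.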