Data Model

This processor executes an Apache Flume sink. Each FlowFile is wrapped in an implementation of Flume's Event interface: the content of the FlowFile becomes the body of the Event, and the attributes of the FlowFile become Event headers. The following special headers are also set:

Flume Event Header         Source (FlowFile method)
nifi.entry.date            FlowFile#getEntryDate()
nifi.id                    FlowFile#getId()
nifi.last.queue.date       FlowFile#getLastQueueDate()
nifi.lineage.start.date    FlowFile#getLineageStartDate()
nifi.size                  FlowFile#getSize()
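The mapping described above can be sketched in plain Java. This is a minimal illustration, not the processor's code: SketchEvent and SketchFlowFile are hypothetical stand-ins for Flume's Event and NiFi's FlowFile so the example runs without either library. It also assumes that header values are the decimal string form of each long, and that the special nifi.* headers overwrite any attribute with the same name; the processor's actual precedence is not stated in this document.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flume's Event: a byte[] body plus String headers.
record SketchEvent(byte[] body, Map<String, String> headers) {}

// Hypothetical stand-in for the FlowFile fields that the mapping reads.
record SketchFlowFile(long id, long entryDate, long lastQueueDate, long lineageStartDate,
                      byte[] content, Map<String, String> attributes) {
    // Mirrors FlowFile#getSize(): the size of the content in bytes.
    long size() {
        return content.length;
    }
}

final class FlowFileEventMapping {
    private FlowFileEventMapping() {}

    static SketchEvent toEvent(SketchFlowFile ff) {
        // Every FlowFile attribute becomes an Event header.
        Map<String, String> headers = new HashMap<>(ff.attributes());
        // The special headers from the table; in this sketch they overwrite
        // any attribute that happens to share the same name (an assumption).
        headers.put("nifi.entry.date", String.valueOf(ff.entryDate()));
        headers.put("nifi.id", String.valueOf(ff.id()));
        headers.put("nifi.last.queue.date", String.valueOf(ff.lastQueueDate()));
        headers.put("nifi.lineage.start.date", String.valueOf(ff.lineageStartDate()));
        headers.put("nifi.size", String.valueOf(ff.size()));
        // The FlowFile content becomes the Event body unchanged.
        return new SketchEvent(ff.content(), headers);
    }
}
```

A FlowFile with one attribute therefore produces an Event with six headers: the attribute plus the five nifi.* entries.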

Warning

In NiFi, the content of a FlowFile is accessed through a stream, but a Flume Event stores its body in a byte array. This means the full content of each FlowFile is loaded into memory when the ExecuteFlumeSink processor processes it. When setting NiFi's heap size, consider the typical size of the FlowFiles you will process and the batch size, if any, that your sink is configured with.
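The sketch below illustrates both halves of the warning. readFully shows the stream-to-array copy that a byte[] Event body implies, and estimateBodyBytes gives a rough lower bound on the heap taken by Event bodies, assuming the sink holds a whole batch in memory at once. Both method names are hypothetical; the estimate ignores headers, JVM object overhead, and any extra copies a particular sink may make.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

final class FlumeMemorySketch {
    private FlumeMemorySketch() {}

    // Reads an entire stream into a single array: the whole FlowFile content
    // is on the heap at once, however large it is. Requires Java 9+.
    static byte[] readFully(InputStream in) {
        try {
            return in.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rough lower bound, in bytes, for Event bodies held at the same time:
    // typical FlowFile size times batch size. Use batchSize = 1 when the
    // sink does not batch. Throws ArithmeticException on long overflow.
    static long estimateBodyBytes(long typicalFlowFileBytes, int batchSize) {
        if (typicalFlowFileBytes < 0 || batchSize < 1) {
            throw new IllegalArgumentException("size must be >= 0 and batchSize >= 1");
        }
        return Math.multiplyExact(typicalFlowFileBytes, (long) batchSize);
    }
}
```

For example, with hypothetical 10 MiB FlowFiles and a sink batch size of 100, estimateBodyBytes(10L * 1024 * 1024, 100) returns 1,048,576,000 bytes, roughly 1 GiB of heap for Event bodies alone, before anything else NiFi needs.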

Configuration Details

This processor is designed to execute arbitrary Flume sinks. Most of the sink's configuration is deferred to Flume's built-in configuration system; for the settings available for each sink type, refer to the Flume User Guide. Configuring the Flume sink is a four-step process:

  1. Set the Sink Type property to a valid Flume sink type.
  2. Set the Agent Name property to the name of the agent in your Flume configuration. This is the prefix of the properties in the Flume configuration file. Example: tier1
  3. Set the Sink Name property to the name of the sink in your Flume configuration. If Agent Name is tier1, then the Sink Name is one of the names listed in the tier1.sinks property. Example: sink-1
  4. Copy and paste the configuration for the sink from your Flume configuration file into the Flume Configuration property. Assuming you're using the same Agent Name and Sink Name as in the examples above, these are all of the properties whose names start with tier1.sinks.sink-1, except tier1.sinks.sink-1.type and tier1.sinks.sink-1.channel: do not copy those two (the type is already supplied by the Sink Type property).
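The four steps above can be sketched as a small Java routine that reads a Flume configuration and derives the four processor property values for one agent/sink pair. This is an illustration under stated assumptions, not part of NiFi: the class and method names are hypothetical, the parser only understands simple "key = value" lines (no line continuations, ':' separators, or escapes), and it assumes the names in <agent>.sinks are whitespace-separated, as in Flume's own examples.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class FlumeSinkConfigSketch {
    private FlumeSinkConfigSketch() {}

    // Parses simple "key = value" lines, skipping blanks and '#' comments.
    // Insertion order is preserved; a repeated key keeps its first position.
    static Map<String, String> parse(List<String> lines) {
        Map<String, String> props = new LinkedHashMap<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) continue;
            int eq = line.indexOf('=');
            if (eq < 0) continue;
            props.put(line.substring(0, eq).trim(), line.substring(eq + 1).trim());
        }
        return props;
    }

    // Derives Sink Type, Agent Name, Sink Name and Flume Configuration.
    static Map<String, String> deriveProperties(List<String> lines, String agent, String sink) {
        Map<String, String> props = parse(lines);
        // Step 3: the sink must be one of the names listed in "<agent>.sinks".
        String sinks = props.getOrDefault(agent + ".sinks", "");
        if (!Arrays.asList(sinks.split("\\s+")).contains(sink)) {
            throw new IllegalArgumentException(sink + " is not listed in " + agent + ".sinks");
        }
        // The trailing '.' keeps sink "k1" from matching "k10".
        String prefix = agent + ".sinks." + sink + ".";
        // Step 4: copy every property under the prefix except type and channel.
        List<String> config = new ArrayList<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            String key = e.getKey();
            boolean excluded = key.equals(prefix + "type") || key.equals(prefix + "channel");
            if (key.startsWith(prefix) && !excluded) {
                config.add(key + " = " + e.getValue());
            }
        }
        Map<String, String> result = new LinkedHashMap<>();
        result.put("Sink Type", props.get(prefix + "type")); // step 1
        result.put("Agent Name", agent);                      // step 2
        result.put("Sink Name", sink);                        // step 3
        result.put("Flume Configuration", String.join("\n", config));
        return result;
    }
}
```

Applied to the a1/k1 configuration in the usage example, this sketch should yield the same four values listed for the processor there.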

Usage Example

Suppose you have the following existing Flume configuration file:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Then you would configure the ExecuteFlumeSink processor as follows:

Property               Value
Sink Type              hdfs
Agent Name             a1
Sink Name              k1
Flume Configuration    a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
                       a1.sinks.k1.hdfs.filePrefix = events-
                       a1.sinks.k1.hdfs.round = true
                       a1.sinks.k1.hdfs.roundValue = 10
                       a1.sinks.k1.hdfs.roundUnit = minute