RocksDB FlowFile Repository

This implementation makes use of the RocksDB key-value store. It uses periodic synchronization to ensure that no created or received data is lost (as long as nifi.flowfile.repository.rocksdb.accept.data.loss is set false). In the event of a failure (e.g. power loss), work done on FlowFiles through the system (i.e. routing and transformation) may still be lost. Specifically, the record of these actions may be lost, reverting the affected FlowFiles to a previous, valid state. From there, they will resume their path through the flow as normal. This guarantee comes at the expense of a delay on operations that add new data to the system. This delay is configurable (as nifi.flowfile.repository.rocksdb.sync.period), and can be tuned to the individual system.

The configuration parameters for this repository fall in to two categories, "NiFi-centric" and "RocksDB-centric". The NiFi-centric settings have to do with the operations of the FlowFile Repository and its interaction with NiFi. The RocksDB-centric settings directly correlate to settings on the underlying RocksDB repo. More information on these settings can be found in the RocksDB documentation: https://github.com/facebook/rocksdb/wiki/RocksJava-Basics.

To use this implementation, set nifi.flowfile.repository.implementation to org.apache.nifi.controller.repository.RocksDBFlowFileRepository.

NiFi-centric Configuration Properties:

Property

Description

nifi.flowfile.repository.directory

The location of the FlowFile Repository. The default value is`./flowfile_repository`.

nifi.flowfile.repository.rocksdb.sync.warning.period

How often to log warnings if unable to sync. The default value is 30 seconds.

nifi.flowfile.repository.rocksdb.claim.cleanup.period

How often to mark content claims destructible (so they can be removed from the content repo). The default value is 30 seconds.

nifi.flowfile.repository.rocksdb.deserialization.threads

How many threads to use on startup restoring the FlowFile state. The default value is 16.

nifi.flowfile.repository.rocksdb.deserialization.buffer.size

Size of the buffer to use on startup restoring the FlowFile state. The default value is 1000.

nifi.flowfile.repository.rocksdb.sync.period

Frequency at which to force a sync to disk. This is the maximum period a data creation operation may block if nifi.flowfile.repository.rocksdb.accept.data.loss is false. The default value is 10 milliseconds.

nifi.flowfile.repository.rocksdb.accept.data.loss

Whether to accept the loss of received / created data. Setting this true increases throughput if loss of data is acceptable. The default value is false.

nifi.flowfile.repository.rocksdb.enable.stall.stop

Whether to enable the stall / stop of writes to the repository based on configured limits. Enabling this feature allows the system to protect itself by restricting (delaying or denying) operations that increase the total FlowFile count on the node to prevent the system from being overwhelmed. The default value is false.

nifi.flowfile.repository.rocksdb.stall.period

The period of time to stall when the specified criteria are encountered. The default value is 100 milliseconds.

nifi.flowfile.repository.rocksdb.stall.flowfile.count

The FlowFile count at which to begin stalling writes to the repo. The default value is 800000.

nifi.flowfile.repository.rocksdb.stall.heap.usage.percent

The heap usage at which to begin stalling writes to the repo. The default value is 95%.

nifi.flowfile.repository.rocksdb.stop.flowfile.count

The FlowFile count at which to begin stopping the creation of new FlowFiles. The default value is 1100000.

nifi.flowfile.repository.rocksdb.stop.heap.usage.percent

The heap usage at which to begin stopping the creation of new FlowFiles. The default value is 99.9%.

nifi.flowfile.repository.rocksdb.remove.orphaned.flowfiles.on.startup

Whether to allow the repository to remove FlowFiles it cannot identify on startup. As this is often the result of a configuration or synchronization error, it is disabled by default. This should only be enabled if you are absolutely certain you want to lose the data in question. The default value is false.

nifi.flowfile.repository.rocksdb.enable.recovery.mode

Whether to enable "recovery mode". This limits the number of FlowFiles loaded into the graph at a time, while not actually removing any FlowFiles (or content) from the system. This allows for the recovery of a system that is encountering OutOfMemory errors or similar on startup. This should not be enabled unless necessary to recover a system, and should be disabled as soon as that has been accomplished.

WARNING: While in recovery mode, do not make modifications to the graph. Changes to the graph may result in the inability to restore further FlowFiles from the repository. The default value is false.

nifi.flowfile.repository.rocksdb.recovery.mode.flowfile.count

The number of FlowFiles to load into the graph when in "recovery mode". As FlowFiles leave the system, additional FlowFiles will be loaded up to this limit. This setting does not prevent FlowFiles from coming into the system via normal means. The default value is 5000.

RocksDB-centric Configuration Properties:

Property

Description

nifi.flowfile.repository.rocksdb.parallel.threads

The number of threads to use for flush and compaction. A good value is the number of cores. See RockDB DBOptions.setIncreaseParallelism() for more information. The default value is 8.

nifi.flowfile.repository.rocksdb.max.write.buffer.number

The maximum number of write buffers that are built up in memory. See RockDB ColumnFamilyOptions.setMaxWriteBufferNumber() / max_write_buffer_number for more information. The default value is 4.

nifi.flowfile.repository.rocksdb.write.buffer.size

The amount of data to build up in memory before converting to a sorted on disk file. Larger values increase performance, especially during bulk loads. Up to max_write_buffer_number write buffers may be held in memory at the same time, so you may wish to adjust this parameter to control memory usage. See RockDB ColumnFamilyOptions.setWriteBufferSize() / write_buffer_size for more information. The default value is 256 MB.

nifi.flowfile.repository.rocksdb.level.0.slowdown.writes.trigger

A soft limit on number of level-0 files. Writes are slowed at this point. A values less than 0 means no write slow down will be triggered by the number of files in level-0. See RocksDB ColumnFamilyOptions.setLevel0SlowdownWritesTrigger() / level0_slowdown_writes_trigger for more information. The default value is 20.

nifi.flowfile.repository.rocksdb.level.0.stop.writes.trigger

The maximum number of level-0 files. Writes will be stopped at this point. See RocksDB ColumnFamilyOptions.setLevel0StopWritesTrigger() / level0_stop_writes_trigger for more information. The default value is 40.

nifi.flowfile.repository.rocksdb.delayed.write.bytes.per.second

The limited write rate to the DB if slowdown is triggered. RocksDB may decide to slow down more if the compaction gets behind further. See RocksDB DBOptions.setDelayedWriteRate() for more information. The default value is 16 MB.

nifi.flowfile.repository.rocksdb.max.background.flushes

Specifies the maximum number of concurrent background flush jobs. See RocksDB DBOptions.setMaxBackgroundFlushes() / max_background_flushes for more information. The default value is 1.

nifi.flowfile.repository.rocksdb.max.background.compactions

Specifies the maximum number of concurrent background compaction jobs. See RocksDB DBOptions.setMaxBackgroundCompactions() / max_background_compactions for more information. The default value is 1.

nifi.flowfile.repository.rocksdb.min.write.buffer.number.to.merge

The minimum number of write buffers to merge together before writing to storage. See RocksDB ColumnFamilyOptions.setMinWriteBufferNumberToMerge() / min_write_buffer_number_to_merge for more information. The default value is 1.

nifi.flowfile.repository.rocksdb.stat.dump.period

The period at which to dump rocksdb.stats to the log. See RocksDB DBOptions.setStatsDumpPeriodSec() / stats_dump_period_sec for more information. The default value is 600 sec.