Summary

This reporting task issues SQL queries against NiFi metrics information, modeled as tables, and transmits the query results to a specified destination. The query may use the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, PROVENANCE, or FLOW_CONFIG_HISTORY tables, and can use any functions or capabilities provided by Apache Calcite, including JOINs and aggregate functions.

The results are transmitted to the destination using the configured Record Sink service, such as SiteToSiteReportingRecordSink (for sending via the Site-to-Site protocol) or DatabaseRecordSink (for sending the query result rows to a relational database).

The reporting task can uniquely handle items from the bulletin and provenance repositories, meaning that an item is processed only once when the query is set to unique. A query is made unique by defining a time window with the special SQL placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, $provenanceEndTime), which the reporting task evaluates at runtime. See the SQL Query Examples section.


Table Definitions

Below are the definitions of all the "tables" supported by this reporting task. These are not persistent or materialized tables; they are non-materialized views whose sources are re-queried at every execution. A query executed twice may therefore return different results, for example if new status information is available or, in the case of JVM_METRICS, because a new snapshot of the JVM is taken at query time.


CONNECTION_STATUS

Column                         Data Type
id                             String
groupId                        String
name                           String
sourceId                       String
sourceName                     String
destinationId                  String
destinationName                String
backPressureDataSizeThreshold  String
backPressureBytesThreshold     long
backPressureObjectThreshold    long
isBackPressureEnabled          boolean
inputCount                     int
inputBytes                     long
queuedCount                    int
queuedBytes                    long
outputCount                    int
outputBytes                    long
maxQueuedCount                 int
maxQueuedBytes                 long

PROCESSOR_STATUS

Column                  Data Type
id                      String
groupId                 String
name                    String
processorType           String
averageLineageDuration  long
bytesRead               long
bytesWritten            long
bytesReceived           long
bytesSent               long
flowFilesRemoved        int
flowFilesReceived       int
flowFilesSent           int
inputCount              int
inputBytes              long
outputCount             int
outputBytes             long
activeThreadCount       int
terminatedThreadCount   int
invocations             int
processingNanos         long
runStatus               String
executionNode           String

BULLETINS

Column                Data Type
bulletinId            long
bulletinCategory      String
bulletinGroupId       String
bulletinGroupName     String
bulletinGroupPath     String
bulletinLevel         String
bulletinMessage       String
bulletinNodeAddress   String
bulletinNodeId        String
bulletinSourceId      String
bulletinSourceName    String
bulletinSourceType    String
bulletinTimestamp     Date
bulletinFlowFileUuid  String

PROCESS_GROUP_STATUS

Column                 Data Type
id                     String
groupId                String
name                   String
bytesRead              long
bytesWritten           long
bytesReceived          long
bytesSent              long
bytesTransferred       long
flowFilesReceived      int
flowFilesSent          int
flowFilesTransferred   int
inputContentSize       long
inputCount             int
outputContentSize      long
outputCount            int
queuedContentSize      long
activeThreadCount      int
terminatedThreadCount  int
queuedCount            int
versionedFlowState     String
processingNanos        long

JVM_METRICS

The JVM_METRICS table has dynamic columns in the sense that the "garbage collector runs" and "garbage collector time" columns appear once for each Java garbage collector in the JVM.
Each such column name ends with the name of the garbage collector, substituted for the <garbage_collector_name> expression below:

Column                                Data Type
jvm_daemon_thread_count               int
jvm_thread_count                      int
jvm_thread_states_blocked             int
jvm_thread_states_runnable            int
jvm_thread_states_terminated          int
jvm_thread_states_timed_waiting       int
jvm_uptime                            long
jvm_heap_used                         double
jvm_heap_usage                        double
jvm_non_heap_usage                    double
jvm_file_descriptor_usage             double
jvm_gc_runs_<garbage_collector_name>  long
jvm_gc_time_<garbage_collector_name>  long
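
As a sketch of querying this table, the following selects only fixed columns, since the garbage collector columns vary from one JVM to another. The 0.8 threshold is illustrative and assumes that jvm_heap_usage is reported as a fraction between 0 and 1, which this page does not state:

```sql
-- Report thread counts and heap usage when heap usage exceeds
-- an assumed threshold of 0.8 (illustrative value only).
SELECT jvm_thread_count, jvm_daemon_thread_count, jvm_heap_usage, jvm_non_heap_usage
FROM JVM_METRICS
WHERE jvm_heap_usage > 0.8
```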

CONNECTION_STATUS_PREDICTIONS

Column                                  Data Type
connectionId                            String
predictedQueuedBytes                    long
predictedQueuedCount                    int
predictedPercentBytes                   int
predictedPercentCount                   int
predictedTimeToBytesBackpressureMillis  long
predictedTimeToCountBackpressureMillis  long
predictionIntervalMillis                long

PROVENANCE

Column               Data Type
eventId              long
eventType            String
timestampMillis      long
durationMillis       long
lineageStart         long
details              String
componentId          String
componentName        String
componentType        String
processGroupId       String
processGroupName     String
entityId             String
entityType           String
entitySize           long
previousEntitySize   long
updatedAttributes    Map<String,String>
previousAttributes   Map<String,String>
contentPath          String
previousContentPath  String
parentIds            Array<String>
childIds             Array<String>
transitUri           String
remoteIdentifier     String
alternateIdentifier  String

FLOW_CONFIG_HISTORY

Column                         Data Type
actionId                       int
actionTimestamp                long
actionUserIdentity             String
actionSourceId                 String
actionSourceName               String
actionSourceType               String
actionOperation                String
configureDetailsName           String
configureDetailsPreviousValue  String
configureDetailsValue          String
connectionSourceId             String
connectionSourceName           String
connectionSourceType           String
connectionDestinationId        String
connectionDestinationName      String
connectionDestinationType      String
connectionRelationship         String
moveGroup                      String
moveGroupId                    String
movePreviousGroup              String
movePreviousGroupId            String
purgeEndDate                   long


SQL Query Examples

Example: Select all fields from the CONNECTION_STATUS table:

SELECT * FROM CONNECTION_STATUS
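
Example (an illustrative sketch, not one of the original examples): the Summary notes that aggregate functions are available, so a query can roll up per-processor figures by type. The aliases totalBytesWritten and processorCount are made up for this example:

```sql
-- Total bytes written and number of processors for each processor type,
-- with the heaviest writers first.
SELECT processorType,
       SUM(bytesWritten) AS totalBytesWritten,
       COUNT(*) AS processorCount
FROM PROCESSOR_STATUS
GROUP BY processorType
ORDER BY totalBytesWritten DESC
```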


Example: Select connection IDs where time-to-backpressure (based on queue count) is less than 5 minutes:

SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE predictedTimeToCountBackpressureMillis < 300000
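
Example (an illustrative sketch): because JOINs are supported, the same predictions can be combined with CONNECTION_STATUS to report connection names instead of bare IDs. This assumes that connectionId in CONNECTION_STATUS_PREDICTIONS corresponds to id in CONNECTION_STATUS, which the table definitions suggest but do not state outright:

```sql
-- Name and current queue depth of connections predicted to reach
-- count-based backpressure in under 5 minutes (300000 ms).
SELECT cs.name, cs.queuedCount, p.predictedTimeToCountBackpressureMillis
FROM CONNECTION_STATUS cs
JOIN CONNECTION_STATUS_PREDICTIONS p ON p.connectionId = cs.id
WHERE p.predictedTimeToCountBackpressureMillis < 300000
```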


Example: Get the unique bulletin categories associated with errors:

SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = 'ERROR'

Example: Select all fields from the BULLETINS table within a time window, so that each bulletin is processed only once:

SELECT * FROM BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND bulletinTimestamp <= $bulletinEndTime

Example: Select all fields from the PROVENANCE table within a time window, so that each provenance event is processed only once:

SELECT * FROM PROVENANCE WHERE timestampMillis > $provenanceStartTime AND timestampMillis <= $provenanceEndTime
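
Example (an illustrative sketch): the time-window placeholders are ordinary parts of the WHERE clause, so they can presumably be combined with other conditions and with aggregation. The query below counts DROP events per component within the window; the alias dropCount is made up for this example, and combining the placeholders with GROUP BY is an assumption rather than something this page demonstrates:

```sql
-- Count DROP provenance events per component within the current time window.
SELECT componentName, COUNT(*) AS dropCount
FROM PROVENANCE
WHERE timestampMillis > $provenanceStartTime
  AND timestampMillis <= $provenanceEndTime
  AND eventType = 'DROP'
GROUP BY componentName
```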

Example: Select connection-related fields from the FLOW_CONFIG_HISTORY table:

SELECT connectionSourceName, connectionDestinationName, connectionRelationship FROM FLOW_CONFIG_HISTORY