This reporting task can be used to issue SQL queries against various NiFi metrics, modeled as tables, and transmit the query results to a specified destination. The query may make use of the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, CONNECTION_STATUS_PREDICTIONS, PROVENANCE, or FLOW_CONFIG_HISTORY tables, and can use any functions or capabilities provided by Apache Calcite, including JOINs and aggregate functions.
The results are transmitted to the destination using the configured Record Sink service, such as SiteToSiteReportingRecordSink (for sending via the Site-to-Site protocol) or DatabaseRecordSink (for sending the query result rows to a relational database).
The reporting task can uniquely handle items from the bulletin and provenance repositories. This means that an item will only be processed once when the query is set to unique. The query can be set to unique by defining a time window with special SQL placeholders ($bulletinStartTime, $bulletinEndTime, $provenanceStartTime, $provenanceEndTime) that the reporting task will evaluate at runtime. See the SQL Query Examples section.
Below is a list of definitions for all the "tables" supported by this reporting task. Note that these are not persistent/materialized tables; rather, they are non-materialized views whose sources are re-queried at every execution. This means that a query executed twice may return different results, for example if new status information is available or, in the case of JVM_METRICS, because a new snapshot of the JVM is taken at query time.
CONNECTION_STATUS
Column | Data Type |
id | String |
groupId | String |
name | String |
sourceId | String |
sourceName | String |
destinationId | String |
destinationName | String |
backPressureDataSizeThreshold | String |
backPressureBytesThreshold | long |
backPressureObjectThreshold | long |
isBackPressureEnabled | boolean |
inputCount | int |
inputBytes | long |
queuedCount | int |
queuedBytes | long |
outputCount | int |
outputBytes | long |
maxQueuedCount | int |
maxQueuedBytes | long |
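As an illustrative sketch (not one of the original examples), a query over the columns above could list connections whose queue has reached at least 80% of its object-count back pressure threshold. The 80% cutoff is an arbitrary assumption chosen for the example:

```sql
-- Connections at or above 80% of the configured object-count threshold.
-- The 80% cutoff is an illustrative assumption, not a NiFi default.
SELECT id, name, queuedCount, backPressureObjectThreshold
FROM CONNECTION_STATUS
WHERE backPressureObjectThreshold > 0
  AND queuedCount * 100 >= backPressureObjectThreshold * 80
```

The comparison uses integer arithmetic on both sides to avoid relying on any particular division or rounding behavior.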
PROCESSOR_STATUS
Column | Data Type |
id | String |
groupId | String |
name | String |
processorType | String |
averageLineageDuration | long |
bytesRead | long |
bytesWritten | long |
bytesReceived | long |
bytesSent | long |
flowFilesRemoved | int |
flowFilesReceived | int |
flowFilesSent | int |
inputCount | int |
inputBytes | long |
outputCount | int |
outputBytes | long |
activeThreadCount | int |
terminatedThreadCount | int |
invocations | int |
processingNanos | long |
runStatus | String |
executionNode | String |
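Because aggregate functions are available, the processor columns above can be summarized per processor type. The following is a minimal sketch; the aliases processorCount and totalBytesWritten are names introduced here for illustration only:

```sql
-- Count processors and total bytes written, grouped by processor type.
-- processorCount and totalBytesWritten are illustrative aliases.
SELECT processorType,
       COUNT(*) AS processorCount,
       SUM(bytesWritten) AS totalBytesWritten
FROM PROCESSOR_STATUS
GROUP BY processorType
ORDER BY totalBytesWritten DESC
```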
BULLETINS
Column | Data Type |
bulletinId | long |
bulletinCategory | String |
bulletinGroupId | String |
bulletinGroupName | String |
bulletinGroupPath | String |
bulletinLevel | String |
bulletinMessage | String |
bulletinNodeAddress | String |
bulletinNodeId | String |
bulletinSourceId | String |
bulletinSourceName | String |
bulletinSourceType | String |
bulletinTimestamp | Date |
bulletinFlowFileUuid | String |
PROCESS_GROUP_STATUS
Column | Data Type |
id | String |
groupId | String |
name | String |
bytesRead | long |
bytesWritten | long |
bytesReceived | long |
bytesSent | long |
bytesTransferred | long |
flowFilesReceived | int |
flowFilesSent | int |
flowFilesTransferred | int |
inputContentSize | long |
inputCount | int |
outputContentSize | long |
outputCount | int |
queuedContentSize | long |
activeThreadCount | int |
terminatedThreadCount | int |
queuedCount | int |
versionedFlowState | String |
processingNanos | long |
JVM_METRICS
The JVM_METRICS table has dynamic columns in the sense that the "garbage collector runs" and "garbage collector time" columns appear once for each Java garbage collector in the JVM. The column names end with the name of the garbage collector, substituted for the <garbage_collector_name> expression below:
Column | Data Type |
jvm_daemon_thread_count | int |
jvm_thread_count | int |
jvm_thread_states_blocked | int |
jvm_thread_states_runnable | int |
jvm_thread_states_terminated | int |
jvm_thread_states_timed_waiting | int |
jvm_uptime | long |
jvm_heap_used | double |
jvm_heap_usage | double |
jvm_non_heap_usage | double |
jvm_file_descriptor_usage | double |
jvm_gc_runs_<garbage_collector_name> | long |
jvm_gc_time_<garbage_collector_name> | long |
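A query against the fixed (non-dynamic) columns might look like the sketch below. It assumes that jvm_heap_usage is reported as a fraction between 0 and 1, which is not stated above; adjust the threshold if the value is reported on a different scale:

```sql
-- Return a row only when heap usage exceeds 80%.
-- Assumes jvm_heap_usage is a fraction in the range 0 to 1.
SELECT jvm_uptime, jvm_heap_usage, jvm_thread_count
FROM JVM_METRICS
WHERE jvm_heap_usage > 0.8
```

The garbage collector columns are left out here because their exact names depend on the collectors present in the running JVM.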
CONNECTION_STATUS_PREDICTIONS
Column | Data Type |
connectionId | String |
predictedQueuedBytes | long |
predictedQueuedCount | int |
predictedPercentBytes | int |
predictedPercentCount | int |
predictedTimeToBytesBackpressureMillis | long |
predictedTimeToCountBackpressureMillis | long |
predictionIntervalMillis | long |
PROVENANCE
Column | Data Type |
eventId | long |
eventType | String |
timestampMillis | long |
durationMillis | long |
lineageStart | long |
details | String |
componentId | String |
componentName | String |
componentType | String |
processGroupId | String |
processGroupName | String |
entityId | String |
entityType | String |
entitySize | long |
previousEntitySize | long |
updatedAttributes | Map<String,String> |
previousAttributes | Map<String,String> |
contentPath | String |
previousContentPath | String |
parentIds | Array<String> |
childIds | Array<String> |
transitUri | String |
remoteIdentifier | String |
alternateIdentifier | String |
FLOW_CONFIG_HISTORY
Column | Data Type |
actionId | int |
actionTimestamp | long |
actionUserIdentity | String |
actionSourceId | String |
actionSourceName | String |
actionSourceType | String |
actionOperation | String |
configureDetailsName | String |
configureDetailsPreviousValue | String |
configureDetailsValue | String |
connectionSourceId | String |
connectionSourceName | String |
connectionSourceType | String |
connectionDestinationId | String |
connectionDestinationName | String |
connectionDestinationType | String |
connectionRelationship | String |
moveGroup | String |
moveGroupId | String |
movePreviousGroup | String |
movePreviousGroupId | String |
purgeEndDate | long |
SQL Query Examples
Example: Select all fields from the CONNECTION_STATUS table:
SELECT * FROM CONNECTION_STATUS
Example: Select connection IDs where time-to-backpressure (based on queue count) is less than 5 minutes:
SELECT connectionId FROM CONNECTION_STATUS_PREDICTIONS WHERE predictedTimeToCountBackpressureMillis < 300000
Example: Get the unique bulletin categories associated with errors:
SELECT DISTINCT(bulletinCategory) FROM BULLETINS WHERE bulletinLevel = 'ERROR'
Example: Select all fields from the BULLETINS table with time window:
SELECT * FROM BULLETINS WHERE bulletinTimestamp > $bulletinStartTime AND bulletinTimestamp <= $bulletinEndTime
Example: Select all fields from the PROVENANCE table with time window:
SELECT * FROM PROVENANCE WHERE timestampMillis > $provenanceStartTime AND timestampMillis <= $provenanceEndTime
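The time-window placeholders can, in principle, be combined with aggregation. The sketch below counts provenance events per event type within the current window; it assumes the placeholders are substituted before the query is parsed, and eventCount is an alias introduced for illustration:

```sql
-- Count provenance events per type inside the unique time window.
-- eventCount is an illustrative alias.
SELECT eventType, COUNT(*) AS eventCount
FROM PROVENANCE
WHERE timestampMillis > $provenanceStartTime
  AND timestampMillis <= $provenanceEndTime
GROUP BY eventType
```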
Example: Select connection-related fields from the FLOW_CONFIG_HISTORY table:
SELECT connectionSourceName, connectionDestinationName, connectionRelationship FROM FLOW_CONFIG_HISTORY
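Since JOINs are supported, tables can also be combined in a single query. The following sketch joins connection status with its predictions to show the name and current queue size of connections predicted to reach back pressure within 5 minutes. It assumes that connectionId in CONNECTION_STATUS_PREDICTIONS holds the same value as id in CONNECTION_STATUS:

```sql
-- Join current status with predictions for the same connection.
-- Assumes p.connectionId corresponds to c.id.
SELECT c.name,
       c.queuedCount,
       p.predictedTimeToCountBackpressureMillis
FROM CONNECTION_STATUS c
JOIN CONNECTION_STATUS_PREDICTIONS p ON c.id = p.connectionId
WHERE p.predictedTimeToCountBackpressureMillis < 300000
```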