ReportLineageToAtlas

Table of contents:

Information reported to Atlas

This reporting task stores two types of NiFi flow information, 'NiFi flow structure' and 'NiFi data lineage'.

'NiFi flow structure' tells which components are running within a NiFi flow and how they are connected. It is reported by analyzing the current NiFi flow structure, specifically NiFi component relationships.

'NiFi data lineage' tells which parts of a NiFi flow interact with different DataSets, such as HDFS files or Hive tables. It is reported by analyzing NiFi provenance events.

Technically, each kind of information is sent using a different protocol: Atlas REST API v2 for the flow structure, and notification messages via a Kafka topic for the data lineage, as shown in the image above.

As both information types rely on the same NiFi Atlas Types and Namespaces concepts, it is recommended to read those sections first.

NiFi Atlas Types

This reporting task creates the following NiFi-specific types in the Atlas type system when it runs, if these type definitions are not found.

Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership. Red lines represent lineage.

Namespaces

An existing entity in Atlas can be identified by its GUID, or, if the GUID is not known, by its type name and a unique attribute. The qualified name is commonly used as the unique attribute.

One Atlas instance can be used to manage multiple environments, and objects in different environments may have the same name. For example, a Hive table 'request_logs' may exist in two different clusters, 'cluster-A' and 'cluster-B'. For this reason, qualified names contain a so-called metadata namespace.

It's common practice to provide the cluster name as the namespace, but it can be any arbitrary string.

With this, a qualified name has the format 'componentId@namespace'. For example, a Hive table qualified name would be dbName.tableName@namespace (e.g. default.request_logs@cluster-A).

From this reporting task's standpoint, a namespace needs to be resolved in the following situations:

To answer such questions, the ReportLineageToAtlas reporting task provides a way to define mappings from IP addresses or hostnames to namespaces. A mapping is defined by a Dynamic Property whose name has the 'hostnamePattern.namespace' format and whose value is a set of regular expression patterns, one per line, matching IP addresses or hostnames to that particular namespace.

As an example, the following mapping definition resolves namespace 'namespace-A' for IP addresses such as '192.168.30.123' or hostnames such as 'namenode1.a.example.com', and 'namespace-B' for '192.168.40.223' or 'nifi3.b.example.com'.

# Dynamic Property Name for namespace-A
hostnamePattern.namespace-A
# Value can have multiple Regular Expression patterns separated by new line
192\.168\.30\.\d+
[^\.]+\.a\.example\.com

# Dynamic Property Name for namespace-B
hostnamePattern.namespace-B
# Values
192\.168\.40\.\d+
[^\.]+\.b\.example\.com
        

If no namespace mapping matches, the name defined in 'Atlas Default Metadata Namespace' is used.
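The mapping resolution described above can be sketched in a few lines (a minimal illustration, not the actual implementation; the property names and patterns come from the example above, and 'default-ns' is a hypothetical stand-in for the 'Atlas Default Metadata Namespace' value):

```python
import re

# Dynamic properties as defined above: 'hostnamePattern.<namespace>' -> patterns.
# In NiFi the property value holds the patterns separated by new lines;
# they are shown as a list here for readability.
dynamic_properties = {
    "hostnamePattern.namespace-A": [r"192\.168\.30\.\d+", r"[^\.]+\.a\.example\.com"],
    "hostnamePattern.namespace-B": [r"192\.168\.40\.\d+", r"[^\.]+\.b\.example\.com"],
}

DEFAULT_NAMESPACE = "default-ns"  # stand-in for 'Atlas Default Metadata Namespace'

def resolve_namespace(host_or_ip: str) -> str:
    """Return the namespace whose patterns match the given IP address or hostname."""
    for prop_name, patterns in dynamic_properties.items():
        namespace = prop_name[len("hostnamePattern."):]
        if any(re.fullmatch(p, host_or_ip) for p in patterns):
            return namespace
    return DEFAULT_NAMESPACE  # fallback when no mapping matches

print(resolve_namespace("192.168.30.123"))       # namespace-A
print(resolve_namespace("nifi3.b.example.com"))  # namespace-B
print(resolve_namespace("10.0.0.1"))             # default-ns
```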

NiFi flow structure

This section describes how a structure of NiFi flow is reported to Atlas.

Path Separation Logic

To provide a meaningful lineage granularity in Atlas, this reporting task divides a NiFi flow into paths. The logic is based on the following concepts:

Based on these concepts, path separation is done in the following steps:

  1. Select starting components (Processors and RootGroup InputPorts) that do not have any incoming relationship from other Processors.
  2. For each starting component, create a 'nifi_flow_path'. The same path may already exist if another path arrived here before.
  3. Traverse outgoing relationships.
  4. If a Processor with more than one incoming Processor relationship is found, split it off as a new 'nifi_flow_path'. When a new path starts, a 'nifi_queue' is created. The queue is added to the outputs of the current path and the inputs of the new path. Go back to step 2.
  5. Continue traversing outgoing relationships as long as any remain.
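The steps above can be sketched with a small graph traversal (a hedged Python illustration over a hypothetical flow graph; the actual implementation works on NiFi component relationships and is more involved):

```python
# Hypothetical flow graph: component id -> downstream component ids.
# Two source processors feed the same 'UpdateAttribute', so that component
# has more than one incoming relationship and starts a new path (step 4).
flow = {
    "GetFile": ["UpdateAttribute"],
    "GenerateFlowFile": ["UpdateAttribute"],
    "UpdateAttribute": ["PutFile"],
    "PutFile": [],
}

def separate_paths(flow):
    incoming = {cid: 0 for cid in flow}
    for targets in flow.values():
        for t in targets:
            incoming[t] += 1
    # Step 1: starting components have no incoming relationships.
    starts = [cid for cid, n in incoming.items() if n == 0]
    paths, queues, seen = {}, [], set()

    def walk(start):
        if start in seen:            # step 2: the path may already exist
            return
        seen.add(start)
        path, current = [start], start
        while flow[current]:         # steps 3 and 5: traverse outgoing relationships
            nxt = flow[current][0]   # single outgoing relationship assumed for brevity
            if incoming[nxt] > 1:    # step 4: fan-in splits off a new path via a queue
                queues.append((start, nxt))
                walk(nxt)
                break
            path.append(nxt)
            current = nxt
        paths[start] = path

    for s in starts:
        walk(s)
    return paths, queues

paths, queues = separate_paths(flow)
# 'UpdateAttribute' becomes its own path, fed by one queue from each source path.
```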

NiFi data lineage

This section describes how NiFi data lineage is reported to Atlas.

NiFi Lineage Strategy

To meet different use-cases, this reporting task provides the 'NiFi Lineage Strategy' property to control how the DataSet and Process lineage tracked by a NiFi flow is reported to Atlas.

NOTE: It is recommended to try the possible options to see which strategy meets your use-case before running the reporting task in a production environment. Different strategies create entities differently, and if multiple strategies are used (or switched from one to another), the Atlas lineage graph becomes noisy. Since many entities are created by this reporting task over time, cleaning them up in order to change strategy afterward can be troublesome, especially when Atlas also manages data reported by sources other than NiFi.

For testing or debugging how this reporting task behaves, the Atlas Server Emulator may be useful instead of sending data to a real Atlas instance.

To illustrate the difference between lineage strategies, let's look at a sample NiFi flow as shown in the screenshots below.

With 'Simple Path', Atlas lineage is reported as shown below when '/tmp/input/A1.csv' is selected. Since 'Simple Path' simply maps I/O events to a 'nifi_flow_path', '/tmp/output/B1.csv' is shown in the lineage graph because that file is written by the 'GetFile, PutFile...' process.

With 'Complete Path', Atlas lineage is reported as shown below. This time, the 'GetFile, PutFile...' process is not linked to '/tmp/output/B1.csv', because the 'Complete Path' strategy created two different 'nifi_flow_path' entities: one for '/tmp/input/A1.csv -> /tmp/output/A1.csv' and another for '/tmp/input/B1.csv -> /tmp/output/B1.csv'.

However, once the data records ingested from A.csv and B.csv reach a bigger DataSet (the 'nifi-test' Kafka topic in this example, or any DataSet such as a database table or a concatenated file), record-level lineage telling where each record came from can no longer be tracked. So the resulting '/tmp/consumed/B_2..' is shown in the same lineage graph, although that file does not contain any data that came from '/tmp/input/A1.csv'.

NiFi Provenance Event Analysis

To create lineage describing which NiFi component interacts with which DataSets, DataSet entities and Process entities need to be created in Atlas. Specifically, at least three entities are required to draw a lineage graph on the Atlas UI: a Process entity, a DataSet referenced by the Process 'inputs' attribute, and a DataSet referenced by its 'outputs' attribute. For example:

            # With following entities
            guid: 1
            typeName: fs_path (extends DataSet)
            qualifiedName: /data/A1.csv@BranchOffice1

            guid: 2
            typeName: nifi_flow_path (extends Process)
            name: GetFile, PutHDFS
            qualifiedName: 529e6722-9b49-3b66-9c94-00da9863ca2d@BranchOffice1
            inputs: refer guid(1)
            outputs: refer guid(3)

            guid: 3
            typeName: hdfs_path (extends DataSet)
            qualifiedName: /data/input/A1.csv@Analytics

            # Atlas draws lineage graph
            /data/A1.csv -> GetFile, PutHDFS -> /data/input/A1.csv
        
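For reference, registering entities like these through Atlas REST API v2 amounts to posting a JSON payload along the lines sketched below. This is a hedged sketch, not the reporting task's actual code: the bulk endpoint path and overall payload shape follow the standard Atlas v2 entity API, and the negative guids are local placeholders that Atlas resolves on creation.

```python
import json

# Entities from the listing above, expressed as an Atlas v2 bulk-create payload sketch.
payload = {
    "entities": [
        {
            "guid": "-1",
            "typeName": "fs_path",
            "attributes": {"qualifiedName": "/data/A1.csv@BranchOffice1",
                           "name": "/data/A1.csv"},
        },
        {
            "guid": "-2",
            "typeName": "nifi_flow_path",
            "attributes": {
                "qualifiedName": "529e6722-9b49-3b66-9c94-00da9863ca2d@BranchOffice1",
                "name": "GetFile, PutHDFS",
                # inputs/outputs reference the DataSet entities by local guid
                "inputs": [{"guid": "-1", "typeName": "fs_path"}],
                "outputs": [{"guid": "-3", "typeName": "hdfs_path"}],
            },
        },
        {
            "guid": "-3",
            "typeName": "hdfs_path",
            "attributes": {"qualifiedName": "/data/input/A1.csv@Analytics",
                           "name": "/data/input/A1.csv"},
        },
    ]
}

# The payload would be POSTed to /api/atlas/v2/entity/bulk
print(json.dumps(payload, indent=2))
```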

To identify such Process and DataSet Atlas entities, this reporting task uses NiFi provenance events. At a minimum, the reporting task needs to derive the following information from a NiFi provenance event record:

The 'namespace' part of the 'qualifiedName' attribute is resolved by mapping the IP address or hostname found in the provenance event's 'transitUri' to a namespace. See Namespaces for details.

For 'typeName' and 'qualifiedName', different analysis rules are needed for different DataSets. ReportLineageToAtlas provides an extension point called 'NiFiProvenanceEventAnalyzer' to implement such analysis logic for particular DataSets.

When a provenance event is analyzed, the registered NiFiProvenanceEventAnalyzer implementations are searched in the following order to find the best matching analyzer implementation:

  1. By component type (e.g. KafkaTopic)
  2. By transit URI protocol (e.g. HDFSPath)
  3. By event type, if none of the above analyzers matches (e.g. Create)
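The lookup order can be illustrated as follows (a simplified sketch with hypothetical registry contents; the real extension point resolves concrete NiFiProvenanceEventAnalyzer classes):

```python
# Hypothetical analyzer registries, keyed by how a match is attempted.
analyzers_by_component_type = {"KafkaTopic": "KafkaTopic"}
analyzers_by_uri_protocol = {"hdfs": "HDFSPath"}
analyzers_by_event_type = {"CREATE": "unknown.Create"}

def find_analyzer(component_type, transit_uri, event_type):
    # 1. By component type (e.g. KafkaTopic)
    if component_type in analyzers_by_component_type:
        return analyzers_by_component_type[component_type]
    # 2. By transit URI protocol (e.g. HDFSPath)
    if transit_uri and "://" in transit_uri:
        protocol = transit_uri.split("://", 1)[0].lower()
        if protocol in analyzers_by_uri_protocol:
            return analyzers_by_uri_protocol[protocol]
    # 3. By event type, if none of the above matched (e.g. Create)
    return analyzers_by_event_type.get(event_type)

print(find_analyzer("PutHDFS", "hdfs://nn.example.com:8020/user/nifi/x", "SEND"))  # HDFSPath
```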

Supported DataSets and Processors

Currently, the following NiFi components are supported by this reporting task. Each analyzer is listed with the NiFi components it covers, the provenance event types it handles, an example 'transitUri', and the Atlas DataSet typeName and qualifiedName it produces.

NiFiRemotePort (Remote Input Port)
  eventType: SEND
  transitUri examples:
    • http://nifi1.example.com:8080/nifi-api/data-transfer/input-ports/35dbc0ab-015e-1000-144c-a8d71255027d/transactions/89335043-f105-4de7-a0ef-46f2ef0c7c51/flow-files
    • nifi://nifi1.example.com:8081/cb729f05-b2ee-4488-909d-c6696cc34588
  Atlas DataSet typeName: nifi_input_port
  qualifiedName: rootGroupPortGUID@namespace (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)
  With 'Simple Path' strategy, intermediate 'nifi_queue' and 'nifi_flow_path' entities are created as well (marked with + in the following example):

    upstream (nifi_flow_path)
      -> + queue (nifi_queue)
      -> + Remote Input Port (nifi_flow_path)
      -> remote target port (nifi_input_port)

  For 'nifi_flow_path': remoteInputPortGUID@namespace (e.g. f31a6b53-3077-4c59-144c-a8d71255027d@ns1)
  NOTE: The remoteInputPortGUID is the client-side component ID and differs from the remote target port GUID. Multiple Remote Input Ports can send to the same target remote input port.

NiFiRemotePort (Remote Output Port)
  eventType: RECEIVE
  transitUri examples:
    • http://nifi1.example.com:8080/nifi-api/data-transfer/output-ports/45dbc0ab-015e-1000-144c-a8d71255027d/transactions/99335043-f105-4de7-a0ef-46f2ef0c7c51/flow-files
    • nifi://nifi1.example.com:8081/db729f05-b2ee-4488-909d-c6696cc34588
  Atlas DataSet typeName: nifi_output_port
  qualifiedName: rootGroupPortGUID@namespace (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@ns1)
  With 'Simple Path' strategy, intermediate 'nifi_flow_path' and 'nifi_queue' entities are created as well (marked with + in the following example):

    remote target port (nifi_output_port)
      -> + Remote Output Port (nifi_flow_path)
      -> + queue (nifi_queue)
      -> downstream (nifi_flow_path)

  For 'nifi_flow_path': remoteOutputPortGUID@namespace (e.g. 7375f8f6-4604-468d-144c-a8d71255027d@ns1)
  NOTE: The remoteOutputPortGUID is the client-side component ID and differs from the remote target port GUID. Multiple Remote Output Ports can pull from the same target remote output port.
  For 'nifi_queue': downstreamPathGUID@namespace (e.g. bb530e58-ee14-3cac-144c-a8d71255027d@ns1)

NiFiRootGroupPort (Root group Input Port, Root group Output Port)
  eventType: RECEIVE, SEND
  transitUri examples:
    • http://nifi1.example.com:8080/nifi-api/data-transfer/input-ports/35dbc0ab-015e-1000-144c-a8d71255027d/transactions/89335043-f105-4de7-a0ef-46f2ef0c7c51/flow-files
    • nifi://nifi1.example.com:8081/cb729f05-b2ee-4488-909d-c6696cc34588
  Atlas DataSet typeName: nifi_input_port, nifi_output_port
  qualifiedName: rootGroupPortGUID@namespace (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)

KafkaTopic (PublishKafka, ConsumeKafka, PublishKafka_0_10, ConsumeKafka_0_10, PublishKafkaRecord_0_10, ConsumeKafkaRecord_0_10)
  eventType: SEND for the publishing Processors, RECEIVE for the consuming Processors
  transitUri example: PLAINTEXT://kafka1.example.com:9092/sample-topic (the protocol can be PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL)
  Atlas DataSet typeName: kafka_topic
  qualifiedName: topicName@namespace (e.g. testTopic@ns1)
  NOTE: With Atlas versions earlier than 0.8.2, the same topic name in different clusters can not be created using the pre-built 'kafka_topic' type. See ATLAS-2286.

PutHiveStreaming (PutHiveStreaming)
  eventType: SEND
  transitUri example: thrift://hive.example.com:9083
  Atlas DataSet typeName: hive_table
  qualifiedName: tableName@namespace (e.g. myTable@ns1)

Hive2JDBC (PutHiveQL, SelectHiveQL)
  eventType: SEND for PutHiveQL; RECEIVE and FETCH for SelectHiveQL
  transitUri example: jdbc:hive2://hive.example.com:10000/default
  Atlas DataSet typeName: hive_table
  qualifiedName: tableName@namespace (e.g. myTable@ns1)
  The corresponding Processors parse Hive QL to set the 'query.input.tables' and 'query.output.tables' FlowFile attributes. These attribute values are used to create the qualified name.

HDFSPath (DeleteHDFS, FetchHDFS, FetchParquet, GetHDFS, GetHDFSSequenceFile, PutHDFS, PutORC, PutParquet)
  eventType: REMOTE_INVOCATION for DeleteHDFS; FETCH for FetchHDFS and FetchParquet; RECEIVE for GetHDFS and GetHDFSSequenceFile; SEND for PutHDFS, PutORC and PutParquet
  transitUri example: hdfs://nn.example.com:8020/user/nifi/5262553828219
  Atlas DataSet typeName: hdfs_path
  qualifiedName: /path/fileName@namespace (e.g. /app/warehouse/hive/db/default@ns1)

AwsS3Directory (DeleteHDFS, FetchHDFS, FetchParquet, GetHDFS, GetHDFSSequenceFile, PutHDFS, PutORC, PutParquet)
  eventType: REMOTE_INVOCATION for DeleteHDFS; FETCH for FetchHDFS and FetchParquet; RECEIVE for GetHDFS and GetHDFSSequenceFile; SEND for PutHDFS, PutORC and PutParquet
  transitUri example: s3a://mybucket/mydir
  Atlas DataSet typeName: aws_s3_pseudo_dir
  qualifiedName: s3UrlWithoutObjectName@namespace (e.g. s3a://mybucket/mydir@ns1)

HBaseTable (FetchHBaseRow, GetHBase, PutHBaseCell, PutHBaseJSON, PutHBaseRecord, ScanHBase)
  eventType: FETCH for FetchHBaseRow; RECEIVE for GetHBase and ScanHBase; SEND for PutHBaseCell, PutHBaseJSON and PutHBaseRecord
  transitUri example: hbase://hmaster.example.com:16000/tableA/rowX
  Atlas DataSet typeName: hbase_table
  qualifiedName: tableName@namespace (e.g. myTable@ns1)

FilePath (PutFile, GetFile ... etc)
  eventType: SEND, RECEIVE ... etc
  transitUri example: file:///tmp/a.txt
  Atlas DataSet typeName: fs_path
  qualifiedName: /path/fileName@hostname (e.g. /tmp/dir/filename.txt@host.example.com)

unknown.Create, Receive, Fetch, Send, RemoteInvocation (other Processors that generate the listed event types)
  eventType: CREATE, RECEIVE, FETCH, SEND, REMOTE_INVOCATION
  Atlas DataSet typeName: nifi_data
  qualifiedName: processorGuid@namespace (e.g. db8bb12c-5cd3-3011-c971-579f460ebedf@ns1)

How it runs in NiFi cluster

When this reporting task runs in a NiFi cluster, the following tasks are executed only by the primary node:

Meanwhile, every node (including the primary node) performs the following:

Limitations

Atlas Server Configurations

Atlas Server Emulator

If you have the Apache NiFi project source code on your local machine, you can run the Atlas Server Emulator, which is included in the 'nifi-atlas-reporting-task' test module. By default, the emulator listens on port 21000 for Atlas REST API v2 and on port 9092 for Kafka. A running NiFi instance can use the emulator as the target of this reporting task. It can be helpful when you need to debug how the reporting task works, or to try out different reporting strategies.

See Apache Atlas Server Emulator readme file for further details.