ReportLineageToAtlas 2.3.0.4.10.0.0-147

Bundle
org.apache.nifi | nifi-atlas-nar
Description
Report NiFi flow data set level lineage to Apache Atlas. End-to-end lineages across NiFi environments and other systems can be reported if those are connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc. Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets, in addition to NiFi provenance events providing detailed event level lineage. See 'Additional Details' for further description and limitations.
Tags
atlas, lineage
Input Requirement
Supports Sensitive Dynamic Properties
false
  • Additional Details for ReportLineageToAtlas 2.3.0.4.10.0.0-147

    ReportLineageToAtlas

    Table of contents:

    Information reported to Atlas

    This reporting task stores two types of NiFi flow information, ‘NiFi flow structure’ and ‘NiFi data lineage’.

    ‘NiFi flow structure’ tells what components are running within a NiFi flow and how these are connected. It is reported by analyzing current NiFi flow structure, specifically NiFi component relationships.

    ‘NiFi data lineage’ tells what part of NiFi flow interacts with different DataSets such as HDFS files or Hive tables … etc. It is reported by analyzing NiFi provenance events.

    Technically each information is sent using Atlas REST API v2 as shown in above image.

    As both information types use the same NiFi Atlas Types and Namespaces concepts, it is recommended to start reading those sections first.

    NiFi Atlas Types

    This reporting task creates following NiFi specific types in Atlas Type system when it runs if these type definitions are not found.

    Green boxes represent sub-types of DataSet and blue ones are sub-types of Process. Gray lines represent entity ownership. Red lines represent lineage.

    • nifi_flow Represents a NiFi data flow.

    As shown in the above diagram, nifi_flow owns other nifi_component types. This owning relationship is defined by Atlas ‘owned’ constraint so that when a ’nifi_flow’ entity is removed, all owned NiFi component entities are removed in cascading manner.

    When this reporting task runs, it analyzes and traverse the entire flow structure, and create NiFi component entities in Atlas. At later runs, it compares the current flow structure with the one stored in Atlas to figure out if any changes have been made since the last time the flow was reported. The reporting task updates NiFi component entities in Atlas if needed. NiFi components those are removed from a NiFi flow also get deleted from Atlas. However, those entities can still be seen in Atlas search results or lineage graphs since Atlas uses ‘Soft Delete’ by default. See Atlas Delete Handler for further detail.

    Attributes:

    • qualifiedName: Root ProcessGroup ID@namespace (e.g. 86420a14-2fab-3e1e-4331-fb6ab42f58e0@ns1)
    • name: Name of the Root ProcessGroup.
    • url: URL of the NiFi instance. This can be specified via reporting task ‘NiFi URL for Atlas’ property.
    • nifi_flow_path Part of a NiFi data flow containing one or more processing NiFi components such as Processors and Remote Ports. The reporting task divides a NiFi flow into multiple flow paths. See Path Separation Logic for details.

    Attributes:

    • qualifiedName: The first NiFi component Id in a path@namespace (e.g. 529e6722-9b49-3b66-9c94-00da9863ca2d@ns1)
    • name: NiFi component names within a path are concatenated (e.g. GenerateFlowFile, PutFile, LogAttribute)
    • url: A deep link to the first NiFi component in corresponding NiFi UI
    • nifi_input/output_port Represents a Remote Port which can be accessed by RemoteProcessGroup via Site-to-Site protocol.

    Attributes:

    • qualifiedName: Port ID@namespace (e.g. 3f6d405e-6e3d-38c9-c5af-ce158f8e593d@ns1)
    • name: Name of the Port.
    • nifi_data Represents Unknown DataSets created by CREATE/SEND/RECEIVE NiFi provenance events those do not have particular provenance event analyzer.

    Attributes:

    • qualifiedName: ID of a Processor which generated the provenance event@namespace (e.g. db8bb12c-5cd3-3011-c971-579f460ebedf@ns1)
    • name: Name of the Processor.
    • nifi_queue A internal DataSet of NiFi flows which connects nifi_flow_paths. Atlas lineage graph requires a DataSet in between Process entities.

    Attributes:

    • qualifiedName: ID of the first Processor in the destination nifi_flow_path.
    • name: Name of the Processor.

    Namespaces

    An entity in Atlas can be identified by its GUID for any existing objects, or type name and unique attribute can be used if GUID is not known. Qualified name is commonly used as the unique attribute.

    One Atlas instance can be used to manage multiple environments and objects in different environments may have the same name. For example, a Hive table ‘request_logs’ in two different clusters, ‘cluster-A’ and ‘cluster-B’. For this reason the qualified names contain a so-called metadata namespace.

    It is common practice to provide the cluster name as the namespace, but it can be any arbitrary string.

    With this, a qualified name has ‘componentId@namespace’ format. E.g. A Hive table qualified name would be dbName.tableName@namespace (default.request_logs@cluster-A).

    From this NiFi reporting task standpoint, a namespace is needed to be resolved at following situations:

    • To register NiFi component entities. Which namespace should be used to represent the current NiFi environment?
    • To create lineages from NiFi component to other DataSets. Which environment does the DataSet resides in?

    To answer such questions, ReportLineageToAtlas reporting task provides a way to define mappings from IP address or hostname to a namespace. The mapping can be defined by Dynamic Properties with a name in ‘hostnamePattern.namespace’ format, having its value as a set of Regular Expression Patterns to match IP addresses or host names to a particular namespace.

    As an example, following mapping definition would resolve namespace ’namespace-A’ for IP address such as ‘192.168.30.123’ or hostname ’namenode1.a.example.com’, and ’namespace-B’ for ‘192.168.40.223’ or ’nifi3.b.example.com’.

    
    # Dynamic Property Name for namespace-A
    hostnamePattern.namespace-A
    # Value can have multiple Regular Expression patterns separated by new line
    192\.168\.30\.\d+
    [^\.]+\.a\.example\.com
    
    # Dynamic Property Name for namespace-B
    hostnamePattern.namespace-B
    # Values
    192\.168\.40\.\d+
    [^\.]+\.b\.example\.com
            
    

    If no namespace mapping matches, then a name defined at ‘Atlas Default Metadata Namespace’ is used.

    NiFi flow structure

    This section describes how a structure of NiFi flow is reported to Atlas.

    Path Separation Logic

    To provide a meaningful lineage granularity in Atlas, this reporting task divide a NiFi flow into paths. The logic has the following concepts:

    • Focuses only on Processors and Remote Ports. Local Input/Output Ports, Process Group hierarchy or Funnels do not contribute path separation.

    For example, following two flows are identical in path separation logic:

    + ```
    Remote Input Port -> Processor 0 -> Funnel -> Processor 1 -> Local Input Port -> Processor 2
    ```
    + ```
    Remote Input Port -> Processor 0 -> Processor 1 -> Processor 2
    ```Both flows will be treated as a single path that consists of Remote Input Port, Processor 0, 1 and 2\.
    
    • Any Processor with multiple incoming relationships from other Processors is treated like a ‘Common route’ or ‘Functional route’, and is managed as a separate path.

    For example, following flow:

    Processor 0 -> Processor 1 -> Processor 2
    Processor 3 -> Processor2
    

    Will produce following paths as result:

    Processor 0, 1
    Processor 2
    Processor 3
    
    • Self cyclic relationships are ignored.

    Based on these concepts, path separation is done by following steps:

    1. Select starting components (Processors and Remote Ports) those do not have any input relationship from other Processors.
    2. For each starting component, create a ’nifi_flow_path’. The same path may already exist if other path arrived here before.
    3. Traverse outgoing relationships.
    4. If any Processor with more than 1 incoming Processor relationships is found, then split the component as new ’nifi_flow_path’. When starting as a new path, a ’nifi-queue’ is created. The queue is added to the current path outputs, and the new path inputs. Back to step 2.
    5. Traverse outgoing paths as long as there is one.

    NiFi data lineage

    This section describes how NiFi data lineage is reported to Atlas.

    NiFi Lineage Strategy

    To meet different use-cases, this reporting task provides ‘NiFi Lineage Strategy’ property to control how to report Atlas the DataSet and Process lineage tracked by NiFi flow.

    NOTE: It is recommended to try possible options to see which strategy meets your use-case before running the reporting task at a production environment. Different strategies create entities differently, and if multiple strategies are used (or switched from one another), Atlas lineage graph would be noisy. As many entities will be created by this reporting task over time, it might be troublesome to clean entities to change strategy afterward, especially Atlas manages data reported by not only NiFi.

    • Simple Path Maps data I/O provenance events such as SEND/RECEIVE to ’nifi_flow_path’ created by NiFi flow structure analysis.

    It tracks DataSet lineage at ’nifi_flow_path’ process level, instead of event level, to report a simple data lineage graph in Atlas. If different DataSets go through the same ’nifi_flow_path’, all of those input DataSets are shown as if it is impacting every output DataSets. For example, if there are A.txt and B.txt processed by the same GetFile processor then eventually ingested to HDFS path-A and path-B respectively by PutHDFS using NiFi Expression Language to decide where to store FlowFiles. Then Atlas lineage graph will show as if both A.txt and B.txt are ingested to HDFS path-A, when you pick path-A to see which DataSets are ingested into it, because both A.txt and B.txt went through the same GetFile and PutHDFS processors.

    This strategy generates the least amount of data in Atlas. It might be useful when you prefer a big picture in Atlas that can summarize how each DataSets and Processes are connected among NiFi and other software. NiFi provenance events can be used to investigate more details if needed as it stores event (FlowFile) level complete lineage.

    • Complete Path Focuses on DROP provenance event type. Because it represents the end of a particular FlowFile lifecycle. By traversing provenance events backward from a DROP event, the entire lineage can be reported for a given FlowFile including where it is created, then where it goes.

    However, reporting complete flow path for every single FlowFile will produce too many entities in Atlas. Also, it may not be the best approach for Atlas as it is designed to manage DataSet level lineage rather than event level as of today. In order to keep the amount of data at minimum, this strategy calculates hash from Input and Output DataSets of a lineage path, so that the same complete path routes will become the same Atlas entity.

    If different FlowFiles went through the exact same route, then those provenance data only create a single ’nifi_flow_path’ Atlas entity. On the other hand, a single part of NiFi flow can generate different FlowFile lineage paths, those will be reported as different ’nifi_flow_path’ entities. Typically when NiFi Expression Language is used for NiFi Processor configuration to connect DataSets.

    NOTE: While Simple Path strategy can report lineage by looking at each individual NiFi provenance event record, Complete Path strategy has to query parent events. It needs more computing resource (CPU and I/O) when NiFi provenance event queries are performed.

    To illustrate the difference between lineage strategies, let’s look at a sample NiFi flow as shown in the screenshots below.

    With ‘Simple Path’, Atlas lineage is reported like below when ‘/tmp/input/A1.csv’ is selected. Since ‘Simple Path’ simply maps I/O events to a ’nifi_flow_path’, ‘/tmp/output/B1.csv’ is shown in the lineage graph because that file is written by the ‘GetFile, PutFile…’ process.

    With ‘Complete Path’, Atlas lineage is reported like below. This time, ‘GetFile, PutFile…’ process is not linked to ‘/tmp/output/B1.csv’ because ‘Complete Path’ strategy created two different ’nifi_flow_path’ entities one for ‘/tmp/input/A1.csv -> /tmp/output/A1.csv’ and another for ‘/tmp/input/B1.csv -> /tmp/output/B1.csv’.

    However, once the data records ingested from A.csv and B.csv got into a bigger DataSet, ’nifi-test’ Kafka topic in this example (or whatever DataSet such as a database table or a concatenated file … etc), record level lineage telling where it came from is no longer able to be tracked. So the resulting ‘/tmp/consumed/B_2..’ is shown in the same lineage graph, although file does not contain any data came from ‘/tmp/input/A1.csv’.

    NiFi Provenance Event Analysis

    To create lineage describing which NiFi component interacts with what DataSets, DataSet entity and Process entity need to be created in Atlas. Specifically, at least 3 entities are required to draw a lineage graph on Atlas UI. A Process entity, and a DataSet which is referred by a Process ‘inputs’ attribute, and a DataSet referred from ‘outputs’ attribute. For example:

    
                # With following entities
                guid: 1
                typeName: fs_path (extends DataSet)
                qualifiedName: /data/A1.csv@BranchOffice1
    
                guid: 2
                typeName: nifi_flow_path (extends Process)
                name: GetFile, PutHDFS
                qualifiedName: 529e6722-9b49-3b66-9c94-00da9863ca2d@BranchOffice1
                inputs: refer guid(1)
                outputs: refer guid(3)
    
                guid: 3
                typeName: hdfs_path (extends DataSet)
                qualifiedName: /data/input/A1.csv@Analytics
    
                # Atlas draws lineage graph
                /data/A1.csv -> GetFile, PutHDFS -> /data/input/A1.csv
            
    

    To identify such Process and DataSet Atlas entities, this reporting task uses NiFi Provenance Events. At least, the reporting task needs to derive following information from a NiFi Provenance event record:

    • typeName (e.g. fs_path, hive_table)
    • qualifiedName in uniqueId@namespace (e.g. /data/A1.csv@ns1)

    ’namespace’ in ‘qualifiedName’ attribute is resolved by mapping IP address or hostname available at NiFi Provenance event ’transitUri’ to a namespace. See Namespaces for detail.

    For ’typeName’ and ‘qualifiedName’, different analysis rules are needed for different DataSet. ReportLineageToAtlas provides an extension point called ‘NiFiProvenanceEventAnalyzer’ to implement such analysis logic for particular DataSets.

    When a Provenance event is analyzed, registered NiFiProvenanceEventAnalyzer implementations are searched in following order to find a best matching analyzer implementation:

    1. By component type (e.g. KafkaTopic)
    2. By transit URI protocol (e.g. HDFSPath)
    3. By event type, if none of above analyzers matches (e.g. Create)

    Supported DataSets and Processors

    Currently, following NiFi components are supported by this reporting task:

    Analyzer Covered NiFi components Atlas DataSet Description
    name eventType transitUri example typeName qualifiedName
    NiFiRemotePortClient Remote Process Group Input Port SEND * http://nifi1.example.com:8080/nifi-api/data-transfer/input-ports/35dbc0ab-015e-1000-144c-a8d71255027d/transactions/89335043-f105-4de7-a0ef-46f2ef0c7c51/flow-files * nifi://nifi1.example.com:8081/cb729f05-b2ee-4488-909d-c6696cc34588 nifi_input_port remotePortID@namespace (e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)
    With ‘Simple Path’ strategy intermediate ’nifi_queue’ and ’nifi_flow_path’ are created as well (marked with + in the following example) upstream (nifi_flow_path) -> + queue (nifi_queue) -> + Remote Input Port (nifi_flowPath) -> remote target port (nifi_input_port) * For ’nifi_flow_path’: remoteProcessGroupInputPortID@namespace(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@ns1) NOTE: The remoteProcessGroupInputPortID is the client side component ID and different from the remote target port ID. Multiple Remote Process Group Input Ports can send to the same target remote input port. * For ’nifi_queue’: remoteProcessGroupInputPortID@namespace(e.g. f31a6b53-3077-4c59-144c-a8d71255027d@ns1)
    Remote Process Group Output Port RECEIVE * http://nifi1.example.com:8080/nifi-api/data-transfer/output-ports/45dbc0ab-015e-1000-144c-a8d71255027d/transactions/99335043-f105-4de7-a0ef-46f2ef0c7c51/flow-files * nifi://nifi1.example.com:8081/db729f05-b2ee-4488-909d-c6696cc34588 nifi_output_port remotePortID@namespace (e.g. 45dbc0ab-015e-1000-144c-a8d71255027d@ns1)
    With ‘Simple Path’ strategy intermediate ’nifi_flow_path’ and ’nifi_queue’ are created as well (marked with + in the following example) remote target port (nifi_output_port) -> + Remote Output Port (nifi_flow_path) -> + queue (nifi_queue) -> downstream (nifi_flow_path) * For ’nifi_flow_path’: remoteProcessGroupOutputPortID@namespace(e.g. 7375f8f6-4604-468d-144c-a8d71255027d@ns1) NOTE: The remoteProcessGroupOutputPortID is the client side component ID and different from the remote target port ID. Multiple Remote Process Group Output Ports can pull from the same target remote output port. * For ’nifi_queue’: downstreamPathGUID@namespace(e.g. bb530e58-ee14-3cac-144c-a8d71255027d@ns1)
    NiFiRemotePortServer Remote Input Port Remote Output Port RECEIVE SEND * http://nifi1.example.com:8080/nifi-api/data-transfer/input-ports/35dbc0ab-015e-1000-144c-a8d71255027d/transactions/89335043-f105-4de7-a0ef-46f2ef0c7c51/flow-files * nifi://nifi1.example.com:8081/cb729f05-b2ee-4488-909d-c6696cc34588 nifi_input_port nifi_output_port remotePortID@namespace(e.g. 35dbc0ab-015e-1000-144c-a8d71255027d@ns1)
    KafkaTopic PublishKafka ConsumeKafka PublishKafka_0_10 ConsumeKafka_0_10 PublishKafkaRecord_0_10 ConsumeKafkaRecord_0_10 SEND RECEIVE SEND RECEIVE SEND RECEIVE PLAINTEXT://kafka1.example.com:9092/sample-topic (Protocol can be either PLAINTEXT, SSL, SASL_PLAINTEXT or SASL_SSL) kafka_topic topicName@namespace(e.g. testTopic@ns1) **NOTE:**With Atlas earlier than 0.8.2, the same topic name in different clusters can not be created using the pre-built ‘kafka_topic’. See ATLAS-2286.
    PutHiveStreaming PutHiveStreaming SEND thrift://hive.example.com:9083 hive_table tableName@namespace(e.g. myTable@ns1)
    Hive2JDBC PutHiveQL SelectHiveQL SEND RECEIVE, FETCH jdbc:hive2://hive.example.com:10000/default hive_table tableName@namespace(e.g. myTable@ns1) The corresponding Processors parse Hive QL to set ‘query.input.tables’ and ‘query.output.tables’ FlowFile attributes. These attribute values are used to create qualified name.
    HDFSPath DeleteHDFS FetchHDFS FetchParquet GetHDFS GetHDFSSequenceFile PutHDFS PutORC PutParquet REMOTE_INVOCATION FETCH FETCH RECEIVE RECEIVE SEND SEND SEND hdfs://nn.example.com:8020/user/nifi/5262553828219 hdfs_path /path/fileName@namespace(e.g. /app/warehouse/hive/db/default@ns1)
    AwsS3Directory DeleteHDFS FetchHDFS FetchParquet GetHDFS GetHDFSSequenceFile PutHDFS PutORC PutParquet REMOTE_INVOCATION FETCH FETCH RECEIVE RECEIVE SEND SEND SEND s3a://mybucket/mydir aws_s3_pseudo_dir s3UrlWithoutObjectName@namespace(e.g. s3a://mybucket/mydir@ns1)
    HBaseTable FetchHBaseRow GetHBase PutHBaseCell PutHBaseJSON PutHBaseRecord ScanHBase FETCH RECEIVE SEND SEND SEND RECEIVE hbase://hmaster.example.com:16000/tableA/rowX hbase_table tableName@namespace(e.g. myTable@ns1)
    FilePath PutFile GetFile … etc SEND RECEIVE … etc file:///tmp/a.txt fs_path /path/fileName@hostname(e.g. /tmp/dir/filename.txt@host.example.com)
    unknown.CreateReceive, FetchSend, RemoteInvocation Other Processors those generates listed event types CREATE RECEIVE FETCH SEND REMOTE_INVOCATION nifi_data processorGuid@namespacedb8bb12c-5cd3-3011-c971-579f460ebedf@ns1

    How it runs in NiFi cluster

    When this reporting task runs in a NiFi cluster, following tasks are executed only by the primary node:

    • Create NiFi Atlas Types in Atlas type system
    • Maintain NiFi flow structure and metadata in Atlas which consists of NiFi component entities such as ’nifi_flow’, ’nifi_flow_path’ and ’nifi_input(output)_port’.

    While every node (including primary node) performs following:

    • Analyzes NiFi provenance events stored in a provenance event repository on it, to create lineage between ’nifi_flow_path’ and other DataSet (e.g. Hive tables or HDFS path).

    Limitations

    • Requires Atlas 0.8 incubating or later: This reporting task requires Atlas REST API version 2, which is introduced at Atlas 0.8-incubating. Older versions of Atlas are not supported.
    • Limited DataSets and Processors support: In order to report lineage to Atlas, this reporting task must know what a given processor does with a certain DataSet. Then create an ‘Atlas Object Id’ for a DataSet which uniquely identifies an entity in Atlas. Atlas Object Id has unique properties map, and mostly ‘qualifiedName’ is set in the unique properties map to identify an entity. The format of a qualifiedName depends on each DataSet.

    To create this Atlas Object ID, we have to implement Processor-specific code that analyzes configured properties. See Supported DataSets and Processors for details.

    • Restart NiFi is required to update some ReportingTask properties As underlying Atlas client library caches configurations when it runs the first time, some properties of this reporting task can not be updated by stopping, configure and restarting the reporting task.

    NiFi process needs to be restarted in such case.

    Atlas Server Configurations

    • Delete Handler: Atlas uses ‘SoftDeleteHandler’ by default which mark relationships deleted, but still can be seen in Atlas UI. Soft delete model is useful if you would like to capture every lineage ever defined, but if you prefer seeing current state of a NiFi flow, Hard delete would be more appropriate.

    To change this behavior, set following in ‘atlas-application.properties’ on Atlas server, then restart Atlas. HardDeleteHandlerV1 physically removes lineage:

    atlas.DeleteHandlerV1.impl=org.apache.atlas.repository.store.graph.v1.HardDeleteHandlerV1
    
Properties
Dynamic Properties
State Management
Scopes Description
LOCAL Stores the Reporting Task's last event Id so that on restart the task knows where it left off.