Creating Atlas entity type definitions for Flink

Before submitting Flink jobs to collect their metadata, you need to create Atlas entity type definitions for Flink.

Atlas does not include Flink as a metadata source by default. An administrator must manually upload the entity type definitions to the cluster before Flink metadata collection can start.

  1. Upload the entity type definitions to the cluster using the Atlas REST API:
    curl -k -u <atlas_admin>:<atlas_admin_pwd> --location --request POST 'https://<atlas_server_host>:<atlas_server_port>/api/atlas/v2/types/typedefs' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "enumDefs": [],
        "structDefs": [],
        "classificationDefs": [],
        "entityDefs": [
            {
                "name": "flink_application",
                "superTypes": [
                    "Process"
                ],
                "serviceType": "flink",
                "typeVersion": "1.0",
                "attributeDefs": [
                    {
                        "name": "id",
                        "typeName": "string",
                        "cardinality": "SINGLE",
                        "isIndexable": true,
                        "isOptional": false,
                        "isUnique": true
                    },
                    {
                        "name": "startTime",
                        "typeName": "date",
                        "cardinality": "SINGLE",
                        "isIndexable": false,
                        "isOptional": true,
                        "isUnique": false
                    },
                    {
                        "name": "endTime",
                        "typeName": "date",
                        "cardinality": "SINGLE",
                        "isIndexable": false,
                        "isOptional": true,
                        "isUnique": false
                    },
                    {
                        "name": "conf",
                        "typeName": "map<string,string>",
                        "cardinality": "SINGLE",
                        "isIndexable": false,
                        "isOptional": true,
                        "isUnique": false
                    },
                    {
                        "name": "inputs",
                        "typeName": "array<string>",
                        "cardinality": "LIST",
                        "isIndexable": false,
                        "isOptional": false,
                        "isUnique": false
                    },
                    {
                        "name": "outputs",
                        "typeName": "array<string>",
                        "cardinality": "LIST",
                        "isIndexable": false,
                        "isOptional": false,
                        "isUnique": false
                    }
                ]
            }
        ],
        "relationshipDefs": []
    }'
    
  2. Log in to Cloudera Manager.
  3. Go to Flink > Configuration.
  4. Search for 'enable atlas' in the search bar.
  5. Enable Atlas Metadata Collection.
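
Before running the upload in step 1, it can help to sanity-check the request body locally, since a malformed payload is easy to produce when editing the JSON by hand. The following Python sketch is only an illustration and is not part of Atlas or Flink: the helper name `check_typedefs` is hypothetical, and the set of expected keys is an assumption derived from the example payload in step 1, not from the Atlas schema.

```python
import json

# Keys that every attribute definition in the example payload carries.
# This set is an assumption taken from the example, not from the Atlas schema.
EXPECTED_ATTRIBUTE_KEYS = {
    "name", "typeName", "cardinality",
    "isIndexable", "isOptional", "isUnique",
}


def check_typedefs(payload_text):
    """Return a list of problems found in a typedefs payload (hypothetical helper)."""
    try:
        payload = json.loads(payload_text)
    except json.JSONDecodeError as err:
        return [f"invalid JSON: {err}"]
    if not isinstance(payload, dict):
        return ["payload must be a JSON object"]

    problems = []
    for entity in payload.get("entityDefs", []):
        entity_name = entity.get("name", "<unnamed>")
        seen = set()
        for attribute in entity.get("attributeDefs", []):
            attr_name = attribute.get("name", "<unnamed>")
            # Report keys present in the example but absent here.
            missing = EXPECTED_ATTRIBUTE_KEYS - attribute.keys()
            if missing:
                problems.append(
                    f"{entity_name}.{attr_name}: missing {sorted(missing)}"
                )
            # Report attribute names that appear more than once.
            if attr_name in seen:
                problems.append(f"{entity_name}.{attr_name}: duplicate attribute")
            seen.add(attr_name)
    return problems


if __name__ == "__main__":
    # Shortened payload for illustration; use the full body from step 1 in practice.
    sample = json.dumps({
        "entityDefs": [{
            "name": "flink_application",
            "superTypes": ["Process"],
            "attributeDefs": [
                {"name": "id", "typeName": "string", "cardinality": "SINGLE",
                 "isIndexable": True, "isOptional": False, "isUnique": True},
                {"name": "conf", "typeName": "map<string,string>"},
            ],
        }]
    })
    for problem in check_typedefs(sample):
        print(problem)
```

An empty result only means the payload matches the shape of the example. Atlas performs its own validation when the type definitions are uploaded.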

After a job is submitted successfully, the Flink client sends the metadata of the job to Atlas.
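
One way to confirm that metadata is arriving after a job submission is to search Atlas for entities of the `flink_application` type. The sketch below builds such a request against the Atlas v2 basic search endpoint, assuming it is reachable with the same host, port, and credentials used for the upload. The function name `build_search_request` and the host, port, and credential values are placeholders chosen for illustration.

```python
import base64
import urllib.parse
import urllib.request


def build_search_request(host, port, user, password, type_name="flink_application"):
    """Build (but do not send) a basic-search request for entities of one type."""
    query = urllib.parse.urlencode({"typeName": type_name, "limit": 10})
    url = f"https://{host}:{port}/api/atlas/v2/search/basic?{query}"
    # HTTP Basic authentication, equivalent to curl's -u option.
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    request = urllib.request.Request(url, method="GET")
    request.add_header("Authorization", f"Basic {token}")
    request.add_header("Accept", "application/json")
    return request


if __name__ == "__main__":
    # Placeholder values; replace them with the host, port, and
    # credentials used for the upload in step 1.
    request = build_search_request("atlas.example.com", 31443, "admin", "secret")
    print(request.full_url)
    # Sending is left commented out because it needs a reachable Atlas server:
    # with urllib.request.urlopen(request) as response:
    #     print(response.read().decode("utf-8"))
```

Unlike the curl command with `-k`, `urllib` verifies the server certificate by default, so a cluster with a self-signed certificate would need its CA added to the trust store before the request can be sent.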