Creating Atlas entity type definitions for Flink

Before submitting Flink jobs to collect their metadata, you need to create Atlas entity type definitions for Flink. In the command line, you need to connect to the atlas server and add the predefined type definitions. You also need to enable Atlas for Flink in Cloudera Manager.

Atlas does not include the metadata source of Flink by default. The administrator must manually upload the entity type definitions to the cluster to be able to start the Flink metadata collection.

  1. Upload the designed entity type definitions to the cluster using the Atlas REST API.
    curl -k -u <atlas_admin>:<atlas_admin_pwd> --location --request POST 'https://<atlas_server_host>:<atlas_server_port>/api/atlas/v2/types/typedefs' \
    --header 'Content-Type: application/json' \
    --data-raw '{
        "enumDefs": [],
        "structDefs": [],
        "classificationDefs": [],
        "entityDefs": [
            {
                "name": "flink_application",
                "superTypes": [
                    "Process"
                ],
                "serviceType": "flink",
                "typeVersion": "1.0",
                "attributeDefs": [
                    {
                        "name": "id",
                        "typeName": "string",
                        "cardinality": "SINGLE",
                        "isIndexable": true,
                        "isOptional": false,
                        "isUnique": true
                    },
                    {
                        "name": "startTime",
                        "typeName": "date",
                        "cardinality": "SINGLE",
                        "isIndexable": false,
                        "isOptional": true,
                        "isUnique": false
                    },
                    {
                        "name": "endTime",
                        "typeName": "date",
                        "cardinality": "SINGLE",
                        "isIndexable": false,
                        "isOptional": true,
                        "isUnique": false
                    },
                    {
                        "name": "conf",
                        "typeName": "map<string,string>",
                        "cardinality": "SINGLE",
                        "isIndexable": false,
                        "isOptional": true,
                        "isUnique": false
                    },
                    {
                        "name": "inputs",
                        "typeName": "array<string>",
                        "cardinality": "LIST",
                        "isIndexable": false,
                        "isOptional": false,
                        "isUnique": false
                    },
                    {
                        "name": "outputs",
                        "typeName": "array<string>",
                        "cardinality": "LIST",
                        "isIndexable": false,
                        "isOptional": false,
                        "isUnique": false
                    }
                ]
            }
        ],
        "relationshipDefs": []
    }'
    
  2. Log in to Cloudera Manager.
  3. Go to Flink>Configuration.
  4. Search for 'enable atlas' in the search bar.
  5. Enable Atlas Metadata Collection.

The Flink client notifies Atlas about the metadata of the job on successful submission.