Automating data pipelines using Apache Airflow in Cloudera Data Engineering

Cloudera Data Engineering (CDE) enables you to automate a workflow or data pipeline using Apache Airflow Python DAG files. Each CDE virtual cluster includes an embedded instance of Apache Airflow. You can also use CDE with your own Airflow deployment. CDE currently supports two Airflow operators; one to run a CDE job and one to access Cloudera Data Warehouse (CDW).

To determine the CDW hostname to use for the connection:

  1. Navigate to the Cloudera Data Warehouse Overview page by clicking the Data Warehouse tile in the Cloudera Data Platform (CDP) management console.
  2. In the Virtual Warehouses column, find the warehouse you want to connect to.
  3. Click the three-dot menu for the selected warehouse, and then click Copy JDBC URL.
  4. Paste the URL into a text editor, and make note of the hostname. For example, the hostname portion of the following JDBC URL is emphasized in italics:
    jdbc:hive2://hs2-aws-2-hive.env-k5ip0r.dw.ylcu-atmi.cloudera.site/default;transportMode=http;httpPath=cliservice;ssl=true;retries=3;

The following instructions are for using the Airflow service provided with each CDE virtual cluster. For instructions on using your own Airflow deployment, see Using the Cloudera provider for Apache Airflow.

  1. Create a connection to an existing CDW virtual warehouse using the embedded Airflow UI:
    1. In the Cloudera Data Platform (CDP) console, click the Data Engineering tile. The CDE Home page displays.
    2. Click Administration in the left navigation menu, select the service containing the virtual cluster you are using, and then in the Virtual Clusters column, click Cluster Details for the virtual cluster.
    3. Click AIRFLOW UI.
    4. From the Airflow UI, click the Connection link from the Admin menu.
    5. Click the plus sign to add a new record, and then fill in the fields:
      To create a connection to an existing CDW virtual warehouse:
      Conn Id
      Create a unique connection identifier, such as cdw-hive-demo.
      Conn Type
      Select Impala or HiveServer2, depending on the use-case.
      Host
      Enter the hostname from the JDBC connection URL. Do not enter the full JDBC URL.
      Schema
      default
      Login/Password
      Impala supports JWT token based authentication. If JWT token is used Login and Password should be left empty. Enter your workload username and password.
      Extra
      Extra arguments should contain the following options:
      If JWT token based authentication is used:
      {"auth_mechanism": "JWT", "use_ssl": "True", "use_http_transport": "True", "http_path": "cliservice", "jwt": "JWT_TOKEN"}
      If workload username and pasword is used:
      {"auth_mechanism": "LDAP", "use_ssl": "True", "use_http_transport": "True", "http_path": "cliservice"}
      To create a connection to an existing virtual cluster in a different environment:
      Conn Id
      Create a unique connection identifier.
      Conn Type
      The type of the connection. From the drop-down, select Cloudera Data engineering
      Host/Virtual API Endpoint
      The JOBS API URL of the host where you want the job to run.
      Login/CDP Access Key

      Provide the CDP access key of the account for running jobs on the CDE virtual cluster.

      Password/CDP Private Key

      Provide the CDP private key of the account for running jobs on the CDE virtual cluster.

    6. Click Save.
  2. Create an Airflow DAG file in Python. Import the CDE and CDW operators and define the tasks and dependencies.
    Here is a complete DAG file:
    from dateutil import parser
    from datetime import datetime, timedelta
    from datetime import timezone
    from airflow import DAG
    from cloudera.airflow.providers.operators.cde import CdeRunJobOperator
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    
    default_args = {
        'owner': 'psherman',
        'retry_delay': timedelta(seconds=5),
        'depends_on_past': False,
        'start_date':datetime(2024, 2, 10, tz="UTC"),
    }
    
    example_dag = DAG(
        'airflow-pipeline-demo',
        default_args=default_args, 
        schedule_interval='@daily', 
        catchup=False, 
        is_paused_upon_creation=False
    )
    
    ingest_step1 = CdeRunJobOperator(
        connection_id='cde-vc01-dev',
        task_id='ingest',
        retries=3,
        dag=example_dag,
        job_name='etl-ingest-job'
    )
    
    prep_step2 = CdeRunJobOperator(
        task_id='data_prep',
        job_name='insurance-claims-job'
    )
    
    cdw_query = """
    show databases;
    """
    
    dw_step3 = SQLExecuteQueryOperator(
        task_id="dataset-etl-cdw",
        conn_id="cde-vw-demo",
        sql=cdw_query
    )
    
    
    ingest_step1 >> prep_step2 >> dw_step3
    

    Here are some examples of things you can define in the DAG file:

    CDE job run operator
    Use CdeRunJobOperator to specify a CDE job to run. This job must already exist in the virtual cluster specified by the connection_id. If no connection_id is specified, CDE looks for the job in the virtual cluster where the Airflow job runs.
    from cloudera.cdp.airflow.operators.cde_operator import CdeRunJobOperator
    ...
    ingest_step1 = CdeRunJobOperator(
        connection_id='cde-vc01-dev',
        task_id='ingest',
        retries=3,
        dag=example_dag,
        job_name='etl-ingest-job'
    )
    
    Query definition
    You can define a query string or template within the DAG file for later reference as follows:
    cdw_query = """
    show databases;
    """
    SQLExecuteQueryOperator
    Use SQLExecuteQueryOperator to execute a query against the CDW virtual warehouse. Specify the connection ID you configured in Airflow, and the query you want to run.
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
    ...
    dw_step3 = SQLExecuteQueryOperator(
        task_id='dataset-etl-cdw',
        conn_id='cde-vw-demo',
        sql=cdw_query
    )
    
    Task dependencies
    After you have defined the tasks, specify the dependencies as follows:
    ingest_step1 >> prep_step2 >> dw_step3

    For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation.

    For a tutorial on creating Apache Airflow DAG files, see the Apache Airflow documentation.
  3. Create a CDE job.
    1. In the Cloudera Data Platform (CDP) console, click the Data Engineering tile. The CDE Home page displays.
    2. In the left navigation menu click Jobs. The Jobs page is displayed.
    3. Click Create Job. The Job Details page is displayed.
    4. Select the Airflow job type.
    5. Name: Provide a name for the job.
    6. DAG File: Use an existing file or add a DAG file to an existing resource or create a resource and upload it.
      1. Select from Resource: Click Select from Resource to select a DAG file from an existing resource.
      2. Upload: Click Upload to upload a DAG file to an existing resource or to a new resource that you can create by selecting Create a resource from the Select a Resource dropdown list. Specify the resource name and upload the DAG file to it.
  4. Click Create and Run to create the job and run it immediately, or click the dropdown button and select Create to create the job.