Automating data pipelines using Apache Airflow in Cloudera Data Engineering

Cloudera Data Engineering (CDE) enables you to automate a workflow or data pipeline using Apache Airflow Python DAG files. Each CDE virtual cluster includes an embedded instance of Apache Airflow. You can also use CDE with your own Airflow deployment. CDE on CDP Private Cloud currently supports only the CDE job run operator.

The following instructions are for using the Airflow service provided with each CDE virtual cluster. For instructions on using your own Airflow deployment, see Using the Cloudera provider for Apache Airflow.

  1. Create an Airflow DAG file in Python. Import the CDE operator and define the tasks and dependencies.
    For example, here is a complete DAG file:
    from dateutil import parser
    from datetime import timedelta, timezone
    from airflow import DAG
    # CDEJobRunOperator is provided by the embedded Airflow instance in each CDE virtual cluster
    from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
    
    
    default_args = {
        'owner': 'psherman',
        'retry_delay': timedelta(seconds=5),
        'depends_on_past': False,
        'start_date': parser.isoparse('2021-05-25T07:33:37.393Z').replace(tzinfo=timezone.utc)
    }
    
    example_dag = DAG(
        'airflow-pipeline-demo',
        default_args=default_args, 
        schedule_interval='@daily', 
        catchup=False, 
        is_paused_upon_creation=False
    )
    
    ingest_step1 = CDEJobRunOperator(
        connection_id='cde-vc01-dev',
        task_id='ingest',
        retries=3,
        dag=example_dag,
        job_name='etl-ingest-job'
    )
    
    prep_step2 = CDEJobRunOperator(
        task_id='data_prep',
        dag=example_dag,
        job_name='insurance-claims-job'
    )
    
    
    ingest_step1 >> prep_step2
    

    Here are some of the elements you can define in the DAG file:

    CDE job run operator
    Use CDEJobRunOperator to specify a CDE job to run. This job must already exist in the virtual cluster specified by the connection_id. If no connection_id is specified, CDE looks for the job in the virtual cluster where the Airflow job runs.
    from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
    ...
    ingest_step1 = CDEJobRunOperator(
        connection_id='cde-vc01-dev',
        task_id='ingest',
        retries=3,
        dag=example_dag,
        job_name='etl-ingest-job'
    )
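    As an illustration of the second case (no connection_id), here is a minimal sketch, assuming a hypothetical job named 'cleanup-job' that already exists in the same virtual cluster as the Airflow job, and the example_dag defined above:
    # No connection_id: CDE looks for 'cleanup-job' in the virtual cluster
    # where the Airflow job itself runs
    cleanup_step = CDEJobRunOperator(
        task_id='cleanup',
        dag=example_dag,
        job_name='cleanup-job'
    )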
    
    Task dependencies
    After you have defined the tasks, specify the dependencies as follows:
    ingest_step1 >> prep_step2

    For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation.
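    For example, a pipeline can fan out to parallel tasks and join them again by combining the >> operator with Python lists. The following sketch uses hypothetical task names; prep_a and prep_b run in parallel after ingest, and report runs only after both succeed:
    # Fan out from ingest to two parallel prep tasks, then join before report
    ingest >> [prep_a, prep_b] >> report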

    For a tutorial on creating Apache Airflow DAG files, see the Apache Airflow documentation.
  2. Create a CDE job. Select the Airflow job type, provide a name for the job, and upload the DAG file you created. You can also upload the DAG file as a resource and specify the resource name during job creation (see the illustrative CLI sketch at the end of this section). You can choose to create the job and run it immediately, or only create the job.
    For more information on creating CDE jobs, see Creating jobs in Cloudera Data Engineering.

    For more information on using resources, see Managing Cloudera Data Engineering job resources using the CLI.
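
    If you prefer to script job creation, the CDE CLI can create a resource, upload the DAG file to it, and create the Airflow job from that resource. The following commands are an illustrative sketch only; the resource, file, and job names are hypothetical, and the exact flags should be verified against the CLI reference for your CDE release:
    # Create a resource, upload the DAG file, then create the Airflow job from it
    cde resource create --name airflow-dag-resource
    cde resource upload --name airflow-dag-resource --local-path my-pipeline-dag.py
    cde job create --name airflow-pipeline-demo --type airflow --dag-file my-pipeline-dag.py --mount-1-resource airflow-dag-resource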