Creating Airflow jobs using Cloudera Data Engineering

Learn how to create Airflow jobs using Cloudera Data Engineering.

An Airflow job in Cloudera Data Engineering consists of an Airflow DAG file and any optional resources that the DAG requires. Jobs can be run on demand or on a schedule.

Create an Airflow DAG file in Python. Import the Cloudera Data Engineering and Cloudera Data Warehouse operators and define the tasks and dependencies.
Here is a complete DAG file:
from datetime import datetime, timedelta, timezone
from airflow import DAG
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 10, tzinfo=timezone.utc),
}

example_dag = DAG(
    'airflow-pipeline-demo',
    default_args=default_args, 
    schedule_interval='@daily', 
    catchup=False, 
    is_paused_upon_creation=False
)

ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)

prep_step2 = CdeRunJobOperator(
    task_id='data_prep',
    dag=example_dag,
    job_name='insurance-claims-job'
)

cdw_query = """
show databases;
"""

dw_step3 = SQLExecuteQueryOperator(
    task_id='dataset-etl-cdw',
    conn_id='cde-vw-demo',
    sql=cdw_query,
    dag=example_dag
)


ingest_step1 >> prep_step2 >> dw_step3

Here are some examples of things you can define in the DAG file:

CDE Run Job Operator
Use CdeRunJobOperator to specify a Cloudera Data Engineering job to run. This job must already exist in the virtual cluster specified by the connection_id. If no connection_id is specified, Cloudera Data Engineering looks for the job in the virtual cluster where the Airflow job runs. For more information, see Running Airflow jobs on other Cloudera Data Engineering Virtual Clusters.
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator
...
ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)
Query definition
You can define a query string or template within the DAG file for later reference as follows:
cdw_query = """
show databases;
"""
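Because the sql field of SQLExecuteQueryOperator is Jinja-templated, a query string defined this way can also reference Airflow runtime values such as the logical date macro {{ ds }}. The sketch below is only an illustration: the claims table is hypothetical, and plain string substitution stands in for the Jinja rendering that Airflow performs at runtime.

```python
# The sql field of SQLExecuteQueryOperator is Jinja-templated, so a query
# string can reference runtime values such as the logical date ({{ ds }}).
# 'claims' is a hypothetical table name used only for illustration.
cdw_query = """
SELECT * FROM claims WHERE claim_date = '{{ ds }}';
"""

# Rough illustration of what Airflow does before the query is sent to the
# warehouse: plain string substitution stands in for Jinja rendering here.
rendered = cdw_query.replace("{{ ds }}", "2024-02-10")
print(rendered.strip())
```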
SQLExecuteQueryOperator
Use SQLExecuteQueryOperator to execute a query against the Cloudera Data Warehouse virtual warehouse. Specify the connection ID you configured in Airflow, and the query you want to run.
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
...
dw_step3 = SQLExecuteQueryOperator(
    task_id='dataset-etl-cdw',
    conn_id='cde-vw-demo',
    sql=cdw_query
)
Task dependencies
After you have defined the tasks, specify the dependencies as follows:
ingest_step1 >> prep_step2 >> dw_step3

For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation.
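The >> syntax works because Airflow operators overload Python's right-shift operator to record downstream relationships. The toy sketch below illustrates the idea; the Task class here is a stand-in for illustration only, not Airflow's BaseOperator.

```python
# Toy sketch of how Airflow's >> chaining builds a dependency graph.
# Task is a stand-in class, not Airflow's BaseOperator; Airflow implements
# the same idea by overloading __rshift__ on its operators.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # so the expression chains: a >> b >> c
        self.downstream.append(other)
        return other

ingest = Task('ingest')
prep = Task('data_prep')
dw = Task('dataset-etl-cdw')

ingest >> prep >> dw

print([t.task_id for t in ingest.downstream])  # ['data_prep']
print([t.task_id for t in prep.downstream])    # ['dataset-etl-cdw']
```

Airflow's real operators additionally accept lists on either side of >> to express fan-out and fan-in dependencies.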

  1. In the Cloudera Management Console, click the Data Engineering tile and click Overview.
  2. In the Cloudera Data Engineering Services column, select the service that contains the virtual cluster that you want to create a job for.
  3. In the Virtual Clusters column, locate the virtual cluster that you want to use and click the View Jobs icon.
  4. In the left navigation menu, click Jobs.
  5. Click the Create Job button.
  6. Provide the Job Details:
    1. Select Airflow for the job type. The available fields on the user interface update automatically.
    2. Specify the Name.
    3. Click the File option and select the way you want to provide the DAG file. Using the File option, you can do the following:
      • Upload the DAG file to a new resource.
      • Use the DAG file from a previously created resource.
      • Upload any other resources required for the job.

      For more information about the Editor, see Creating an Airflow DAG using the Pipeline UI.

  7. If you do not want to run the job immediately, click the Create and Run drop-down menu and then click Create. Otherwise, click Create and Run to run the job immediately.