Creating Airflow jobs using Cloudera Data Engineering

Learn how to create Airflow jobs using Cloudera Data Engineering.

An Airflow job in Cloudera Data Engineering consists of an Airflow DAG file and various optional resources. Jobs can be run on demand or scheduled.

Create an Airflow DAG file in Python. Import the Cloudera Data Engineering operator and define the tasks and dependencies.
Here is a complete DAG file:
from datetime import datetime, timedelta, timezone

from airflow import DAG
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator


default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 10, tzinfo=timezone.utc),
}

example_dag = DAG(
    'airflow-pipeline-demo',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=False
)

ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)

prep_step2 = CdeRunJobOperator(
    task_id='data_prep',
    dag=example_dag,
    job_name='insurance-claims-job'
)


ingest_step1 >> prep_step2

Here are some examples of things you can define in the DAG file:

CDE Run Job Operator
Use CdeRunJobOperator to specify a Cloudera Data Engineering job to run. This job must already exist in the virtual cluster specified by the connection_id. If no connection_id is specified, Cloudera Data Engineering looks for the job in the virtual cluster where the Airflow job runs.
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator
...
ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)
Email Alerts
Add the following parameters to the DAG default_args to send email alerts for job failures, missed service-level agreements (SLAs), or both.
'email_on_failure': True,
'email': 'abc@example.com',
'email_on_retry': True,
'sla': timedelta(seconds=30)
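For instance, the alert parameters can be combined with the retry and scheduling defaults shown earlier into a single default_args dictionary. The owner, address, and thresholds below are placeholder values:

```python
from datetime import datetime, timedelta, timezone

# default_args combining retry settings with email and SLA alerts.
# The owner, recipient address, and thresholds are example values only.
default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 10, tzinfo=timezone.utc),
    'email_on_failure': True,       # send an alert when a task fails
    'email': 'abc@example.com',     # recipient address for alerts
    'email_on_retry': True,         # also send an alert on each retry
    'sla': timedelta(seconds=30),   # alert if a task misses this SLA
}
```

Pass this dictionary as the default_args argument of the DAG constructor so the settings apply to every task in the DAG.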
Task dependencies
After you have defined the tasks, specify the dependencies as follows:
ingest_step1 >> prep_step2

For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation.

  1. In the Cloudera Management Console, click the Data Engineering tile and click Overview.
  2. In the Cloudera Data Engineering Services column, select the service that contains the virtual cluster that you want to create a job for.
  3. In the Virtual Clusters column, locate the virtual cluster that you want to use and click the View Jobs icon.
  4. In the left navigation menu, click Jobs.
  5. Click the Create Job button.
  6. Provide the Job Details:
    1. Select Airflow for the job type. The available fields on the user interface update automatically.
    2. Specify the Name.
    3. Click the File option and select how you want to provide the DAG file. With the File option, you can do the following:
      • Upload the DAG file to a new resource.
      • Use the DAG file from a previously created resource.
      • Upload any other resources required for the job.

      For more information about the Editor, see Creating an Airflow DAG using the Pipeline UI.

  7. If you do not want to run the job immediately, click the Create and Run drop-down menu and then click Create. Otherwise, click Create and Run to run the job immediately.
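If you prefer the command line, the steps above can also be performed with the CDE CLI. This is a sketch assuming a configured CLI, a local DAG file named airflow-pipeline-demo.py, and an example resource name of my-airflow-resource:

```shell
# Create a resource and upload the DAG file to it
# (resource and file names here are examples)
cde resource create --name my-airflow-resource
cde resource upload --name my-airflow-resource --local-path airflow-pipeline-demo.py

# Create the Airflow job from the uploaded DAG file
cde job create --name airflow-pipeline-demo \
    --type airflow \
    --dag-file airflow-pipeline-demo.py \
    --mount-1-resource my-airflow-resource

# Run the job on demand
cde job run --name airflow-pipeline-demo
```

Omit the final command if you do not want to run the job immediately after creating it.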