Creating Airflow jobs using Cloudera Data Engineering
Learn about how to create Airflow jobs using Cloudera Data Engineering.
An Airflow job in Cloudera Data Engineering consists of an Airflow DAG file and various optional resources. Jobs can be run on demand or scheduled.
Here is a complete DAG file:
from datetime import datetime, timedelta, timezone
from airflow import DAG
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator

default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': datetime(2024, 2, 10, tzinfo=timezone.utc),
}

example_dag = DAG(
    'airflow-pipeline-demo',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=False,
)

ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job',
)

prep_step2 = CdeRunJobOperator(
    task_id='data_prep',
    dag=example_dag,
    job_name='insurance-claims-job',
)

ingest_step1 >> prep_step2
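In the DAG above, schedule_interval='@daily' combined with catchup=False means that when the DAG is unpaused, Airflow schedules a run only for the most recent fully elapsed daily interval instead of backfilling every interval since start_date. A standalone sketch of that interval arithmetic (plain Python with a hypothetical current time, not Airflow internals):

```python
from datetime import datetime, timedelta, timezone

start_date = datetime(2024, 2, 10, tzinfo=timezone.utc)
now = datetime(2024, 2, 14, 12, 0, tzinfo=timezone.utc)  # hypothetical current time
interval = timedelta(days=1)  # '@daily'

# Collect every daily interval that has fully elapsed since start_date.
elapsed = []
t = start_date
while t + interval <= now:
    elapsed.append(t)
    t += interval

# catchup=True would schedule a run for each elapsed interval;
# catchup=False schedules only the latest one.
catchup_runs = elapsed
no_catchup_runs = elapsed[-1:]

print(len(catchup_runs))   # intervals starting Feb 10, 11, 12, and 13
print(no_catchup_runs[0])  # only the Feb 13 interval runs
```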
Here are some examples of things you can define in the DAG file:
- CDE Run Job Operator
- Use CdeRunJobOperator to specify a Cloudera Data Engineering job to run. This job must already exist in the virtual cluster specified by the connection_id. If no connection_id is specified, Cloudera Data Engineering looks for the job in the virtual cluster where the Airflow job runs.

from cloudera.airflow.providers.operators.cde import CdeRunJobOperator
...
ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)
- Email Alerts
- Add the following parameters to the DAG default_args to send email alerts for job failures, missed service-level agreements (SLAs), or both:

'email_on_failure': True,
'email': 'abc@example.com',
'email_on_retry': True,
'sla': timedelta(seconds=30)
- Task dependencies
- After you have defined the tasks, specify the dependencies as follows:

ingest_step1 >> prep_step2

For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation.
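The `>>` syntax is shorthand Airflow provides by overloading the right-shift operator on operators: `a >> b` records that `b` runs after `a`, and the expression returns the right-hand task so longer chains like `a >> b >> c` compose naturally. A minimal standalone mock of that mechanism (an illustration, not Airflow's actual implementation):

```python
class Task:
    """Minimal stand-in for an Airflow operator, showing how
    `a >> b` can record a downstream dependency."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b means "b runs after a"
        self.downstream.append(other.task_id)
        return other  # returning `other` lets chains like a >> b >> c work

ingest = Task('ingest')
prep = Task('data_prep')
report = Task('report')

ingest >> prep >> report

print(ingest.downstream)  # ['data_prep']
print(prep.downstream)    # ['report']
```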
