Creating Airflow jobs using Cloudera Data Engineering
Learn how to create Airflow jobs using Cloudera Data Engineering.
An Airflow job in Cloudera Data Engineering consists of an Airflow DAG file and various optional resources. Jobs can be run on demand or scheduled.
Here is a complete DAG file:
from datetime import datetime, timedelta, timezone

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator

default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    # datetime() takes a tzinfo object, not a tz keyword
    'start_date': datetime(2024, 2, 10, tzinfo=timezone.utc),
}

example_dag = DAG(
    'airflow-pipeline-demo',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=False
)

ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)

prep_step2 = CdeRunJobOperator(
    task_id='data_prep',
    dag=example_dag,
    job_name='insurance-claims-job'
)

cdw_query = """
show databases;
"""

dw_step3 = SQLExecuteQueryOperator(
    task_id='dataset-etl-cdw',
    conn_id='cde-vw-demo',
    dag=example_dag,
    sql=cdw_query
)

ingest_step1 >> prep_step2 >> dw_step3
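The tasks above attach to the DAG through an explicit dag=example_dag argument. As a minimal alternative sketch using standard Airflow, the same pipeline can be written with the DAG context manager, which attaches tasks automatically:

from datetime import datetime, timedelta, timezone

from airflow import DAG
from cloudera.airflow.providers.operators.cde import CdeRunJobOperator

# Tasks created inside the "with" block are attached to the DAG
# automatically, so no dag=... argument is needed on each operator.
with DAG(
    'airflow-pipeline-demo',
    default_args={'owner': 'psherman', 'retry_delay': timedelta(seconds=5)},
    start_date=datetime(2024, 2, 10, tzinfo=timezone.utc),
    schedule_interval='@daily',
    catchup=False,
) as example_dag:
    ingest_step1 = CdeRunJobOperator(
        connection_id='cde-vc01-dev',
        task_id='ingest',
        job_name='etl-ingest-job'
    )
    prep_step2 = CdeRunJobOperator(
        task_id='data_prep',
        job_name='insurance-claims-job'
    )
    ingest_step1 >> prep_step2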
Here are some examples of things you can define in the DAG file:
- CDE Run Job Operator
- Use CdeRunJobOperator to specify a Cloudera Data Engineering job to run. The job must already exist in the virtual cluster specified by connection_id. If no connection_id is specified, Cloudera Data Engineering looks for the job in the virtual cluster where the Airflow job runs. For more information, see Running Airflow jobs on other Cloudera Data Engineering Virtual Clusters. A variant that passes runtime arguments to the job is sketched after this list.

from cloudera.airflow.providers.operators.cde import CdeRunJobOperator
...
ingest_step1 = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)
- Query definition
- You can define a query string or template within the DAG file for later reference, as follows:

cdw_query = """
show databases;
"""
- SQLExecuteQueryOperator
- Use SQLExecuteQueryOperator to execute a query against the Cloudera Data Warehouse virtual warehouse. Specify the connection ID you configured in Airflow and the query you want to run. A templated, parameterized variant is sketched after this list.

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
...
dw_step3 = SQLExecuteQueryOperator(
    task_id='dataset-etl-cdw',
    conn_id='cde-vw-demo',
    sql=cdw_query
)
- Task dependencies
- After you have defined the tasks, specify the dependencies as follows:

ingest_step1 >> prep_step2 >> dw_step3

For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation. A fan-out variant is sketched after this list.
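Beyond naming the job to run, CdeRunJobOperator can pass runtime arguments to it. The sketch below assumes your version of the Cloudera provider supports the variables and overrides parameters; the variable name and Spark setting are illustrative only:

from cloudera.airflow.providers.operators.cde import CdeRunJobOperator

ingest_with_args = CdeRunJobOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest_with_args',
    dag=example_dag,
    job_name='etl-ingest-job',
    # "variables" fills variable placeholders defined in the CDE job;
    # "overrides" adjusts its Spark configuration for this run only
    # (both assume provider support for these parameters).
    variables={'run_date': '{{ ds }}'},
    overrides={'spark': {'conf': {'spark.executor.memory': '4g'}}}
)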
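Because sql is a templated field on SQLExecuteQueryOperator, the query can embed Airflow macros such as {{ ds }}, and the parameters argument binds values at run time. Here is a minimal sketch with a hypothetical claims table; the exact placeholder syntax depends on the warehouse driver's parameter style:

from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

# {{ ds }} renders to the logical date of the DAG run (YYYY-MM-DD).
templated_query = """
SELECT count(*) FROM claims
WHERE claim_date = '{{ ds }}'
  AND region = %(region)s
"""

dw_step4 = SQLExecuteQueryOperator(
    task_id='count-daily-claims',
    conn_id='cde-vw-demo',
    dag=example_dag,
    sql=templated_query,
    parameters={'region': 'us-west'}  # bound by the database driver
)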
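Dependencies do not have to be linear: wrapping tasks in a list fans a task out to several downstream tasks and back in again. This minimal sketch reuses the tasks above and adds a hypothetical second preparation job:

# Hypothetical second preparation job, run in parallel with prep_step2.
prep_step2b = CdeRunJobOperator(
    task_id='data_prep_b',
    dag=example_dag,
    job_name='claims-enrichment-job'  # illustrative job name
)

# ingest fans out to both prep tasks; both must finish before dw_step3 runs.
ingest_step1 >> [prep_step2, prep_step2b] >> dw_step3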
