Automating data pipelines using Apache Airflow in Cloudera Data Engineering
Cloudera Data Engineering (CDE) enables you to automate a workflow or data pipeline using Apache Airflow Python DAG files. Each CDE virtual cluster includes an embedded instance of Apache Airflow. You can also use CDE with your own Airflow deployment. CDE on CDP Private Cloud currently supports only the CDE job run operator.
The following instructions are for using the Airflow service provided with each CDE virtual cluster. For instructions on using your own Airflow deployment, see Using the Cloudera provider for Apache Airflow.
- Create an Airflow DAG file in Python. Import the CDE operator and define the tasks and dependencies. For example, here is a complete DAG file:
from dateutil import parser
from datetime import datetime, timedelta
from datetime import timezone
from airflow import DAG
from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator

default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'start_date': parser.isoparse('2021-05-25T07:33:37.393Z').replace(tzinfo=timezone.utc)
}

example_dag = DAG(
    'airflow-pipeline-demo',
    default_args=default_args,
    schedule_interval='@daily',
    catchup=False,
    is_paused_upon_creation=False
)

ingest_step1 = CDEJobRunOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)

prep_step2 = CDEJobRunOperator(
    task_id='data_prep',
    dag=example_dag,
    job_name='insurance-claims-job'
)

ingest_step1 >> prep_step2
Here are some examples of things you can define in the DAG file:
- CDE job run operator
- Use CDEJobRunOperator to specify a CDE job to run. This job must already exist in the virtual cluster specified by the connection_id. If no connection_id is specified, CDE looks for the job in the virtual cluster where the Airflow job runs (a sketch of that case follows the example below).

from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
...
ingest_step1 = CDEJobRunOperator(
    connection_id='cde-vc01-dev',
    task_id='ingest',
    retries=3,
    dag=example_dag,
    job_name='etl-ingest-job'
)
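Here is a minimal sketch of an operator without a connection_id. The job name report-job is a placeholder for a job that already exists in the same virtual cluster as the Airflow job, and example_dag refers to the DAG defined in the complete example above.

from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
...
# No connection_id: CDE resolves the job in the virtual cluster where this
# Airflow job runs. 'report-job' is a placeholder job name.
report_step = CDEJobRunOperator(
    task_id='report',
    dag=example_dag,
    job_name='report-job'
)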
- Kubernetes Pod Operator
- The Kubernetes Pod Operator allows you to create and run pods on a Kubernetes cluster using the Kubernetes API. The following example runs a privileged container in a pod. A simpler, non-privileged sketch follows the example.
import pendulum
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    "owner": "dag_owner",
    "depends_on_past": False,
    "email": ["airflow@example.com"],
    "email_on_failure": False,
    "start_date": pendulum.datetime(2021, 1, 1, tz="UTC"),
    "email_on_retry": False,
    "user": "notused",
}

ID = "11"
DAG_ID = "kubeop-" + ID
TASK_ID = "taskid-t" + ID
# namespace
NS = "dex-app-444qz29j"

# apache-airflow-providers-cncf-kubernetes operator
# only pod security context can be set before v4.4.0
# security_context (dict) -- security options the pod should run with (PodSecurityContext).
# v4.4.0
# 4b26c8c541 2022-09-09
# feat(KubernetesPodOperator): Add support of container_security_context (#25530)
# in airflow 2.2.5
# apache-airflow-providers-cncf-kubernetes==3.0.0
# in airflow 2.3.4
# apache-airflow-providers-cncf-kubernetes==4.3.0

with DAG(
    DAG_ID,
    schedule_interval=None,
    default_args=default_args,
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    dummy = DummyOperator(task_id='dummy')

    k = KubernetesPodOperator(
        name="hello-dry-run",
        image="debian",
        cmds=["bash", "-cx"],
        arguments=["echo sleeping 60s ... && sleep 60 && echo done"],
        task_id=TASK_ID,
        namespace=NS,
        annotations={
            "dag_id": DAG_ID,
            "task_id": TASK_ID,
            "execution_date": '2022-01-01T00:00:00+00:00',
        },
        is_delete_operator_pod=True,
        in_cluster=True,
        container_security_context={'privileged': True, 'capabilities': {'add': ['SYS_ADMIN']}},
        security_context={'runAsNonRoot': False},
    )

    dummy >> k
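For comparison, here is a minimal, non-privileged sketch of the same operator. The namespace, pod name, and task ID are placeholders, and the task is assumed to be attached to a DAG such as the example_dag defined earlier.

from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# Minimal pod: run a single command, then delete the pod when it finishes.
# 'hello-pod', 'hello_pod_task', and 'dex-app-example' are placeholder values.
hello_pod = KubernetesPodOperator(
    name="hello-pod",
    image="debian",
    cmds=["bash", "-cx"],
    arguments=["echo hello from the pod"],
    task_id="hello_pod_task",
    namespace="dex-app-example",
    in_cluster=True,
    is_delete_operator_pod=True,
    dag=example_dag,  # assumes the example_dag defined in the complete DAG file above
)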
- Email Alerts
- Add the following parameters to the DAG default_args to send email alerts for job failures, missed service-level agreements (SLAs), or both.

'email_on_failure': True,
'email': 'abc@example.com',
'email_on_retry': True,
'sla': timedelta(seconds=30)
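For example, a default_args dictionary that merges these alert settings into the arguments used by the DAG above could look like the following sketch. The owner and email address are placeholders.

from datetime import timedelta

# 'start_date' is omitted here for brevity; set it as in the complete DAG file above.
default_args = {
    'owner': 'psherman',
    'retry_delay': timedelta(seconds=5),
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': True,
    'email': 'abc@example.com',
    'sla': timedelta(seconds=30)
}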
- Task dependencies
- After you have defined the tasks, specify the dependencies as
follows:
ingest_step1 >> prep_step2
For more information on task dependencies, see Task Dependencies in the Apache Airflow documentation.
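Dependencies can also be chained or fanned out with the same >> operator. The following sketch assumes hypothetical downstream tasks (report_step, export_step, audit_step) in addition to the two tasks defined above.

# Linear chain: ingest runs first, then data prep, then a hypothetical report task.
ingest_step1 >> prep_step2 >> report_step

# Fan-out: data prep must finish before either hypothetical downstream task starts.
prep_step2 >> [export_step, audit_step]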
For a tutorial on creating Apache Airflow DAG files, see the Apache Airflow documentation.
- Create a CDE job.
- In the Cloudera Data Platform (CDP) console, click the Data Engineering tile. The CDE Home page displays.
- On the CDE Home page, under Jobs, click Create New under Airflow, or click Jobs in the left navigation menu and then click Create Job.
- Select the Airflow job type. If you are creating the job from the Home page, select the virtual cluster where you want to create the job.
- Name: Provide a name for the job.
- DAG File: Use an existing file, add a DAG file to an existing resource, or create a new resource and upload the file to it.
- Select from Resource: Click Select from Resource to select a DAG file from an existing resource.
- Upload: Click Upload to upload a DAG file to an existing resource or to a new resource that you can create by selecting Create a resource from the Select a Resource dropdown list. Specify the resource name and upload the DAG file to it.
- Click Create and Run to create the job and run it immediately, or click the dropdown button and select Create to create the job.