Automating data pipelines using Apache Airflow DAG files in Cloudera Data Engineering
Cloudera Data Engineering (CDE) enables you to automate a workflow or data pipeline using Apache Airflow Python DAG files. CDE currently supports two Airflow operators: one to run a CDE job and one to access Cloudera Data Warehouse (CDW).
To complete these steps, you must have a running CDE virtual cluster and have access to a CDW virtual warehouse. CDE currently supports only Apache Hive virtual warehouses. To determine the CDW hostname to use for the connection:
- Navigate to the Cloudera Data Warehouse Overview page by clicking the Data Warehouse tile in the Cloudera Data Platform (CDP) management console.
- In the Virtual Warehouses column, find the warehouse you want to connect to.
- Click the three-dot menu for the selected warehouse, and then click Copy JDBC URL.
- Paste the URL into a text editor and make note of the hostname. The hostname is the host portion of the JDBC URL, immediately after the jdbc:hive2:// prefix and before the first slash or semicolon.
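If you are scripting around this step, the hostname is simply the host portion of the copied JDBC URL. The following Python sketch shows one way to pull it out; the URL shown is a made-up placeholder, not a real warehouse endpoint.

    # Minimal sketch: extract the hostname from a copied CDW JDBC URL.
    # The URL below is a placeholder; paste the URL you copied from the CDW UI.
    jdbc_url = "jdbc:hive2://example-warehouse.example.site/default;transportMode=http;ssl=true"

    # Drop the "jdbc:hive2://" prefix, then keep everything before the first "/" or ";".
    host_and_rest = jdbc_url.split("://", 1)[1]
    hostname = host_and_rest.split("/", 1)[0].split(";", 1)[0]

    print(hostname)  # example-warehouse.example.site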
- Create a connection to an existing CDW virtual warehouse using the embedded Airflow UI:
- Navigate to the Cloudera Data Engineering Overview page by clicking the Data Engineering tile in the Cloudera Data Platform (CDP) management console.
- In the Environments column, select the environment containing the virtual cluster where you are creating the pipeline, then click the Cluster Details icon for the virtual cluster.
- Click the AIRFLOW UI link.
- In the Admin menu, click Connections.
- Click the plus sign to add a new record, and then fill in the fields:
- Conn Id
- Create a unique connection identifier, such as cdw-hive-aws-demo. This is the ID you reference from the DAG file.
- Conn Type
- Select Hive Client Wrapper.
- Host
- Enter the hostname from the JDBC connection URL. Do not enter the full JDBC URL.
- Login and Password
- Enter your workload username and password.
- Click Save.
- Create an Airflow DAG file in Python. Import the CDE and CDW operators and define the tasks in your pipeline. A complete example that combines the following snippets appears after this list.
- CDE job run operator
- Use CDEJobRunOperator to specify a CDE job to run. This job must already exist in the CDE virtual cluster.
    from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
    ...
    ingest_step1 = CDEJobRunOperator(
        task_id='ingest',
        retries=3,
        dag=dag,
        job_name='etl-ingest-job'
    )
- Query definition
- You can define a query for later reference as a Python variable. For example:
    cdw_query = """
    show databases;
    """
- CDW operator
- Use CDWOperator to execute a query against the CDW virtual warehouse. Specify the connection ID you configured in Airflow and the query you want to run:
    from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator
    ...
    dw_step3 = CDWOperator(
        task_id='dataset-etl-cdw',
        dag=dag,
        cli_conn_id='cdw-hive-aws-demo',
        hql=cdw_query,
        schema='default',
        ### CDW related args ###
        use_proxy_user=False,
        query_isolation=True
    )
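For reference, the snippets above can be combined into a single DAG file. The following is a minimal sketch rather than a definitive pipeline: the DAG name, owner, schedule, start date, and job name are placeholders, and it assumes the cdw-hive-aws-demo connection created earlier.

    # Minimal sketch of a complete Airflow DAG file for CDE.
    # Names, schedule, and dates are illustrative placeholders.
    from datetime import datetime, timedelta

    from airflow import DAG
    from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
    from cloudera.cdp.airflow.operators.cdw_operator import CDWOperator

    default_args = {
        'owner': 'airflow-demo-user',
        'depends_on_past': False,
        'retry_delay': timedelta(seconds=10),
        'start_date': datetime(2021, 1, 1),
    }

    dag = DAG(
        'airflow-pipeline-demo',
        default_args=default_args,
        schedule_interval='@daily',
        catchup=False,
        is_paused_upon_creation=False,
    )

    # Run an existing CDE job named 'etl-ingest-job'.
    ingest_step1 = CDEJobRunOperator(
        task_id='ingest',
        retries=3,
        dag=dag,
        job_name='etl-ingest-job',
    )

    # Query to execute in the CDW virtual warehouse.
    cdw_query = """
    show databases;
    """

    # Run the query through the Airflow connection configured above.
    dw_step3 = CDWOperator(
        task_id='dataset-etl-cdw',
        dag=dag,
        cli_conn_id='cdw-hive-aws-demo',
        hql=cdw_query,
        schema='default',
        use_proxy_user=False,
        query_isolation=True,
    )

    # Run the CDW query only after the CDE ingest job completes.
    ingest_step1 >> dw_step3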
- Create a CDE job. Select the Airflow job
type, provide a name for the job, and upload the DAG file you created.
You can also upload the DAG file as a resource, and specify the
resource name during job creation. You can choose to create the job
and run it immediately, or only create the job.
For more information on creating CDE jobs, see Creating jobs in Cloudera Data Engineering.
For more information on using resources, see Managing Cloudera Data Engineering job resources using the CLI.
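If you manage jobs from the CDE CLI instead of the UI, the DAG file can be uploaded as a resource and the Airflow job created from it. The following is a minimal sketch with made-up resource, file, and job names; flag names can vary between CDE CLI versions, so verify them with cde job create --help.

    # Minimal sketch (placeholder names): upload the DAG file as a resource,
    # then create an Airflow job that points at it.
    cde resource create --name airflow-dag-resource
    cde resource upload --name airflow-dag-resource --local-path airflow-pipeline-demo.py
    cde job create --name airflow-pipeline-demo --type airflow \
        --dag-file airflow-pipeline-demo.py \
        --mount-1-resource airflow-dag-resource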