Executing SQL queries on a Cloudera Data Warehouse or Data Hub instance using Apache Airflow in Cloudera Data Engineering
The following steps are for using the Airflow service provided with each CDE virtual cluster. For information about using your own Airflow deployment, see Using CDE with an external Apache Airflow deployment.
To run this job, an existing Airflow connection is required. For more information about how to create an Airflow connection, see the following topics:
- For SQL Operators, see Creating a connection to Cloudera Data Warehouse or Data Hub instance for SQL Operator.
- For CDW Operators, see Creating a connection to Cloudera Data Warehouse for CDW Operator.
Create an Airflow DAG file in Python. Import the required operators and define the tasks and dependencies (a sketch that chains two dependent tasks follows the examples below).
- The following example DAG file uses the connection named “impala-test” and executes a “SHOW TABLES” query on the Impala Virtual Warehouse using the SQLExecuteQueryOperator:
  from datetime import datetime, timedelta

  from airflow import DAG
  from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

  with DAG(
      dag_id="imp_dag",
      start_date=datetime(2024, 2, 9),
      schedule_interval=timedelta(days=1),
      catchup=False,
  ) as dag:
      execute_query = SQLExecuteQueryOperator(
          task_id="execute_query",
          conn_id="impala-test",
          sql="SHOW TABLES",
          split_statements=True,
          return_last=False,
      )
For more advanced use cases, see the Airflow documentation for the SQL operators. A parameterized variant of this example is sketched after these examples.
- The following example DAG file uses the connection named “cdw-hive” and executes a “SHOW TABLES” query on the Hive Virtual Warehouse using the CdwExecuteQueryOperator:
  from airflow import DAG
  from cloudera.airflow.providers.operators.cdw import CdwExecuteQueryOperator
  from pendulum import datetime

  default_args = {
      'owner': 'dag_owner',
      'depends_on_past': False,
      'start_date': datetime(2024, 2, 9),
  }

  example_dag = DAG(
      'example-cdwoperator',
      default_args=default_args,
      schedule_interval=None,
      catchup=False,
      is_paused_upon_creation=False,
  )

  cdw_query = """
  USE default;
  SHOW TABLES;
  """

  cdw_step = CdwExecuteQueryOperator(
      task_id='cdw-test',
      dag=example_dag,
      cli_conn_id='cdw-hive',
      hql=cdw_query,
      # `schema` and `query_isolation` are the default values,
      # shown here only for the example.
      schema='default',
      query_isolation=True,
  )
For more information about the CdwExecuteQueryOperator, see the GitHub page.
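As one illustration of a more advanced use case, the following sketch passes bind parameters to the query instead of hard-coding values. This is a minimal sketch rather than part of the product documentation: it reuses the “impala-test” connection from the first example, the table name my_table is hypothetical, and the %(...)s placeholder style depends on the underlying database driver, so verify it against your Virtual Warehouse.

  from datetime import datetime

  from airflow import DAG
  from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

  with DAG(
      dag_id="imp_param_dag",
      start_date=datetime(2024, 2, 9),
      schedule_interval=None,
      catchup=False,
  ) as dag:
      # Hypothetical table name; the placeholder syntax must match what
      # the connection's DB API driver accepts.
      count_rows = SQLExecuteQueryOperator(
          task_id="count_rows",
          conn_id="impala-test",
          sql="SELECT COUNT(*) FROM my_table WHERE load_date = %(ds)s",
          parameters={"ds": "2024-02-09"},
      )

By default the operator pushes the query results to XCom, so a downstream task can read them.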
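Each example above defines a single task. When a job needs several queries to run in order, declare the dependency between tasks with the >> operator. The following is a minimal sketch that reuses the “cdw-hive” connection; the DAG name and the demo_events table are illustrative only.

  from airflow import DAG
  from cloudera.airflow.providers.operators.cdw import CdwExecuteQueryOperator
  from pendulum import datetime

  with DAG(
      'example-cdw-chain',
      start_date=datetime(2024, 2, 9),
      schedule_interval=None,
      catchup=False,
  ) as dag:
      # Illustrative first step: create a table if it does not exist.
      create_table = CdwExecuteQueryOperator(
          task_id='create-table',
          cli_conn_id='cdw-hive',
          hql='CREATE TABLE IF NOT EXISTS demo_events (id INT, payload STRING);',
      )
      # Second step: list the tables in the default schema.
      show_tables = CdwExecuteQueryOperator(
          task_id='show-tables',
          cli_conn_id='cdw-hive',
          hql='SHOW TABLES;',
      )

      # show_tables runs only after create_table succeeds.
      create_table >> show_tables

Because both operators are created inside the DAG context manager, they attach to the DAG without an explicit dag= argument.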