Executing SQL queries on a Cloudera Data Warehouse or Data Hub instance using Apache Airflow in Cloudera Data Engineering

The following steps are for using the Airflow service provided with each CDE virtual cluster. For information about using your own Airflow deployment, see Using CDE with an external Apache Airflow deployment.

To run this job, an existing Airflow connection is required. For more information about how to create an Airflow connection, see the related documentation.
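Purely as an illustration that is not part of the product steps, the following Python sketch shows how a DAG file can look up a named connection to confirm it exists before the job runs; the connection ID “impala-test” matches the first example below:

    # Hedged sketch only: look up an existing Airflow connection by its ID.
    # Raises AirflowNotFoundException if the connection was never created.
    from airflow.hooks.base import BaseHook

    conn = BaseHook.get_connection("impala-test")
    print(conn.conn_type, conn.host, conn.port)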
Create an Airflow DAG file in Python. Import the required operators and define the tasks and dependencies.
  • The following example DAG file uses the connection named “impala-test” and executes a “SHOW TABLES” query on the Impala Virtual Warehouse using the SQLExecuteQueryOperator:
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

    with DAG(
        dag_id="imp_dag",
        start_date=datetime(2024, 2, 9),
        schedule_interval=timedelta(days=1),
        catchup=False,
    ) as dag:

        # Run a single statement on the Impala Virtual Warehouse through
        # the "impala-test" connection.
        execute_query = SQLExecuteQueryOperator(
            task_id="execute_query",
            conn_id="impala-test",
            sql="SHOW TABLES",
            split_statements=True,
            return_last=False,
        )

        execute_query
    

    For more advanced use cases, see the Airflow documentation on SQL operators.
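    As a hedged variant that does not appear in the original example, the same operator also accepts a list of statements when added inside the same with DAG block; the table name “my_table” below is a placeholder:

    # Hypothetical multi-statement task: statements run in order and, with
    # return_last=True, only the result of the last one is returned.
    # "my_table" is a placeholder name.
    create_and_count = SQLExecuteQueryOperator(
        task_id="create_and_count",
        conn_id="impala-test",
        sql=[
            "CREATE TABLE IF NOT EXISTS my_table (id INT, name STRING)",
            "SELECT COUNT(*) FROM my_table",
        ],
        return_last=True,
    )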

  • The following example DAG file uses the connection named “cdw-hive” and executes a “SHOW TABLES” query on the Hive Virtual Warehouse using the CdwExecuteQueryOperator:
    from airflow import DAG
    from cloudera.airflow.providers.operators.cdw import CdwExecuteQueryOperator
    from pendulum import datetime


    default_args = {
        'owner': 'dag_owner',
        'depends_on_past': False,
        'start_date': datetime(2024, 2, 9)
    }

    example_dag = DAG(
        'example-cdwoperator',
        default_args=default_args,
        schedule_interval=None,
        catchup=False,
        is_paused_upon_creation=False
    )

    # Multiple statements can be submitted in one call; they run in order.
    cdw_query = """
    USE default;
    SHOW TABLES;
    """

    cdw_step = CdwExecuteQueryOperator(
        task_id='cdw-test',
        dag=example_dag,
        cli_conn_id='cdw-hive',
        hql=cdw_query,
        # The following values, `schema` and `query_isolation`, are the
        # defaults; they are shown here only for the example.
        schema='default',
        query_isolation=True
    )

    cdw_step
    

    For more information about the CdwExecuteQueryOperator, see the GitHub page.
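    As a hedged sketch beyond the original example, a second task can be chained after cdw_step using standard Airflow dependency syntax; the follow-up query and task ID are placeholders:

    # Hypothetical follow-up task; the task_id and query are placeholders.
    cdw_followup = CdwExecuteQueryOperator(
        task_id='cdw-followup',
        dag=example_dag,
        cli_conn_id='cdw-hive',
        hql='SELECT 1;',
    )

    # Run the follow-up only after the first query succeeds.
    cdw_step >> cdw_followup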