Using CDE with an external Apache Airflow deployment

The Cloudera provider for Apache Airflow, available in the Cloudera GitHub repository, provides two Airflow operators for running Cloudera Data Engineering (CDE) and Cloudera Data Warehouse (CDW) jobs. Install the provider on your existing Apache Airflow deployment to run CDE and CDW jobs from your own DAGs.

  • The Cloudera provider for Apache Airflow is for use with existing Airflow deployments. If you want to use the embedded Airflow service provided by CDE, see Automating data pipelines with CDE and CDW using Apache Airflow.
  • The provider requires Python 3.6 or higher.
  • The provider requires the Python cryptography package version 3.3.2 or higher to address CVE-2020-36242. If an older version is installed, installing the provider automatically upgrades the cryptography library.
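The version requirements above can be checked before installation with a short script. This is a minimal sketch, not part of the provider: the helper names parse_version and check_prerequisites are hypothetical, and the version parsing is deliberately simple (numeric components only).

```python
import re
import sys

# Minimum versions stated in the requirements list.
MIN_PYTHON = (3, 6)
MIN_CRYPTOGRAPHY = (3, 3, 2)


def parse_version(text):
    """Turn a version string such as '3.3.2' into a tuple of ints."""
    parts = []
    for piece in text.split("."):
        match = re.match(r"\d+", piece)
        if match is None:
            break  # stop at the first component with no leading digits
        parts.append(int(match.group()))
    return tuple(parts)


def check_prerequisites():
    """Return a list of problems; an empty list means no problem was found."""
    problems = []
    if sys.version_info[:2] < MIN_PYTHON:
        problems.append("Python 3.6 or higher is required")
    try:
        import cryptography
    except ImportError:
        # cryptography is not installed, so there is no version to compare.
        return problems
    if parse_version(cryptography.__version__) < MIN_CRYPTOGRAPHY:
        problems.append(
            "cryptography %s is older than 3.3.2" % cryptography.__version__
        )
    return problems


if __name__ == "__main__":
    for problem in check_prerequisites():
        print(problem)
```

Run the script with the same Python interpreter that runs Airflow, so that the check applies to the environment where the provider will be installed.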

The provider includes two Airflow operators that you can use in your DAGs:

  • CDEJobRunOperator, for running Cloudera Data Engineering jobs.
  • CDWOperator, for accessing Cloudera Data Warehouse.

Install Cloudera Airflow provider
  1. Select one of the following installation methods:
    • Direct install

      Run the following command on your Airflow server:

      pip install <package_url>

      Replace <package_url> with the link to the desired wheel package at https://github.com/cloudera/cloudera-airflow-plugins/releases

      For example, to install version 1.0.0:

      pip install https://github.com/cloudera/cloudera-airflow-plugins/releases/download/v1.0.0/cloudera_airflow_provider-1.0.0-py3-none-any.whl
    • Local install

      Run the following commands on your Airflow server to install the latest version:

      git clone --depth 1 https://github.com/cloudera/cloudera-airflow-plugins.git
      cd cloudera-airflow-plugins/cloudera_airflow_provider 
      pip install .

      Run the following commands on your Airflow server to install a specific version:

      git clone --depth 1 --branch <version> https://github.com/cloudera/cloudera-airflow-plugins.git
      cd cloudera-airflow-plugins/cloudera_airflow_provider 
      pip install .

      Replace <version> with the provider version that you want to install. For example, to install version 1.0.0:

      git clone --depth 1 --branch v1.0.0 https://github.com/cloudera/cloudera-airflow-plugins.git
      cd cloudera-airflow-plugins/cloudera_airflow_provider 
      pip install .
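After either installation method, one way to confirm that the provider is visible to the Airflow interpreter is to look up the operator module. The following is a sketch under that assumption; module_available is a hypothetical helper, and the module path is the one used in the DAG file import for CDEJobRunOperator.

```python
import importlib.util

# Module path used by DAG files that import CDEJobRunOperator.
CDE_OPERATOR_MODULE = "cloudera.cdp.airflow.operators.cde_operator"


def module_available(dotted_name):
    """Return True if the module can be located.

    find_spec imports the parent packages of a dotted name,
    but it does not import the final module itself.
    """
    try:
        return importlib.util.find_spec(dotted_name) is not None
    except ImportError:
        # Raised (as ModuleNotFoundError) when a parent package is missing.
        return False


if __name__ == "__main__":
    if module_available(CDE_OPERATOR_MODULE):
        print("Cloudera provider found")
    else:
        print("Cloudera provider not found in this Python environment")
```

If the module is not found, check that pip installed the package into the same Python environment that Airflow uses.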
Create a connection using the Airflow UI

Before you can run a CDE job from your Airflow deployment, you must configure a connection using the Airflow UI.

  1. From the CDE home page, go to Overview > Virtual Clusters > Cluster Details of the Virtual Cluster (VC) where you want the CDE job to run.
  2. Click JOBS API URL to copy the URL.
  3. Go to your Airflow web console (where you installed the Cloudera provider).
  4. Go to Admin > Connections.
  5. Click + Add a new record.
  6. Fill in connection details:
    Conn Id
    Create a unique connection identifier.
    Conn Type
    The type of the connection. From the drop-down list, select one of the following:
    • HTTP (if you are using Apache Airflow version 1)
    • HTTP or Cloudera Data Engineering (if you are using Apache Airflow version 2)
    Host/Virtual API Endpoint
    URL of the host where you want the job to run. Paste the JOBS API URL that you copied in step 2.
    Login/CDP Access Key

    Provide the CDP access key of the account for running jobs on the CDE VC.

    Password/CDP Private Key

    Provide the CDP private key of the account for running jobs on the CDE VC.

  7. Click Save.
  8. In the CDE web console, go to the CDE virtual cluster and click View Jobs > Create Job.
  9. Fill in the Job Details:
    Job Type
    Select the option matching your use case.
    Name
    Specify a name for the job.
    DAG File
    Provide a DAG file.
    Use the CDEJobRunOperator to specify a CDE job to run. The job definition in the DAG file must contain:
    connection_id
    The Conn Id you specified on the Airflow UI when creating the connection.
    task_id
    The ID that identifies the job within the DAG.
    dag
    The variable containing the DAG object.
    job_name
    The name of the CDE job to run. This job must exist in the CDE virtual cluster you are connecting to.
    For example:
    from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
    ...
    t1 = CDEJobRunOperator(
        connection_id='cde-vc01-dev',
        task_id='ingest',
        dag=example_dag,
        job_name='etl-ingest-job'
    )
    
  10. Click Create and Run.
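The DAG file fragment in step 9 could be expanded into a complete file along the following lines. This is a sketch only: the dag_id, the start date, and the guarded import are assumptions added for illustration, and the connection ID, task ID, and job name are the example values from step 9. Scheduling settings are left at the Airflow defaults and would need to be set for a real pipeline.

```python
from datetime import datetime

# Arguments that the job definition must contain (example values from step 9).
CDE_TASK_KWARGS = {
    "connection_id": "cde-vc01-dev",  # the Conn Id created in the Airflow UI
    "task_id": "ingest",              # identifies the job within the DAG
    "job_name": "etl-ingest-job",     # must exist in the CDE virtual cluster
}

try:
    from airflow import DAG
    from cloudera.cdp.airflow.operators.cde_operator import CDEJobRunOperator
    HAVE_PROVIDER = True
except ImportError:
    # Airflow or the Cloudera provider is not installed in this interpreter.
    HAVE_PROVIDER = False

if HAVE_PROVIDER:
    # Hypothetical DAG settings; adjust them to your own pipeline.
    example_dag = DAG(
        dag_id="example_cde_dag",
        start_date=datetime(2021, 1, 1),
        catchup=False,  # do not backfill runs between start_date and today
    )

    # The dag argument is the variable that contains the DAG object.
    t1 = CDEJobRunOperator(dag=example_dag, **CDE_TASK_KWARGS)
```

The guarded import only keeps the file loadable on a machine without Airflow; in a deployed DAG file, plain imports at the top of the file are the usual choice.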