Creating a pipeline with additional Airflow configurations using CDE CLI
By creating a pipeline with additional Airflow configurations using the Cloudera Data Engineering (CDE) CLI, you can create multi-step pipelines with a combination of available operators. There are two ways to create this type of pipeline. The first method detailed below is recommended approach that we highly suggest customers use. The second is the alternative method that customers have used in the past, but is not recommended.
Airflow DAGs can be defined with parameters at the DAG-level or Task-level. These parameters can be overridden in the case of a manual run. A manual run is triggered explicitly by the user. It is recommended to use the Params approach so that default values can be used by the scheduled job instances as well.
An example of a DAG definition with additional Airflow configuration is as follows:
- Create a configuration such as the example shown below:
-
from airflow import DAG from airflow.models.param import Param with DAG( "my_dag", params={ # an int with a default value "int_param": Param(10, type="integer", minimum=0, maximum=20), # a required param which can be of multiple types # a param must have a default value "dummy": Param(5, type=["null", "number", "string"]), # an enum param, must be one of three values "enum_param": Param("foo", enum=["foo", "bar", 42]), # a param which uses json-schema formatting "email": Param( default="example@example.com", type="string", format="idn-email", minLength=5, maxLength=255, ), }, ): # prints <class 'str'> by default # prints <class 'int'> if render_template_as_native_obj=True my_operator = PythonOperator( task_id="template_type", op_args=[ "{{ params.int_param }}", ], python_callable=( lambda x: print(type(x)) ), )
In this case, nothing needs to be done on thecde job create
step. Values can be additionally overridden in a manual run, through the--config
flag of thecde job run
command. For example:cde job run --name my_pipeline --config key1=my_new_value1
-
For historical reasons CDE supports the {{ dag_run.conf }}
object as well. In this case, the option, --config
key=value
in the cde job create
command, is
used to define default values whenever the user triggers a manual run using
cde job run
without specifying these parameters in the
run command. This config
option can be repeated to define
multiple parameters.
- Create a configuration such as the example shown below:
The configuration can be used in a DAG as shown below:cde resource create --name my_pipeline_resource cde resource upload --name my_pipeline_resource --local-path my_pipeline_dag.py cde job create --name my_pipeline --type airflow --dag-file my_pipeline_dag.py --mount-1-resource my_pipeline_resource -–config key1=value1 —-config key2=value2
my_bash_task = BashOperator( task_id="my_bash_task", bash_command="echo key1_value: {{ dag_run.conf[‘key1’] }} key2_value: {{ dag_run.conf[‘key2’] }}", dag=dag, )
The configuration can also be overridden for manual runs in the same manner as described in the Recommended section on this page.