Creating a pipeline with additional Airflow configurations using CDE CLI
By creating a pipeline with additional Airflow configurations using the Cloudera Data Engineering (CDE) CLI, you can create multi-step pipelines with a combination of available operators. There are two ways to create this type of pipeline. The first method detailed below is recommended approach that we highly suggest customers use. The second is the alternative method that customers have used in the past, but is not recommended.
Airflow DAGs can be defined with parameters at the DAG-level or Task-level. These parameters can be overridden in the case of a manual run. A manual run is triggered explicitly by the user. It is recommended to use the Params approach so that default values can be used by the scheduled job instances as well.
An example of a DAG definition with additional Airflow configuration is as follows:
- Create a configuration such as the example shown below:
-
from airflow import DAG from airflow.models.param import Param with DAG( "my_dag", params={ # an int with a default value "int_param": Param(10, type="integer", minimum=0, maximum=20), # a required param which can be of multiple types # a param must have a default value "dummy": Param(5, type=["null", "number", "string"]), # an enum param, must be one of three values "enum_param": Param("foo", enum=["foo", "bar", 42]), # a param which uses json-schema formatting "email": Param( default="example@example.com", type="string", format="idn-email", minLength=5, maxLength=255, ), }, ): # prints <class 'str'> by default # prints <class 'int'> if render_template_as_native_obj=True my_operator = PythonOperator( task_id="template_type", op_args=[ "{{ params.int_param }}", ], python_callable=( lambda x: print(type(x)) ), )
In this case, nothing needs to be done on thecde job create
step. Values can be additionally overridden in a manual run, through the--config
flag of thecde job run
command. For example:cde job run --name my_pipeline --config key1=my_new_value1
-