Creating an Airflow pipeline with custom files using CDE CLI [Technical Preview]
When you create a pipeline in Cloudera Data Engineering using the CLI, you can add custom files and make them available to tasks. This feature is in technical preview.
For use cases where custom files need to be accessed within an Airflow task, first upload the custom files to a Cloudera Data Engineering resource, and then specify the resource during job creation using the --airflow-file-mount-n-resource option. These files are available only to the jobs in which the resource is linked.
The general form of the command is:
cde job create \
  --name <my_job_name> \
  --type airflow \
  --mount-1-resource <my_dag_resource> \
  --dag-file <my_dag_file.py> \
  --airflow-file-mount-n-resource <my_file_resource> \
  --airflow-file-mount-n-prefix <my_custom_prefix> # Optional
In the --airflow-file-mount-n-resource parameter, n is an integer starting at 1. This allows you to specify multiple ...-resource parameters to mount multiple resources.
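For instance, a job that mounts two file resources in addition to the DAG resource might be created as follows (the resource and file names here are illustrative):

cde job create \
  --name my_job \
  --type airflow \
  --mount-1-resource my_dag_resource \
  --dag-file my_dag.py \
  --airflow-file-mount-1-resource my_conf_resource \
  --airflow-file-mount-2-resource my_data_resource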
Each resource is mounted at /app/mount/<prefix>. If you do not specify a custom prefix, the mount point of your resource is based on the resource name. For example, if the name of your Cloudera Data Engineering resource is 'my_resource', the files in the resource are available within Airflow under /app/mount/my_resource.
If you want to specify a custom prefix for your resource's mount point, use the optional --airflow-file-mount-n-prefix parameter, setting n to the same number as in the corresponding --airflow-file-mount-n-resource parameter.
For example, the following commands create a resource containing the DAG file, a separate resource containing a custom file, and an Airflow job that mounts both:

cde resource create --name my_pipeline_resource
cde resource upload --name my_pipeline_resource --local-path my_pipeline_dag.py
cde resource create --name my_file_resource
cde resource upload --name my_file_resource --local-path my_file.conf
cde job create --name my_pipeline --type airflow --dag-file my_pipeline_dag.py --mount-1-resource my_pipeline_resource --airflow-file-mount-1-resource my_file_resource
Within the DAG, the files are available under /app/mount/<resource_name or resource_alias>/<file_name>, as in the following example:

read_conf = BashOperator(
    task_id="read_conf",
    bash_command="cat /app/mount/my_file_resource/my_file.conf"
)
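For context, a complete my_pipeline_dag.py built around this task might look like the following minimal sketch; the DAG ID, start date, and schedule settings are illustrative assumptions, not requirements:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Minimal DAG that reads the custom file from its resource mount point.
with DAG(
    dag_id="my_pipeline",
    start_date=datetime(2023, 1, 1),  # illustrative start date
    schedule_interval=None,  # run only when triggered manually
    catchup=False,
) as dag:
    read_conf = BashOperator(
        task_id="read_conf",
        bash_command="cat /app/mount/my_file_resource/my_file.conf",
    )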
If you specify a custom prefix for the mount point, as in the following command:

cde job create --name my_pipeline --type airflow --dag-file my_pipeline_dag.py --mount-1-resource my_pipeline_resource --airflow-file-mount-1-resource my_file_resource --airflow-file-mount-1-prefix my_custom_prefix
then the file is accessible under the custom prefix instead:

read_conf = BashOperator(
    task_id="read_conf",
    bash_command="cat /app/mount/my_custom_prefix/my_file.conf"
)
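Because the mount point is an ordinary path on disk, operators other than BashOperator can read the file as well. For example, a PythonOperator callable could parse the file directly; this sketch assumes my_file.conf is in INI format:

from configparser import ConfigParser

from airflow.operators.python import PythonOperator

def _read_conf():
    # The mounted resource is a plain directory, so standard file APIs work.
    parser = ConfigParser()
    parser.read("/app/mount/my_custom_prefix/my_file.conf")  # assumed INI file
    return parser.sections()

read_conf_py = PythonOperator(
    task_id="read_conf_py",
    python_callable=_read_conf,
)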