Creating an Airflow pipeline with custom files using CDE CLI [Technical Preview]
By creating a pipeline in CDE using the CLI, you can add custom files that are available for tasks. This is a technical preview.
For use cases where custom files need to be accessed within an Airflow task, first upload the custom files to a CDE resource, and then specify that resource when you create the job by using the --airflow-file-mount-<n>-resource option.
These files are available only to the jobs to which they are linked.
The general form of the command is:
cde job create \
--name <my_job_name> \
--type airflow \
--mount-1-resource <my_dag_resource> \
--dag-file <my_dag_file.py> \
--airflow-file-mount-n-resource <my_file_resource> \
--airflow-file-mount-n-prefix <my_custom_prefix> # Optional
In the --airflow-file-mount-n-resource parameter, n is an integer number (beginning at 1). This allows you to specify multiple ...-resource parameters, to mount multiple resources.
Each resource is mounted at /app/mount/<prefix>. If you do not specify a custom prefix, the mount point of your resource is based on the resource name. For example, if the name of your CDE resource is 'my_resource', the files in the resource are made available within Airflow under /app/mount/my_resource.
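The mount-point rule above can be sketched in a few lines of Python. The helper name mounted_file_path is hypothetical and is not part of CDE or Airflow; it only assembles the path string described in this section, assuming the /app/mount root stated above.

```python
import posixpath
from typing import Optional

MOUNT_ROOT = "/app/mount"  # mount root as stated in this section


def mounted_file_path(resource_name: str, file_name: str,
                      prefix: Optional[str] = None) -> str:
    """Return the path where a file from a CDE resource is expected to appear.

    Hypothetical helper for illustration only: it uses the custom prefix
    when one is given and falls back to the resource name otherwise.
    """
    mount_dir = prefix if prefix else resource_name
    return posixpath.join(MOUNT_ROOT, mount_dir, file_name)


# Default mount point, based on the resource name
print(mounted_file_path("my_file_resource", "my_file.conf"))
# Mount point when a custom prefix is specified
print(mounted_file_path("my_file_resource", "my_file.conf",
                        prefix="my_custom_prefix"))
```

A helper like this can keep the paths used in DAG tasks in step with the names passed to the CLI, but it only builds a string and does not check that the file exists.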
If you do want to specify a custom prefix for your resource's mount point, use the optional --airflow-file-mount-n-prefix parameter, specifying n as the same number as the corresponding --airflow-file-mount-n-resource parameter.
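As a rough illustration of how the numbered options pair up, the following Python sketch assembles a cde job create argument list for several file resources. The function name build_job_create_command and the resource name my_other_resource are hypothetical; only the flags shown in the general form above are used, and the sketch prints the command rather than running it.

```python
import shlex
from typing import List, Optional, Sequence, Tuple


def build_job_create_command(
    job_name: str,
    dag_resource: str,
    dag_file: str,
    file_mounts: Sequence[Tuple[str, Optional[str]]],
) -> List[str]:
    """Build the argument list for `cde job create` (illustrative only).

    file_mounts is a sequence of (resource_name, prefix) pairs; a prefix of
    None means that the default, resource-name-based mount point is used.
    """
    cmd = [
        "cde", "job", "create",
        "--name", job_name,
        "--type", "airflow",
        "--mount-1-resource", dag_resource,
        "--dag-file", dag_file,
    ]
    # n starts at 1; each prefix uses the same n as its resource
    for n, (resource, prefix) in enumerate(file_mounts, start=1):
        cmd += [f"--airflow-file-mount-{n}-resource", resource]
        if prefix is not None:
            cmd += [f"--airflow-file-mount-{n}-prefix", prefix]
    return cmd


command = build_job_create_command(
    job_name="my_pipeline",
    dag_resource="my_pipeline_resource",
    dag_file="my_pipeline_dag.py",
    file_mounts=[
        ("my_file_resource", "my_custom_prefix"),
        ("my_other_resource", None),  # hypothetical second resource
    ],
)
print(shlex.join(command))
```

Generating the options in a loop keeps each prefix on the same index as its resource, which is the pairing that the parameter description above requires.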
For example, the following commands upload a DAG file and a custom file to separate resources, and then create an Airflow job that mounts the file resource:
cde resource create --name my_pipeline_resource
cde resource upload --name my_pipeline_resource --local-path my_pipeline_dag.py
cde resource create --name my_file_resource
cde resource upload --name my_file_resource --local-path my_file.conf
cde job create \
--name my_pipeline \
--type airflow \
--dag-file my_pipeline_dag.py \
--mount-1-resource my_pipeline_resource \
--airflow-file-mount-1-resource my_file_resource
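The files can be reached in Airflow DAGs with the pattern /app/mount/<resource_name or resource_alias>/<file_name>, like in the following example:
# Airflow 2.x import path
from airflow.operators.bash import BashOperator

read_conf = BashOperator(
    task_id="read_conf",
    bash_command="cat /app/mount/my_file_resource/my_file.conf"
)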
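To mount the same resource under a custom prefix instead, add the --airflow-file-mount-1-prefix option when you create the job:
cde job create \
--name my_pipeline \
--type airflow \
--dag-file my_pipeline_dag.py \
--mount-1-resource my_pipeline_resource \
--airflow-file-mount-1-resource my_file_resource \
--airflow-file-mount-1-prefix my_custom_prefix
The file is then available under the custom prefix:
# Airflow 2.x import path
from airflow.operators.bash import BashOperator

read_conf = BashOperator(
    task_id="read_conf",
    bash_command="cat /app/mount/my_custom_prefix/my_file.conf"
)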
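If a task needs the file contents in Python rather than through cat, a plain function along the following lines could be used, for instance as the callable of a Python-based Airflow task. This is a sketch under assumptions: read_mounted_file is a hypothetical name, the file is assumed to be UTF-8 text, and the demonstration at the bottom simulates the /app/mount directory with a temporary folder so that it also runs outside CDE.

```python
import tempfile
from pathlib import Path


def read_mounted_file(mount_dir: str, file_name: str,
                      mount_root: str = "/app/mount") -> str:
    """Return the text of a file mounted from a CDE resource.

    mount_dir is the resource name, or the custom prefix if one was set.
    Hypothetical helper for illustration; assumes the file is UTF-8 text.
    """
    path = Path(mount_root) / mount_dir / file_name
    return path.read_text(encoding="utf-8")


if __name__ == "__main__":
    # Simulate the mount root locally; in a CDE job the default
    # mount_root of /app/mount would be used instead.
    with tempfile.TemporaryDirectory() as fake_root:
        conf_dir = Path(fake_root) / "my_custom_prefix"
        conf_dir.mkdir()
        (conf_dir / "my_file.conf").write_text("example content\n",
                                               encoding="utf-8")
        print(read_mounted_file("my_custom_prefix", "my_file.conf",
                                mount_root=fake_root))
```

Keeping mount_root as a parameter makes the function testable on a workstation, while the default matches the mount location described in this section.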