In-place upgrade with Airflow Operators and Libraries
This section details Airflow considerations to avoid issues during and after the upgrade. Before performing an in-place upgrade of the Airflow operators and libraries, check whether the environment is compatible. This allows you to determine if the DAGs or the Python package dependencies require changes.
Ensure that the catchup option is not enabled for any user's Airflow jobs. If catchup is enabled for any Airflow DAG, disable it manually before the backup starts.
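For example, if the DAG files are also available locally, a quick heuristic search can list the DAGs that explicitly enable catchup. The dags/ directory below is a placeholder for wherever your DAG files are stored, and DAGs that rely on a catchup default set elsewhere are not found by this search.
# List DAG files that explicitly set catchup=True (heuristic only).
grep -rnE "catchup[[:space:]]*=[[:space:]]*True" dags/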
Verify the Airflow operators, libraries, and DAGs in the newer CDE service:
- Create a new test CDE service with the latest CDE version.
- Optional: Copy the DAGs from the current CDE service for local checking:
- Update the DAG code for the latest Python version by using the tools mentioned in the Tools for porting Python from version 3.8 to version 3.11 section.
- Update and test the DAG code for the latest Airflow version as described in the Debugging Airflow DAGs locally section.
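As one example of the porting tools referenced above, pyupgrade can rewrite Python syntax to the Python 3.11 idioms; treat this as a sketch only and follow the Tools for porting Python from version 3.8 to version 3.11 section for the recommended tool set. The dags/ path is a placeholder for the local copy of your DAG files.
# Install pyupgrade into a local environment and rewrite the DAG files for Python 3.11 syntax.
python -m pip install pyupgrade
pyupgrade --py311-plus dags/*.py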
- Optional: Obtain the requirements.txt file for the existing Airflow environment. The requirements.txt file is required after the upgrade to restore the environments.
To obtain the Airflow environment requirements.txt for a virtual cluster, perform the following steps.
- Obtain the environment information JSON from the Airflow environment API endpoint.
- Copy the JOBS API URL. For more information, check the Determine the virtual cluster endpoint URL step in Configuring the CLI client.
- Use the JOBS API URL as a base and append /admin/airflow/env. Example JOBS API URL with /admin/airflow/env:
https://t5kh5fjp.cde-hfrsp8ww.dex-priv.xcu2-8y8x.dev.cldr.work/dex/api/v1/admin/airflow/env
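For example, the JSON can be fetched with curl and saved as pyenv.json. This sketch assumes a valid CDE access token is available in the CDE_TOKEN environment variable; see the CDE documentation on API access tokens for how to obtain one.
# Fetch the Airflow environment information and store it as pyenv.json.
curl -s -H "Authorization: Bearer ${CDE_TOKEN}" \
  "https://t5kh5fjp.cde-hfrsp8ww.dex-priv.xcu2-8y8x.dev.cldr.work/dex/api/v1/admin/airflow/env" \
  -o pyenv.json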
Example for the JSON:
cat pyenv.json
{
  "status": "activated",
  "packages": [
    {
      "type": "python-module",
      "name": "mypy",
      "version": "1.2.0",
      "created": "2024-04-03T16:58:07Z"
    },
    {
      "type": "python-module",
      "name": "mypy-extensions",
      "version": "1.0.0",
      "created": "2024-04-03T16:58:07Z"
    },
    {
      "type": "python-module",
      "name": "tomli",
      "version": "2.0.1",
      "created": "2024-04-03T16:58:07Z"
    }
  ],
  "requirements": "mypy\n"
}
- From the returned pyenv.json, generate a requirements.txt file with the jq tool:
cat pyenv.json | jq -r '.requirements' > requirements.txt
Example for the generated requirements.txt:
mypy
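The requirements field contains only the packages that were explicitly requested. If a fully pinned list is also useful for comparison, the packages array in pyenv.json can be converted with jq as well; requirements-pinned.txt below is an arbitrary file name.
cat pyenv.json | jq -r '.packages[] | "\(.name)==\(.version)"' > requirements-pinned.txt
# Example output for the JSON above:
# mypy==1.2.0
# mypy-extensions==1.0.0
# tomli==2.0.1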
Use the CDE CLI to find and download the requirements.txt file that belongs to the Airflow environment in a virtual cluster:
- Find the resource name that belongs to the Python environment. Optionally, use jq to get the name:
cde resource list --filter "type[eq]airflow-python-env" --filter "status[eq]active" --show-hidden=true | jq -r '.[0].name'
cde-airflow-pyenv-1712224823
- Download the requirements.txt file:
cde resource download --name cde-airflow-pyenv-1712224823 --resource-path requirements.txt
- Optional: Verify the requirements.txt file against an Airflow constraints file.
- Get the Python version used for Airflow from Compatibility for Cloudera Data Engineering and Runtime components.
- Check the requirements.txt file against the Airflow constraints file for the new CDE version by creating a temporary Python virtual environment and using the dry-run feature of pip. If pip does not support dry-run, perform the check by installing the packages without the --dry-run flag:
# Create a temporary local Python virtual environment using the same Python version used by Airflow in CDE.
python -m venv venv
# Source the environment.
source venv/bin/activate
# Check whether there are any conflicts between the packages in the requirements.txt
# file and the packages needed by Airflow.
# At least pip 22.2 is needed for the dry-run feature.
(venv) python -m pip install -r requirements.txt -c "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt" --dry-run
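If the pip version in the temporary virtual environment is older than 22.2, it can usually be upgraded in place instead of falling back to a real installation; afterwards, the temporary environment can be removed. This is a sketch of those optional housekeeping steps.
# Upgrade pip inside the virtual environment to get the dry-run feature.
(venv) python -m pip install --upgrade "pip>=22.2"
# Leave and delete the temporary virtual environment after the check.
(venv) deactivate
rm -rf venv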
- Optional: Create a compatible Python environment in a test virtual cluster in the test CDE service.
Python packages might need to be updated to work with the Airflow service in the new CDE version.
- Back up and restore your Airflow jobs, or create all the necessary Airflow jobs in the test CDE service that you created in step 1.
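A minimal backup and restore sketch with the CDE CLI is shown below. The exact flags vary between CDE CLI versions, so confirm them with cde backup create --help and cde backup restore --help; airflow-jobs-backup.zip is an arbitrary file name.
# Back up the jobs from the current virtual cluster to a local archive.
cde backup create --local-path airflow-jobs-backup.zip
# Point the CLI configuration at the test virtual cluster, then restore the archive there.
cde backup restore --local-path airflow-jobs-backup.zip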
- If the DAG parsing fails, go to the Airflow UI to identify the impacted DAGs.
- Fix the DAGs or update the dependencies in the requirements.txt to fix the issues.
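If you maintain a local Airflow installation that matches the target Airflow version, parse errors can also be surfaced locally before re-uploading the DAGs. The dags/ path below is a placeholder for the local copy of your DAG files.
# Point Airflow at the local DAG files and list any import (parse) errors.
export AIRFLOW__CORE__DAGS_FOLDER="$(pwd)/dags"
airflow dags list-import-errors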
- Conduct a test run of these DAGs if they call custom libraries at runtime to ensure that the code defined within the tasks is also valid.
- If the DAG execution fails, go to the Airflow job logs to identify the impacted DAGs.
- Fix the DAGs or update the dependencies in the requirements.txt file to fix the issues.
Backport all DAG changes to the CDE service to be upgraded, so that the DAGs are compatible with both the previous and the new version of Airflow. This may require a retest on an older CDE service test virtual cluster if the production environment cannot be updated easily. If changes are needed for the older CDE service, test the changes again on the newer CDE service. Verify that both services work as expected. To fix compatibility issues related to the Python version change, see Python upgrade from version 3.8 to version 3.11; to fix issues related to Airflow, see Airflow upgrade from version 2.7.3 to version 2.9.3.
- Delete the test service.
- Optional: Delete all Airflow operators and libraries in all virtual clusters in the CDE service before executing the upgrade.
- Continue with the upgrade.
- Optional: After a successful upgrade, build and activate the Airflow operators and libraries in the virtual clusters from the updated and fixed requirements.txt.
- Verify that the Airflow jobs are properly running in the Airflow UI and CDE Jobs UI.
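In addition to checking the UIs, the CDE CLI can be used for a quick spot check; the filter syntax follows the pattern used earlier in this procedure and may need adjusting for your CLI version.
# List the Airflow jobs in the virtual cluster and their recent runs.
cde job list --filter "type[eq]airflow"
cde run list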
- Optional: Remove and retest backward compatible changes made in the DAGs for the previous CDE version. Keep the DAGs updated to the latest Airflow version in CDE.