Debugging Airflow DAGs locally

You can learn about how to debug Airflow DAGs locally to identify any compatibility issues while upgrading Cloudera Data Engineering.

The example is based on the Airflow DAG debug documentation.

  1. Add the following two lines at the end of the DAG file that you want to test:
    if __name__ == "__main__":
    	dag.test()

    For example,

    from datetime import timezone
    from dateutil import parser
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow import DAG
    
    default_args = {
    	'owner': 'test',
    	'depends_on_past': False,
    	'email': ['airflow@example.com'],
    	'email_on_failure': False,
    	'start_date':  parser.isoparse('2020-11-11T20:20:04.268Z').replace(tzinfo=timezone.utc),
    	'email_on_retry': False,
    	'user': 'notused'
    }
    
    dag = DAG(
    	'some-dag-3',
    	default_args=default_args,
    	catchup=False,
    	schedule_interval=None,
    	is_paused_upon_creation=False,
    )
    
    def print_function():
    	print("Task executed.")
    
    print_task = PythonOperator(
    	task_id='print_task',
    	dag=dag,
    	python_callable=print_function
    )
    
    echo = BashOperator(
    	task_id='run_this_first',
    	bash_command='echo 1',
    	dag=dag,
    )
    
    echo >> print_task
    
    if __name__ == "__main__":
    	dag.test()
  2. Prepare a DAG testing environment by running the following commands:
    python -m venv venv-airflow
    source venv-airflow/bin/activate
    
     (venv-airflow) airflow pip install 'apache-airflow[http,celery,kubernetes]==2.9.3' --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.9.3/constraints-3.11.txt"
    
    # check the configured DB used for testing the DAGs (a local sqlite db should be enough)	
    airflow config get-value database sql_alchemy_conn
    sqlite:////Users/$USER/airflow/airflow.db
    
    # if there are DB errors missing tables or other reset the DB
    airflow db reset
    
  3. Run the DAG and check for the warnings. For example,
    python -W default example_dag.py
    /Users/$USER/workload/airflow/bashop_pythonop_dag.py:4 DeprecationWarning: The `airflow.operators.bash_operator.BashOperator` class is deprecated. Please use `'airflow.operators.bash.BashOperator'`.
    /Users/$USER/workload/airflow/bashop_pythonop_dag.py:5 DeprecationWarning: The `airflow.operators.python_operator.PythonOperator` class is deprecated. Please use `'airflow.operators.python.PythonOperator'`.
    /Users/$USER/workload/airflow/bashop_pythonop_dag.py:19 RemovedInAirflow3Warning: Param `schedule_interval` is deprecated and will be removed in a future release. Please use `schedule` instead.
    [2024-08-22T12:20:33.673+0200] {dag.py:4199} INFO - dagrun id: some-dag-3
    [2024-08-22T12:20:33.678+0200] {dag.py:4215} INFO - created dagrun <DagRun some-dag-3@2024-08-2210:20:33.616669+00:00:manual__2024-08-22T10:20:33.616669+00:00,state:ru
    ...
    

    where the -W flag checks for DeprecationWarnings in the code. For more information, see Check for DeprecationWarning in your code.