title: Apache Airflow testing with Pytest
---
categories:
airflow
cc-catalog
cc-search
open-source
product
testing
---
author: mathemancer
---
pub_date: 2020-01-23
---
body:

CC Catalog is a project that gathers information about images from around the
internet, and stores the information so that these images can eventually be
indexed in [CC Search][cc_search]. A portion of the process is directed by
[Apache Airflow][airflow], which is a tool commonly used to organize workflows
and data pipelines.

The nature of Airflow leads to some particular challenges when it comes to
testing, and special care must be taken to make tests independent from the
global state of the system where they are run. This blog post describes a few
of the challenges we faced when writing tests for Airflow jobs, and some tricks
we used to solve them.

[cc_search]: https://ccsearch.creativecommons.org/
[airflow]: https://airflow.apache.org/

## Brief description of Apache Airflow

Apache Airflow is an open-source tool that loads Directed Acyclic Graphs (DAGs)
defined in Python files. The DAG is what defines a given workflow. The nodes
are pieces of work that need to be accomplished, and the directed edges of the
graph define dependencies between the various pieces. By default, the Airflow
daemon only looks for DAGs to load from a global location in the user's home
folder: `~/airflow/dags/`. When a DAG is 'run', i.e., the tasks defined by the
nodes of the DAG are each performed in the order defined by the directed edges
of the DAG, the Airflow daemon stores information about the DAG run in
`~/airflow/`. The daemon also stores general information about which DAGs exist
on the system, and all of their current statuses, in that directory. For more
details, please see [the documentation][airflow_docs_top].

[airflow_docs_top]: https://airflow.apache.org/docs/stable/
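
To make the nodes-and-edges picture concrete, here is a minimal sketch of what a
DAG file might look like. The file name, IDs, and commands are made up for
illustration; they are not part of CC Catalog.

```python
# hello_dag.py -- a hypothetical file that could live in ~/airflow/dags/
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='hello_workflow',
    start_date=datetime(2020, 1, 1),
    schedule_interval=None,  # only runs when triggered manually
)

with dag:
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo hello',
    )
    say_goodbye = BashOperator(
        task_id='say_goodbye',
        bash_command='echo goodbye',
    )

    # The directed edge: say_goodbye runs only after say_hello succeeds.
    say_hello >> say_goodbye
```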

## Challenge: Localize Airflow to the project directory

Even when installed using `pip` within a [`virtualenv`][virtualenv] environment,
all `airflow` commands will be run against the default locations in the user's
home directory. In particular, if you want to test a DAG from your project
directory, the method given in the [Airflow documentation][airflow_docs] is to
copy the DAG into the default location `~/airflow/dags/`, and use the
command-line `airflow` tool to run the tasks defined by the nodes. The
information about the success and failure of the tests will be stored by the
Airflow daemon in the `~/airflow/` directory. We'd rather keep all input and
output from our tests within the project directory instead. This helps avoid
any side effects that might arise from running tests for different projects,
and also ensures that tests can't affect anything in the default directory,
which may be used for production in many cases.

The solution is to choose a directory in your project, and set the environment
variable `$AIRFLOW_HOME` whenever you run the tests, or use the `airflow`
command on the project DAGs. I recommend you add the command
```bash
export AIRFLOW_HOME=/your/desired/full/path/
```
to a script (ours is called `env.sh`) that will be run in any shell dealing with
the 'localized' Airflow instance, because forgetting to set the variable for
even one `airflow` command will corrupt the DAG states stored in the global
area. Note that setting this variable is necessary even when running in a
`virtualenv` environment.
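
If you run your tests through `pytest`, another guard against a forgotten
`export` is to set the variable from the test suite itself, before anything
imports `airflow`. Below is a minimal sketch; the `conftest.py` location and the
`airflow_home` directory name are our assumptions for illustration, not anything
Airflow requires.

```python
# conftest.py -- a minimal sketch; assumes the project keeps its localized
# Airflow home in an `airflow_home/` directory next to this file.
import os
from pathlib import Path

PROJECT_AIRFLOW_HOME = Path(__file__).parent / 'airflow_home'

# pytest imports conftest.py before the test modules, so setting the variable
# here means no test run ever reads from or writes to ~/airflow/.  An already
# exported AIRFLOW_HOME (e.g., from env.sh) still takes precedence.
os.environ.setdefault('AIRFLOW_HOME', str(PROJECT_AIRFLOW_HOME))
```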

Now that you have `$AIRFLOW_HOME` set, you'll likely want to load some DAGs that
you've written. This is made easier if you put the files defining them into a
`dags` directory inside the directory denoted by `$AIRFLOW_HOME`. I.e., it's
wise to structure the project sub-directory dealing with Airflow and Airflow
DAGs similarly to the default location, but in your project directory. At this
point, you should have some `$AIRFLOW_HOME` directory as a subdirectory of your
project directory, and then some `$AIRFLOW_HOME/dags` directory, where you keep
any Python files defining Airflow DAGs, and their dependencies. Another
advantage of this structure is that it's likely the directory structure you'll
use in production, so replicating it simplifies deployment.

Finally, Airflow will leave a number of files in the `$AIRFLOW_HOME` directory
which you are not likely to want to track in source control (e.g., with `git`).
These files are:

* `$AIRFLOW_HOME/airflow.cfg`
* `$AIRFLOW_HOME/airflow.db`
* `$AIRFLOW_HOME/logs/`
* `$AIRFLOW_HOME/unittests.cfg`

Add these files to `.gitignore` or the equivalent.

[virtualenv]: https://github.com/pypa/virtualenv
[airflow_docs]: https://airflow.apache.org/docs/stable/tutorial.html#testing

## Smoketesting: Can the Airflow daemon load the DAGs?

Note that we're using `pytest` for our unit testing, so most examples assume
this.

The most basic test you'll want is to determine whether your DAGs can load
without errors. To do this, you can use the following function:

```python
from airflow.models import DagBag

def test_dags_load_with_no_errors():
    dag_bag = DagBag(include_examples=False)
    dag_bag.process_file('common_api_workflows.py')
    assert len(dag_bag.import_errors) == 0
```

We initialize a `DagBag` (this is the class Airflow uses to load DAG files).
With the `process_file` method, we instruct the `DagBag` to attempt to load any
DAGs defined in the `common_api_workflows.py` file. We then check that loading
the DAGs didn't produce any errors.
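
If you keep several DAG files in `$AIRFLOW_HOME/dags/`, the same check can be
applied to each of them with a parametrized test. Here's a sketch under the
directory layout described above; the glob and the assumption that
`$AIRFLOW_HOME` is already set (by `env.sh` or a `conftest.py`) are ours.

```python
import os
from pathlib import Path

import pytest
from airflow.models import DagBag

# Assumes $AIRFLOW_HOME is already set before the tests are collected.
DAG_FOLDER = Path(os.environ['AIRFLOW_HOME']) / 'dags'

@pytest.mark.parametrize(
    'dag_file',
    sorted(str(p) for p in DAG_FOLDER.glob('*.py')),
)
def test_dag_file_loads_with_no_errors(dag_file):
    dag_bag = DagBag(include_examples=False)
    dag_bag.process_file(dag_file)
    assert len(dag_bag.import_errors) == 0
```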

## Hint: Use functions to create DAGs.

This will increase testability. You can test the function, bypassing the need to
load the DAG into the `DagBag` (except when you're actually testing that it
*can* be loaded). This may seem obvious, but none of the Airflow documentation
uses this pattern. Here is an example of a function that creates a simple DAG,
and a test of the function:

```python
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def create_dag(
    source,
    script_location,
    dag_id,
    crontab_str=None,
    default_args=DAG_DEFAULT_ARGS):

    dag = DAG(
        dag_id=dag_id,
        default_args=default_args,
        schedule_interval=crontab_str,
        catchup=False
    )

    with dag:
        start_task = BashOperator(
            task_id='{}_starting'.format(source),
            bash_command='echo Starting {} workflow'.format(source),
            dag=dag
        )

        run_task = BashOperator(
            task_id='get_{}_images'.format(source),
            bash_command='python {} --mode default'.format(script_location),
            dag=dag
        )

        start_task >> run_task

    return dag

def test_create_dag_creates_correct_dependencies():
    dag = create_dag(
        'test_source',
        'test_script_location',
        'test_dag_id'
    )
    start_id = 'test_source_starting'
    run_id = 'get_test_source_images'
    start_task = dag.get_task(start_id)
    assert start_task.upstream_task_ids == set()
    assert start_task.downstream_task_ids == set([run_id])
    run_task = dag.get_task(run_id)
    assert run_task.upstream_task_ids == set([start_id])
    assert run_task.downstream_task_ids == set([])
```

Here, we assume that `DAG_DEFAULT_ARGS` is defined earlier in the file. See the
Airflow documentation for details about default DAG arguments. Now, this
function is testable (great!) but it doesn't actually make the DAG it creates
known to the Airflow daemon. To do that, we have to insert the created DAG into
the global scope of the module defined by the file, which can be done with the
following snippet:
```python
globals()[dag_id] = create_dag(
    source,
    script_location,
    dag_id
)
```
Here, it's assumed that `source`, `script_location`, and `dag_id` are defined
earlier in the Python file.
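
For illustration, here is a sketch of how those surrounding pieces might look: a
default-argument dict of the sort `DAG_DEFAULT_ARGS` is assumed to be (it must
appear before `create_dag` is defined, since it's used as a default parameter
value), and a loop that registers one DAG per source in the module's global
scope. The source names, script paths, and argument values are made up, not the
ones CC Catalog actually uses.

```python
from datetime import datetime, timedelta

# One plausible shape for the default arguments assumed above.
DAG_DEFAULT_ARGS = {
    'owner': 'data-engineering',
    'depends_on_past': False,
    'start_date': datetime(2020, 1, 1),
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
}

# ... `create_dag` as defined above ...

# Hypothetical sources; each entry becomes its own DAG in the module's global
# scope, where the Airflow daemon can find it.
SOURCES = {
    'source_a': '/path/to/source_a_provider_script.py',
    'source_b': '/path/to/source_b_provider_script.py',
}

for source, script_location in SOURCES.items():
    dag_id = '{}_workflow'.format(source)
    globals()[dag_id] = create_dag(source, script_location, dag_id)
```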

We hope that these hints are helpful to the reader. For more, and for the
context around the snippets shown here, please take a look at
[the repo][frozen_cccatalog].

[frozen_cccatalog]: https://github.com/creativecommons/cccatalog/tree/c4b80600eb5695cc294e1791ba90bdc3a408b7b9/src/cc_catalog_airflow