Making use of Python globals to dynamically create Airflow DAGs

Flávio Teixeira
3 min read · Jan 27, 2020

Creating a single DAG dynamically, for example when you need a number of similar tasks to run at the same time, is simple. Just use any loop, create the tasks, set their upstream dependencies and voilà: the DAG is ready.
Note: if you are not familiar with this, check this article.
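For reference, here is a minimal sketch of that simpler case: one DAG whose similar tasks are generated in a loop. The DAG id, schedule and number of tasks are placeholder values, not taken from the article's script.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# one DAG, several similar tasks created in a loop
dag = DAG(
    dag_id='single_dynamic_dag',
    default_args={'owner': 'airflow', 'start_date': datetime(2020, 1, 1)},
    schedule_interval='@daily')

start = DummyOperator(task_id='start', dag=dag)
for i in range(5):
    # every generated task runs in parallel, downstream of "start"
    start >> DummyOperator(task_id=f'task_{i}', dag=dag)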

Imagine now that you have a bunch of tasks that also need to run at the same time, but they need to be independent: one cannot wait for another to finish before starting again. What would you do?

The first rational answer is to create a DAG for each task. The problem with this solution is that you are forced to maintain a lot of files with duplicated code, so whenever you need to change anything (from new features to bug fixes) you have to touch every single file.

You can make use of globals to create many DAGs in a single Python file.

Before I start showing you this, I need to say that I do not encourage the use of globals on a daily basis. It can bring a lot of problems to your Python code, like concurrency issues, implicit coupling and zero access control, but for this case, used in a sane way, we are safe.

Python has a built-in function called globals() that returns the module's global namespace as a dictionary: you can set and get values in it just like in any other dictionary, and once a value is set, you can use the variable as if it had been declared normally.

>>> globals()['__name__']
'__main__'
>>> globals()['test'] = 'test'
>>> globals()['test']
'test'
>>> test
'test'

Knowing how to use globals, the process of creating DAGs dynamically is the same as creating tasks dynamically, but now the DAG instantiation is included within the loop and all the variables (tasks and DAGs) are assigned directly to entries in globals().

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# minimal default arguments; the real script may define more
args = {'owner': 'airflow', 'start_date': datetime(2020, 1, 1)}

for code in range(5):
    # register each DAG and its tasks in the module's global namespace
    globals()[f'dag_{code}'] = DAG(
        dag_id=f'dag_{code}',
        default_args=args,
        schedule_interval='0/10 * * * *')
    globals()[f'start_task_{code}'] = DummyOperator(
        dag=globals()[f'dag_{code}'],
        task_id='start_task')
    globals()[f'end_task_{code}'] = DummyOperator(
        dag=globals()[f'dag_{code}'],
        task_id='end_task')
    globals()[f'start_task_{code}'] >> globals()[f'end_task_{code}']

When the scheduler parses this file, five independent DAGs (dag_0 through dag_4) will show up in the Airflow UI.

Built this way, the DAGs have the same functionality, run in parallel, share the same source code and do not depend on each other.

Some considerations:
- This is a workaround for very specific cases.
- If for any purpose you reduce the number of DAGs, Airflow will not delete the old ones; that needs to be done manually by clicking the trash icon next to the DAG. If you add more, Airflow will handle the creation (driving the loop from an explicit list, as sketched below, makes this kind of change easier).
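A small variation of the same loop, reusing the imports and args defined above, shows that list-driven approach; the job names here are purely illustrative.

# hypothetical job names; each one gets its own independent DAG
for job in ['ingest_users', 'ingest_orders', 'ingest_payments']:
    globals()[f'dag_{job}'] = DAG(
        dag_id=f'dag_{job}',
        default_args=args,
        schedule_interval='0/10 * * * *')
    globals()[f'start_task_{job}'] = DummyOperator(
        dag=globals()[f'dag_{job}'], task_id='start_task')
    globals()[f'end_task_{job}'] = DummyOperator(
        dag=globals()[f'dag_{job}'], task_id='end_task')
    globals()[f'start_task_{job}'] >> globals()[f'end_task_{job}']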

The script used in this example can be found here.

Feel free to access my other repositories; I post a lot of snippets and personal projects that could help you!

Follow me here to receive weekly tips and contributions, mainly about data engineering.

Thanks for reading :)

