Task Group of Apache Airflow in 5 minutes

Akshay Thakare
4 min readApr 27, 2023

--

Introduction

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

In earlier versions of Apache Airflow, complex DAGs were written using SubDAGs, a concept that was widely considered problematic. Since Apache Airflow 2.0, complex DAGs can be organised with TaskGroup instead, which is much easier both to write and to monitor.

For example ( SubDagOperator ) :

import datetime
from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

DAG_NAME = "example_subdag_operator"

# Legacy SubDagOperator layout: each "section" is a separate child DAG built
# by the `subdag` factory and wrapped in a SubDagOperator task.
with DAG(
    dag_id=DAG_NAME,
    default_args={"retries": 2},
    # The month must be written as 4, not 04: Python 3 rejects decimal
    # integer literals with a leading zero (SyntaxError).
    start_date=datetime.datetime(2023, 4, 26),
    schedule="@once",
    tags=["example"],
) as dag:

    start = EmptyOperator(
        task_id="start",
    )

    # The child DAG receives the parent's name, its own task id and the
    # parent's default_args.
    section_1 = SubDagOperator(
        task_id="section-1",
        subdag=subdag(DAG_NAME, "section-1", dag.default_args),
    )

    some_other_task = EmptyOperator(
        task_id="some-other-task",
    )

    section_2 = SubDagOperator(
        task_id="section-2",
        subdag=subdag(DAG_NAME, "section-2", dag.default_args),
    )

    end = EmptyOperator(
        task_id="end",
    )

    # Strictly sequential pipeline: each step waits for the previous one.
    start >> section_1 >> some_other_task >> section_2 >> end

Output :

Apache Airflow Sub DAG Operator without zoom
Zoom into Sub DAG
Apache Airflow Sub DAG Operator with zoom

Create Simple task groups

TaskGroup was introduced in Apache Airflow 2.0 and is easy to write. To use task groups, add the following import statement:

from airflow.utils.task_group import TaskGroup

For your first example, you'll instantiate a task group using a with statement and provide a group_id. You'll create two task groups, Section_1 and Section_2, define two tasks inside each of them, and then set the dependencies between the groups and the rest of the DAG.

from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.task_group import TaskGroup

# Default arguments shared by the DAG's tasks: owner, start date, retry policy, etc.
args = dict(
    owner="airflow",
    start_date=datetime(2023, 4, 26),
    retries=2,
    depends_on_past=False,
    retry_delay=timedelta(seconds=5),
)

# Create the main DAG with a `with` clause; the operators and task groups
# below are declared inside it.
with DAG("Simple_TaskGroup_Example", default_args=args) as dag:

    start = DummyOperator(task_id="Start")
    end = DummyOperator(task_id="End")

    # First group: two Bash tasks with no dependency between them.
    with TaskGroup("Section_1", tooltip="TASK_GROUP_EV_description") as Section_1:
        t1 = BashOperator(
            task_id="Section-1-Task-1",
            bash_command='echo "Section-1-Task-1"',
        )
        t2 = BashOperator(
            task_id="Section-1-Task-2",
            bash_command='echo "Section-1-Task-2"',
        )

    # Second group: same shape as the first one.
    with TaskGroup("Section_2", tooltip="TASK_GROUP_EV_description") as Section_2:
        t1 = BashOperator(
            task_id="Section-2-Task-1",
            bash_command='echo "Section-2-Task-1"',
        )
        t2 = BashOperator(
            task_id="Section-2-Task-2",
            bash_command='echo "Section-2-Task-2"',
        )

    # To run the two groups one after the other instead, use:
    #     start >> Section_1 >> Section_2 >> end

    # Parallel layout: both groups start after `start` and both must finish
    # before `end`.
    start >> Section_1 >> end
    start >> Section_2 >> end

Output :

Tree view
Graph view
After Zoom Graph view

Note: you can replace DummyOperator/BashOperator with any other operator, such as SSHOperator, PythonOperator, etc.

Create Nested Task Group

Since Airflow 2.0, you can also nest task groups: one or more TaskGroups can be created inside another TaskGroup. Tasks defined within a TaskGroup block are still part of the main DAG.

from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from airflow.utils.task_group import TaskGroup

# Default arguments shared by the DAG's tasks.
args = dict(
    owner="airflow",
    start_date=datetime(2023, 4, 26),
    retries=2,
    depends_on_past=False,
    retry_delay=timedelta(seconds=10),
)


with DAG("Nested_Task_Group_Example", default_args=args) as dag:

    start = DummyOperator(task_id="Start")
    end = DummyOperator(task_id="End")

    # Outer group.
    with TaskGroup("Simple_Task_Group", tooltip="task_group_description") as simple_task_group:
        t1 = DummyOperator(task_id="Task-1")
        t2 = DummyOperator(task_id="Task-2")
        t2_1 = DummyOperator(task_id="Task-2.1")
        t2_2 = DummyOperator(task_id="Task-2.2")
        t3 = DummyOperator(task_id="Task-3")

        # Task-2 fans out to its two sub-tasks.
        t2 >> [t2_1, t2_2]

        # Inner group, declared inside the outer group's `with` block.
        with TaskGroup("Nested_Task_Group", tooltip="Nested_task_group") as nested_task_group:
            t4 = DummyOperator(task_id="Task-4")
            t5 = DummyOperator(task_id="Task-5")
            t5_1 = DummyOperator(task_id="Task-5.1")
            t5_2 = DummyOperator(task_id="Task-5.2")
            t6 = DummyOperator(task_id="Task-6")

            # Task-5 fans out to its two sub-tasks.
            t5 >> [t5_1, t5_2]

    # Note: no separate dependency is declared for Nested_Task_Group; it sits
    # inside Simple_Task_Group, so the dependency on the outer group covers it.
    start >> simple_task_group >> end
Tree view
Graph view
Tree view after zoom into TaskGroup
Tree view after zoom into Nested TaskGroup

Thank You

--

--

No responses yet