Task Groups in Apache Airflow in 5 Minutes
Introduction
Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.
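Before Apache Airflow 2.0, complex DAGs were commonly organized with SubDAGs (SubDagOperator), a pattern that was awkward to write and maintain and is now deprecated. Since Airflow 2.0, TaskGroups are the recommended way to structure complex DAGs: they are much easier to write and to monitor in the UI.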
For example, the legacy SubDagOperator approach:
import datetime

from airflow import DAG
from airflow.example_dags.subdags.subdag import subdag
from airflow.operators.empty import EmptyOperator
from airflow.operators.subdag import SubDagOperator

# Legacy pattern, shown only for comparison: SubDAGs are deprecated in
# Airflow 2.x and TaskGroup (see the following examples) is preferred.
DAG_NAME = "example_subdag_operator"

with DAG(
    dag_id=DAG_NAME,
    default_args={"retries": 2},
    # Month written as plain ``4``: a leading-zero literal such as ``04`` is a
    # SyntaxError in Python 3.
    start_date=datetime.datetime(2023, 4, 26),
    schedule="@once",
    tags=["example"],
) as dag:
    start = EmptyOperator(
        task_id="start",
    )

    # Each SubDagOperator wraps the DAG returned by the example ``subdag()``
    # factory, which is handed the parent DAG name, the section id and the
    # parent's default_args.
    section_1 = SubDagOperator(
        task_id="section-1",
        subdag=subdag(DAG_NAME, "section-1", dag.default_args),
    )

    some_other_task = EmptyOperator(
        task_id="some-other-task",
    )

    section_2 = SubDagOperator(
        task_id="section-2",
        subdag=subdag(DAG_NAME, "section-2", dag.default_args),
    )

    end = EmptyOperator(
        task_id="end",
    )

    # Strictly sequential chain: each step runs only after the previous one.
    start >> section_1 >> some_other_task >> section_2 >> end
Output :
Create Simple task groups
TaskGroups were introduced in Apache Airflow 2.0 and are easy to write. To use task groups, add the following import statement:
from airflow.utils.task_group import TaskGroup
For your first example, you'll instantiate task groups using a with statement and provide a group_id for each. You'll define two task groups, Section_1 and Section_2, each containing two tasks, and then set the dependencies between the groups:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Other operators can be used inside the groups instead of BashOperator, e.g.:
#   from airflow.operators.python import PythonOperator
#   from airflow.providers.ssh.operators.ssh import SSHOperator  # needs apache-airflow-providers-ssh
# They are not imported here because this DAG never uses them, and an unused
# import of a missing provider would make the whole DAG file fail to parse.

# Default arguments applied to every task in the DAG (owner, start date, retries, ...).
args = {
    "owner": "airflow",
    "start_date": datetime(2023, 4, 26),
    "retries": 2,
    "depends_on_past": False,
    "retry_delay": timedelta(seconds=5),
}

# Create the main DAG with a ``with`` block so every task defined below is attached to it.
# NOTE(review): no schedule is given, so Airflow's default schedule applies; with
# catchup enabled by default this may backfill runs since start_date. Set
# ``schedule``/``catchup`` explicitly if that is not wanted.
with DAG("Simple_TaskGroup_Example", default_args=args) as dag:
    start = EmptyOperator(task_id="Start")
    end = EmptyOperator(task_id="End")

    # Each group holds two Bash tasks with no dependency between them, so they may
    # run in parallel inside the group. By default TaskGroup prefixes child task
    # ids with the group id (e.g. "Section_1.Section-1-Task-1").
    with TaskGroup("Section_1", tooltip="TASK_GROUP_EV_description") as Section_1:
        t1 = BashOperator(
            task_id="Section-1-Task-1",
            bash_command='echo "Section-1-Task-1"',
        )
        t2 = BashOperator(
            task_id="Section-1-Task-2",
            bash_command='echo "Section-1-Task-2"',
        )

    with TaskGroup("Section_2", tooltip="TASK_GROUP_EV_description") as Section_2:
        t1 = BashOperator(
            task_id="Section-2-Task-1",
            bash_command='echo "Section-2-Task-1"',
        )
        t2 = BashOperator(
            task_id="Section-2-Task-2",
            bash_command='echo "Section-2-Task-2"',
        )

    # To run the groups one after the other instead, use:
    #   start >> Section_1 >> Section_2 >> end
    # Here the two groups run in parallel between Start and End:
    start >> Section_1 >> end
    start >> Section_2 >> end
Output :
Note: you can replace the placeholder and Bash tasks with any other operator, such as SSHOperator or PythonOperator.
Create Nested Task Group
In Airflow 2.0 you can also nest TaskGroups, i.e. create one or more TaskGroups inside another TaskGroup. Tasks defined within a TaskGroup block, nested or not, are still part of the main DAG.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.task_group import TaskGroup

# Default arguments applied to every task in the DAG.
args = {
    "owner": "airflow",
    "start_date": datetime(2023, 4, 26),
    "retries": 2,
    "depends_on_past": False,
    "retry_delay": timedelta(seconds=10),
}

# Main DAG containing one TaskGroup that itself contains a nested TaskGroup.
with DAG("Nested_Task_Group_Example", default_args=args) as dag:
    start = EmptyOperator(task_id="Start")
    end = EmptyOperator(task_id="End")

    with TaskGroup("Simple_Task_Group", tooltip="task_group_description") as simple_task_group:
        t1 = EmptyOperator(task_id="Task-1")
        t2 = EmptyOperator(task_id="Task-2")
        t2_1 = EmptyOperator(task_id="Task-2.1")
        t2_2 = EmptyOperator(task_id="Task-2.2")
        t3 = EmptyOperator(task_id="Task-3")
        # Task-2 fans out to Task-2.1 and Task-2.2.
        t2 >> [t2_1, t2_2]

        # Opening a TaskGroup inside another TaskGroup makes it a child group of
        # Simple_Task_Group.
        with TaskGroup("Nested_Task_Group", tooltip="Nested_task_group") as nested_task_group:
            t4 = EmptyOperator(task_id="Task-4")
            t5 = EmptyOperator(task_id="Task-5")
            t5_1 = EmptyOperator(task_id="Task-5.1")
            t5_2 = EmptyOperator(task_id="Task-5.2")
            t6 = EmptyOperator(task_id="Task-6")
            # Task-5 fans out to Task-5.1 and Task-5.2.
            t5 >> [t5_1, t5_2]

    # No separate dependency is needed for Nested_Task_Group: its tasks belong to
    # Simple_Task_Group. Wiring the outer group makes Start upstream of every task
    # in it (nested ones included) that has no upstream inside the group, and End
    # downstream of every such task with no downstream inside the group.
    start >> simple_task_group >> end
Thank You