Python

【Airflow】タスクのグループ化にはSubDAGではなくTaskGroupを使うべき

はじめに

この記事ではApache Airflowのバージョン2で追加されたTaskGroupについて記述しています。

TaskGroupとは

Airflowの複数のタスクをUI上で纏めて表示したり、DAGで設定する各タスクのデフォルト引数を上書きすることができます。
今回は引数の上書きなどの詳細は扱わず、簡単にグループ化の記法のみを紹介します。
ちなみに、タスクをグループとして表現できる別の方法としてSubDAGがありますが、使いづらくバグの原因となりやすかったのか現在は非推奨の機能になっており、いかなる場合でもTaskGroupを使うように推奨されています。

環境

Python3.10、Airflow 2.5.0を使用しました。

TaskGroupを表示してみた

サンプルコードの前にUI上での表示を見てみます。
全体としては以下のような形のDAGを作成しました。

group-1とgroup-2をそれぞれクリックするとグループ化されたタスクが展開されます。
group-2にはタスクだけでなく別のグループも入っていて入れ子構造になっています。

勿論、入れ子になっているグループも展開できます。

サンプルコード

以下が上記のDAGのコードになります。
具体的なタスクの中身は省略しています。
TaskFlow APIを利用した@taskなどのデコレータによる実装になっています。
task_idが重複しているように見えるかもしれませんが、TaskGroup内で作成したタスクについてはgroup_idがprefixとしてtask_idに付与されるため、重複していません。

import pendulum

from airflow.decorators import dag, task, task_group


def make_tasks(n): # タスクをn個作成する関数
    @task()
    def task_base():
        return
    
    return [task_base.override(task_id=f"task-{i}")() for i in range(n)]


@dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="Asia/Tokyo"),
    catchup=False,
    tags=[],
)
def dag(): # DAGの定義
    tasks = make_tasks(2)

    @task_group(group_id='group-1')
    def task_group_1(): # タスクグループの定義 1
        tasks = make_tasks(2)

    tg1 = task_group_1()

    @task_group(group_id='group-2')
    def task_group_2(): # タスクグループの定義 2
        tasks = make_tasks(3)

        @task_group(group_id='group-3')
        def task_group_3(): # タスクグループの定義 3
            tasks = make_tasks(2)
            tasks[0] >> tasks[1]

        tg3 = task_group_3()
        tasks[0] >> tasks[2]
        tasks[1] >> tg3 >> tasks[2]

    tg2 = task_group_2()

    tasks[0] >> tg1 >> tg2 >> tasks[1]

dag()

おわりに

SubDAGと比べるとTaskGroupは変な副作用もなくずっと分かりやすくて使いやすいです。
今回はTaskGroupに関する日本語の情報が少なかったので記事を書いてみました。
お役に立てれば幸いです。

参考

DAGs — Airflow Documentation

Y.N
GRIでデータ分析やアルゴリズム開発、ForecastFlowの開発に携わっています。