Infra/Airflow

airflow에서 TaskGroup 적용하기

향식이 2023. 8. 21. 23:03

TaskGroup이란

태스크 그룹은 여러 개의 task를 하나의 그룹으로 묶어 관리하는 airflow 기능이다. 이를 통해 코드의 재사용성을 높이고 복잡한 워크플로우를 더 구조화된 방식으로 작성할 수 있게 된다.

 

예시 코드

태스크 그룹으로 설정하는 코드는 매우 간단하다 !

with TaskGroup(group_id='GROUP_ID', dag=dag) as GROUP_ID_PYTHON :

그룹 ID를 지정해주고 들여쓰기를 통해 태스크들을 묶어주면 된다.

GROUP_ID는 웹 UI상에서 보는 이름이고 as 뒤에 GROUP_ID_PYTHON은 python에서 사용하는 명칭이다.  

 

from airflow.utils.task_group import TaskGroup

# 기본 dag 설정
default_args = {
    'start_date': datetime(2023, 8, 24),
    'timezone': 'Asia/Seoul',
    'on_failure_callback': SlackAlert(channel='#final_project', token=failure_token).FailAlert
}

with DAG(
    'api_to_rds_Dag',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
) as dag:

    start_task = EmptyOperator(
        task_id = 'start_extraction'
    )

    # TaskGroup 설정
    with TaskGroup(group_id='egyt_data_extraction', dag=dag) as egyt_data_extraction :

        op_orgs = [Variable.get('BASIC_EGYT_URL'), 0] # 응급의료기관은 center_type == 0
        call_basic_info_Egyt = PythonOperator(
            task_id='call_basic_Egyt_data',
        python_callable=LoadHpidInfo.CallAPI,
        op_args=[op_orgs],
        provide_context=True
        )
        
        load_basic_data_to_rds_egyt = PythonOperator(
            task_id='load_basic_info_egyt',
            python_callable=LoadHpidInfo.LoadBasicInfo,
            provide_context=True
            )
        
        load_detail_info_Egyt = PythonOperator(
            task_id='load_detail_info_Egyt',
            python_callable=SaveConcurrentDB,
            op_args=[Variable.get('DETAIL_EGYT_URL')],
            provide_context=True
            )

 

이렇게 작성하면 call_basic_Egyt_data, load_basic_info_egyt, load_detail_info_Egyt 태스크는 한 그룹으로 묶일 수 있다. 

실제 완성 사진

airflow Dag taskgroup

반응형