TaskGroup이란
태스크 그룹은 여러 개의 task를 하나의 그룹으로 묶어 관리하는 airflow 기능이다. 이를 통해 코드의 재사용성을 높이고 복잡한 워크플로우를 더 구조화된 방식으로 작성할 수 있게 된다.
예시 코드
태스크 그룹으로 설정하는 코드는 매우 간단하다 !
with TaskGroup(group_id='GROUP_ID', dag=dag) as GROUP_ID_PYTHON :
그룹 ID를 지정해주고 들여쓰기를 통해 태스크들을 묶어주면 된다.
GROUP_ID는 웹 UI상에서 보는 이름이고 as 뒤에 GROUP_ID_PYTHON은 python에서 사용하는 명칭이다.
from airflow.utils.task_group import TaskGroup
# 기본 dag 설정
default_args = {
'start_date': datetime(2023, 8, 24),
'timezone': 'Asia/Seoul',
'on_failure_callback': SlackAlert(channel='#final_project', token=failure_token).FailAlert
}
with DAG(
'api_to_rds_Dag',
default_args=default_args,
schedule_interval=timedelta(days=1),
) as dag:
start_task = EmptyOperator(
task_id = 'start_extraction'
)
# TaskGroup 설정
with TaskGroup(group_id='egyt_data_extraction', dag=dag) as egyt_data_extraction :
op_orgs = [Variable.get('BASIC_EGYT_URL'), 0] # 응급의료기관은 center_type == 0
call_basic_info_Egyt = PythonOperator(
task_id='call_basic_Egyt_data',
python_callable=LoadHpidInfo.CallAPI,
op_args=[op_orgs],
provide_context=True
)
load_basic_data_to_rds_egyt = PythonOperator(
task_id='load_basic_info_egyt',
python_callable=LoadHpidInfo.LoadBasicInfo,
provide_context=True
)
load_detail_info_Egyt = PythonOperator(
task_id='load_detail_info_Egyt',
python_callable=SaveConcurrentDB,
op_args=[Variable.get('DETAIL_EGYT_URL')],
provide_context=True
)
이렇게 작성하면 call_basic_Egyt_data, load_basic_info_egyt, load_detail_info_Egyt 태스크는 한 그룹으로 묶일 수 있다.
실제 완성 사진
반응형