Dag를 실행하는 방법
- 주기적 실행: schedule로 지정
- 다른 Dag에 의해 트리거
- Explicit Trigger: Dag A가 분명하게 Dag B를 트리거 (TriggerDagOperator)
- Reactive Trigger: Dag B가 Dag A가 끝나기를 대기(ExtrnalTaskSensor)
- 알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
- 조건에 따라 다른 태스크로 분기 (BranchPythonOperator)
- 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnlyOperator)
- 앞단 태스크들의 실행상황
- 어떤 경우에는 앞단이 실패해도 동작해야 하는 경우가 있을 수 있음
Explicit trigger vs Reactive trigger
- Explicit trigger
- TriggerDagOperator
- DAG A가 명시적으로 DAG B를 트리거
- Reactive trigger
- ExternalTaskSensor
- DAG B가 DAG A의 태스크가 끝나기를 대기
- 이 경우 DAG A는 이 사실을 모름
Sensor
- Sensor는 특정 조건이 충족될 때까지 대기하는 Operator
- Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용
- Airflow는 몇 가지 내장 Sensor를 제공
- FileSensor: 지정된 위치에 파일이 생길 때까지 대기
- HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
- SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
- TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지
- ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
- 주기적으로 poke를 하는 것
- worker를 하나 붙잡고 poke간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode
- mode의 값은 reschedule 혹은 poke가 됨
- worker를 하나 붙잡고 poke간에 sleep를 할지 아니면 worker를 릴리스하고 다시 잡아서 poke를 할지 결정해주는 파라미터가 존재: mode
ExternalTaskSensor
- DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크함
- 먼저 동일한 schedule_interval을 사용
- 이 경우 두 태스크들의 Execution Date이 동일해야 함. 아니면 매칭이 안 됨!
- 만일 DAG A와 DAG B가 서로 다른 schedule interval을 갖는다면?
- 예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
- execution_delta를 사용
- execution_date_fn을 사용하면 조금더 복잡하게 컨트롤 가능
- 만일 두개의 DAG가 서로 다른 frequency를 갖고 있다면 이 경우 ExternalTaskSensor는 사용 불가
- 예를 들어 DAG A가 DAG B보다 5분 먼저 실행된다면?
- 컨트롤이 복잡하기 때문에 추천하지 않음
BranchPythonOperator
- 상황에 따라 뒤에 실행되어야 할 태스크를 동적으로 결정해주는 오퍼레이터
- 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
- TriggerDagOperator 앞에 이 오퍼레이터를 사용하는 경우도 있음
- 개발 중일 때와 아닐 때를 나눠서 작성하기 좋음
LatestOnlyOperator
- Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
- 현재 시간이 지금 태스크가 처리하는 execution_date보다 미래이고 다음 execution_date 보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단됨
- 오늘 일자를 기준으로 실행할 때만 뒤로 넘어갔으면 할 때 사용하기 좋음
- t1 >> t3 >> [t2,t4]
Trigger Rules (airflow.utils.tirgger_rule.TriggerRule)
- Upstream 태스크의 성공실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면?
- 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
- Operator에 trigger_rule이란 파라미터로 결정 가능
- trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능
- all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, nonde_failed_min_one_success
Dynamic Dag
- 템플릿과 YAML을 기반으로 DAG를 동적으로 만들어보자
- Jinja를 기반으로 DAG 자체의 템플릿을 디자인하고 YAML을 통해 앞서 만든 템플릿에 파라미터를 제공
- 이를 통해 비슷한 DAG를 계속해서 매뉴얼하게 개발하는 것을 방지
- DAG를 계속해서 만드는 것과 한 DAG안에서 태스크를 늘리는 것 사이의 밸런스 필요
- 오너가 다르거나 태스크의 수가 너무 커지는 경우 DAG를 복제해 나가는 것이 더 좋음
반응형