Spark.sql.shuffle.partitions
- 이 변수 하나로 다양한 상황의 shuffling을 해결하기는 쉽지 않음
- MapReduce 세상에서 mapreduce.job.reduces와 동일
- 적은 수의 Partition은 병렬성을 낮추고 OOM과 disk spill의 가능성을 높임 -> processing 시간이 오래 걸림
- 많은 수의 Partition은 task scheduler와 task 생성과 관련된 오버헤드가 생기며 너무 흔한 네트워크 I/O 요청으로 병목 초래
만약 Spark Engine Optimizer가 알아서 Partition의 수를 결정할 수 있다면? -> AQE의 목적
AQE
"Dynamic query optimization that happens in the middle of query execution based on runtime statistics"
그러면 언제 이런 실행시간 통계 정보를 뽑고 최적화 방식에 변경을 줄 수 있는 최적의 시점인가?
Query -> Job -> Stage -> Task : stage마다 발생!
Stage가 가장 좋은 최적화 방식 변경 포인트
- shuffling/Broadcasting이 Job을 stage들로 나눔
- 따라서 가장 좋은 시점이며 또한 Partition의 수와 크기 정보들도 알 수 있음
- 또한 이 때 중간 결과들이 materialize됨
SELECT sku, SUM(price) sales From order GROUP BY sku;
AQE가 필요한 경우들
- shuffling 후에 partition의 수를 동적으로 조정해줄 때 (partition을 많이 만들어 놓고 줄여나가는 방식)
- join되는 방법을 변경할 때
- skew된 조인을 최적화 할 때
AQE의 동작 방식
- stage DAG를 순차적으로 실행
- 필요하면 다시 실행하거나 쿼리 플랜 변경
- 매번 새로운 최정화 기회가 있는지 조사
Dynamically coalescing shuffle partitions
- 필요성
- 적당한 파티션의 크기와 수는 성능에 지대한 영향을 끼침
- 너무 많은 수의 작은 파티션
- 스케줄러 오버헤드
- 태스크 준비 오버헤드
- 비효율적인 I/O (파일시스템/네트워크)
- spark.sql.shuffle.partitions라는 하나의 변수로는 불충분
- 동작방식
- 내부적으로 많은 수의 파티션을 일부러 생성
- spark.sql.adaptive.coalescePartitions.initialPartitionNum (200)
- 매 Stage가 종료될 때 필요하다면 자동으로 Coalesce 수행
- spark.sql.adaptive.coalescePartitions.enabled
- 설정에 따라 파티션의 크기는 최소 크기 혹은 목표 크기를 맞추려 동작
- spark.sql.adaptive.advisoryPartitionSizeBytes
- spark.sql.adaptive.coalecesPartitions.minPartitionSize
- 무엇을 쓸지는 spark.sql.adaptive.coalescePartitions.parallelismFirst에 의해 결정
- 내부적으로 많은 수의 파티션을 일부러 생성
Dynamically switching join strategies
- 필요성
- static Query Plan이 여러 이유로 BHJ (Broadcast Hash Join) 기회를 놓친 경우
- 조인대상 DataFrame들에 대한 통계 정보 부족 (필터링 등)
- UDF가 사용된 경우
- AQE의 해법
- Runtime 통계정보를 바탕으로 조인 전략을 변경
- 이는 stage들이 끝나고 조인되기 전에 다시 쿼리 플래닝을 수행
- 아래 두 가지 옵션이 존재
- Broadcast Join (추천)
- Shuffle Hash Join
- Runtime 통계정보를 바탕으로 조인 전략을 변경
- static Query Plan이 여러 이유로 BHJ (Broadcast Hash Join) 기회를 놓친 경우
- 동작방식
반응형