-
스파크 - 성능 최적화하기, 리팩토링 practice[IT] 공부하는 개발자/Data engineering 2023. 2. 26. 19:52
스파크 분산 데이터 컬렉션
스파크에서 발생할 수 있는 성능 저하 이슈,
성능 개선 방법에 대해 정리해 보았다.
데이터 콜렉션의 비교
DataFrame과 Dataset
스파크는 데이터를 쪼개서 여러 개의 Executor nodes에 분산저장한다.
이것이 가능하려면 데이터는 분산 데이터 콜렉션에 저장되어 있어야 하는데,
스파크에서 사용되는 분산 콜렉션 API는 다음 세 가지이다.
- DataFrame
- Dataset
- SQL 테이블, 뷰
유형 DataFrame Dataset 데이터 타입 검증 시기 런타임에서 확인 컴파일 타임에 확인 지원 모두 지원 JVM 기반의 언어인 스칼라, 자바에서만 지원 Row 접근 Row 타입을 직렬화된 바이너리 구조로 변환
스파크의 최적화된 내부 포맷: 일반적으로 가장 빠름Row 타입을 사용자 정의 데이터 타입으로 변환
사용자에게 더 많은 유연성을 제공하지만 성능이 나빠짐DataFrame 을 사용하는 것과 Dataset을 사용하는 것 사이에는 유의미한 성능차이가 있어서,
어느 쪽을 선택하든
타입 안정성과 성능 둘 중 하나는 반드시 희생하게 된다.
성능이 나쁜데도 Dataset을 써야 하는 상황이란 어떤 것일까?
1. 정확도와 방어적 코드를 가장 중요시한다면 성능을 희생하고 Dataset을 사용하는 것이 좋은 선택이 될 수도 있다. (type-safe)
2. 혹은 DataFrame 에서 제공하지 않는 단일 함수로 인코딩 하고 싶을 때
상황에 따라 답은 달라지겠으나,
나는 RawData 를 읽어올 때에 Dataset으로 읽어온 후,
DataFrame으로 변환하여 분산처리를 모두 수행한 후에
최종적으로 다시 Dataset 으로 변환하여 파일으로 출력하는 것을 선호한다.
이렇게 하여 어느정도 성능을 확보하고 타입 안정성도 (뒤늦게나마ㅜ) 확보하기 위해서..?
Data Frame 정적 변수로 정의하기
DataFrame 은 Row 타입의 레코드와 칼럼으로 구성된다.
DataFrame의 파티셔닝은 DataFrame 이 클러스터에서 물리적으로 배치되는 형태를 정의한다.
이 데이터 집합이 어떻게 쪼개져서 어디 노드에 보내질지를 정의한다는 뜻이다.
로우가 추가된 DataFrame 을 참조하려면 새롭게 만들어진 DataFrame 객체를 사용해야 한다.
나는 요런 건... 장애나 버그로 이어질 수 있다고 생각하기 때문에
스칼라로 작성할 때에는 가능한..!! 정적 변수(val)로 정의해서 작성한다.
그러면 변경이 일어날 때마다 매번 새로운 객체로 작성되게 된다.
(DataFrame을 뷰로 만들거나 테이블로 등록할 때는 상관없음)
DataFrame의 최적화
1. sortWithinPartitions
DataFrame의 가장 잘 알려진 최적화 기법은, 트랜스포메이션을 처리하기 전에 파티션별 정렬을 수행하는 것이다.
파티션별 정렬은 sortWithinPartitions 메서드로 할 수 있다.
spark.read.load("file.json").sortWithinPartitions("count")
2. repartition
또 다른 최적화 기법은 자주 필터링하는 칼럼을 기준으로 데이터를 분할하는 것이다.
이를 통해 클러스터 전반의 물리적 데이터 구성을 제어할 수 있다.
repartition 메서드를 호출하면 전체 데이터를 셔플 한다.
사용자가 repartition() 메서드를 호출하여 파티션을 변경하면, Spark는 해당 작업에서 사용자가 명시적으로 지정한 파티션 구성을 따르지만, 이후에 다른 작업이 실행될 때는 Spark의 내부 옵티마이저에 의해 파티션 구성이 다시 조정될 수 있다. 본래 Spark는 최적의 성능을 위해 파티션 구성을 동적으로 조정하기 때문이다. 만일 사용자가 지정한 파티션 구성이 유지되길 원한다면, 해당 작업 이후에 cache() 또는 persist() 메서드를 호출하여 결과를 캐시해야 한다.
repartition은 향후에 사용할 파티션 수가 현재 파티션 수보다 많거나 칼럼을 기준으로 파티션을 만드는 경우에만 사용해야 한다.
특정 칼럼을 기준으로 자주 필터링한다면 자주 필터링되는 칼럼을 기준으로 파티션을 재분배하는 것이 좋다.
왜 리파티션을 하면 성능이 더 좋아질까?
Spark에서 repartition() 메서드를 사용하여 데이터셋을 재분할(repartition)하면 성능이 개선되는 이유는 다음과 같습니다.
- 데이터 분산 균형 조정
repartition() 메서드를 사용하면 데이터셋의 파티션 수를 늘리거나 줄여서 분산 균형을 맞출 수 있습니다. 파티션의 균형이 맞지 않으면 데이터 처리 시간이 불균형하게 분산되어 일부 노드가 오버로드될 수 있습니다. 이러한 상황은 성능 저하를 초래하므로, 데이터셋을 재분할하여 균형을 맞추면 성능이 향상됩니다.
- 데이터 선별
repartition() 메서드를 사용하면 파티션 수를 늘릴 때, 파티션별로 데이터를 선별하여 재분배합니다. 이 과정에서 데이터가 선별되어 파티션별로 분산되므로, 셔플링 작업이 최소화됩니다. 이는 전체 처리 시간을 줄이고, 성능을 향상합니다.
- 병렬성 증가
repartition() 메서드를 사용하면 데이터셋의 파티션 수를 늘려서, 병렬성을 증가시킬 수 있습니다. 병렬성은 동시에 처리 가능한 작업의 수를 뜻하며, 작업 수가 증가하면 처리 시간이 단축됩니다. 따라서 repartition() 메서드를 사용하여 병렬성을 증가시키면 성능이 개선됩니다.
그러나, repartition() 메서드를 사용하면 파티션 수가 많아질수록 메모리 사용량이 늘어나므로, 적절한 파티션 수를 선택하는 것이 중요합니다. 또한, 파티션 수를 늘리는 작업은 셔플링이 필요하므로, 데이터셋의 크기가 크면 성능 저하를 초래할 수 있습니다. 따라서, repartition() 메서드를 사용할 때는 데이터셋의 특성을 고려하여 적절한 파티션 수를 선택해야 합니다.꼬리에 꼬리를 무는.. 꼬꼬무 질문을.. 해보자면..
Q. 셔플링 작업이 최소화되면 왜 좋을까?
A. Executor 노드 간의 통신이 적어지므로 좋을 것이다. 파티션에서 동일한 값을 가지는 데이터는 한 노드에 모여있게 되므로..
Q. 리파티션시 주의점으로 적절한 파티션 수를 선택해야 한다는 말은.. 무슨 말일까?
A. 리파티션을 적용할 칼럼으로 executor 노드에 적절히 분배될 수 있는 칼럼을 골라야 한다는 뜻이다.
executor 노드의 개수와 칼럼 안의 unique value의 개수가 비슷할수록 좋을 것이다.
COUNTING practice
count 함수를 사용할 때에, Best practice는
select 구문에 표현식으로 작성하는 것이 아니라 agg 메서드에 작성하는 것이다.
select(count("colA")) 로 작성하지 말고
다음 예시처럼 그룹화하여 작성하자.
df.select(count(col("colB")).groupBy("colA") # NG !! df.groupBy("colA").agg(count("colB")) # RECOMENDED
챗봇에게 이유를 물어보았다.
Spark에서 counting query를 작성할 때, SELECT 구문에 표현식으로 작성하는 것보다 agg 메서드 안에 작성하는 것이 권장되는 이유는 다음과 같습니다.
- 최적화
agg 메서드를 사용하면 최적화가 수행됩니다. agg 메서드 안에 작성된 표현식은 Spark Catalyst Optimizer에 의해 최적화되어 실행 계획을 개선하므로, 실행 속도가 빨라집니다. SELECT 구문에 표현식으로 작성하는 경우 최적화가 수행되지 않으므로, 실행 속도가 느려질 수 있습니다.
- 타입 안정성
agg 메서드를 사용하면 표현식의 결과 타입을 명시적으로 지정할 수 있습니다. 따라서, 표현식의 결과가 예상과 다를 경우 타입 에러가 발생하여 빠른 디버깅이 가능합니다. 반면, SELECT 구문에 표현식으로 작성하는 경우에는 타입 안정성이 보장되지 않으므로, 런타임 오류가 발생할 가능성이 있습니다.
- 가독성
agg 메서드를 사용하면 여러 개의 집계 함수를 동시에 적용할 수 있으므로, 가독성이 높아집니다. 반면, SELECT 구문에 표현식으로 작성하는 경우에는 여러 개의 집계 함수를 적용하려면 복잡한 표현식을 작성해야 하므로 가독성이 떨어집니다.
따라서, counting query를 작성할 때에는 agg 메서드를 사용하여 집계 함수를 적용하는 것이 권장됩니다. 이를 통해 최적화, 타입 안정성, 가독성 등을 개선할 수 있습니다.
Action을 통한 Data cacheing
스파크에서 트랜스포메이션 쿼리는 액션 전까지는 수행되지 않는다.
데이터 프레임이 캐싱되지 않기 때문에,
간혹 외부의 데이터 read 가 섞여있는 경우 데이터프레임의 read 가 불필요하게 여러 번 수행되기도 경우도 있다.
이런 경우 메모리에 DataFrame 캐싱 작업을 수행하는 용도로 read 직후에 캐싱을 추가하여 성능을 개선할 수도 있다.
예시 코드를 보자..
df1 = spark.read.format("file") df2 = df1.groupby("colA").count().collect() df3 = df1.groupby("colB").count().collect()
df2, df3 은 df1이라는 공통 부모를 공유하기 때문에,
매번 df1 에서 했던 read 작업을 반복하게 된다.
하지만 중간에 df1의 데이터가 캐싱된다면, read라는 부담스러운 작업을 반복하지 않아도 된다.
df1 = spark.read.format("file") print("df1 count: " + str(df1.count())) df2 = df1.groupby("colA").count().collect() df3 = df1.groupby("colB").count().collect()
count()는 액션이므로 즉시 실행된다. 그리고 캐싱된다. df2, df3에서 read 작업은 추가로 발생하지 않는다..!
count() 대신 cache(), persist() 를 사용할 수도 있다.
cache()는 메모리 상황에 따라 일부만 캐싱되기도 하고,
persist()는 메모리, 디스크를 둘 다 캐시 영역으로 지정하기 때문에 전체를 다 강제 캐싱한다는 것이 차이이다.
권장 파일 포맷, 압축 포맷 사용
데이터 포맷은 스파크에 최적화된 parquet을 사용하는 것이 최선이다.
CSV는 스파크에서 파싱 속도가 아주 느리고 예외가 자주 발생한다.
따라서 데이터 형식을 선택할 수 있다면 CSV 대신 파케이를 선택하자.
- 스파크 최적화 여부
파케이는 데이터를 바이너리 파일에, 컬럼 지향 방식으로 저장하고,
사용하지 않는 데이터를 빠르게 건너뛸 수 있도록 통계를 함께 저장하기 때문에 Spark에 가장 최적화되어 있는 파일 형식이다.
Hive와 동시에 사용한다면 orc 도 권장된다.
-> orc OK
-> parquet OK
-> CSV NG
- 분할 가능한 확장자
또한 분할 가능한 포맷을 사용해야 한다.
분할 가능한 포맷이어야 Spark의 병렬처리로 인한 이점을 누릴 수 있기 때문이다.
분할이 불가능한 포맷을 사용한다면 하나의 Node 에서만 쓰기 작업을 하게 되겠지...?
-> orc OK
-> parquet OK
-> json NG
- 분할 가능한 압축 포맷
압축 포맷 역시 분할 가능 여부를 결정하는 요소이다.
ZIP 파일이나 TAR 파일은 분할 할 수 없다.
그래서 이 경우에도 하나의 Node 만 작업을 하게 된다.
하지만 gzip 이라면 분할할 수 있다
-> zip NG
-> tar NG
-> gzip OK
요약 : 최고의 조합은 parquet + gzip
테이블 파티셔닝/버켓팅
테이블 파티셔닝은 어떤 데이터를 어디에 저장할 것인지 제어할 수 있는 기능이다. Directory 별로 칼럼 데이터를 분리하여 저장하기 때문에, 특정 상황에서 필요한 칼럼의 데이터만 읽을 수 있다. 필터링을 자주 사용하는 테이블에서 사용할 수 있는 가장 손쉬운 최적화 방식이다. 예를 들어 지난주 데이터만 보고 싶을 때 날짜를 기준으로 파티션을 만들어 두면 전체 데이터를 보지 않고 물리적으로 지난주 데이터가 저장된 파티션에만 접근하여, 처리 속도를 높일 수 있다.
파티셔닝/버켓팅을 미리 해두면 조인 전에 발생하는 셔플을 미리 방지하여 데이터 접근 속도를 높일 수 있다.
예를 들어 날짜별 그룹연산을 하고 싶을 때에,
2월 10일의 데이터는 Executor A에 모여있고 2월 11일의 데이터는 Executor B에 미리 모여있다면,
그룹연산시 Executor A와 Executor B의 통신이 극히 줄어들어
속도가 훨씬 빨라질 것이다.
데이터의 물리적 재배치는 중요한 성능 튜닝 포인트이다.
파일 크기 관리
파일의 크기가 작으면 read 할때에 메타데이터에 관리부하가 발생한다. 스파크는 HDFS 기반이라 특히 작은 크기의 파일을 잘 다루지 못한다. 그러므로 나중의 read 작업을 위해, write 할 때에 미리 한 파일에 적당한 양의 row를 포함하도록 만들자. 파일당 5000개의 row를 포함하는 파일을 생성하는 것이 Best practice이다.
(그렇다고 파일의 크기가 너무 커지는 것도 좋지 않다. executor node의 개수만큼 적당히 쪼개서 read 도 병렬처리가 되게 해야 할 것이고, 또 파일이 너무 필요이상으로 커지면 필요한 것보다 너무 많은 데이터를 읽게 되기도 한다. 1일 치의 데이터만 필요한데 7일 치의 데이터가 한 파일에 들어있다면 6일 치 메모리를 괜히 더 쓰는 거니까..)
스파크 다른 포스팅 보기
2023.02.26 - [[IT] 공부하는 개발자/Data engineering] - 스파크는 무엇이고 왜 쓰는지? 스파크에 대해 알아보기
Reference
스파크 완벽 가이드 (한빛 미디어)
- 공부 후 잊어버리지 않기 위한 요약 포스팅입니다
'[IT] 공부하는 개발자 > Data engineering' 카테고리의 다른 글
Zeppelin 과 Notebook 비교, 어떤 것을 고를까? (1) 2023.05.18 [Streamlit] 설치 단계에서 발생한 이슈와 해결방법 정리 (0) 2023.04.16 스파크는 무엇이고 왜 쓰는지? 스파크에 대해 알아보기 (0) 2023.02.26 [R 회귀분석 예제] 야구선수 연봉에 영향을 미치는 요인 (Linear Regression) (3) 2020.06.21 [Kafka] 카프카란? 개념과 디자인 (0) 2020.04.11