-
[Kafka] 카프카의 실전 서비스 운영[IT] 공부하는 개발자/Data - ALL 2024. 9. 18. 10:22
2020.02.15 - [[IT] 공부하는 개발자/Open Source] - [Kafka] 카프카란? 개념과 디자인
최근에 카프카 관련 장애를 겪고 나서, 내가 정말 수박 겉핥기로 카프카를 알고 있었다는 걸 깨닫게 되었다. 그래서 조금 더 공부해서 위의 포스팅에 이어서 2편을 쓴다.
카프카 운영 옵션
클라이언트 공통 옵션
metadata.max.age.ms
- 클라이언트가 브로커로부터 토픽에 대한 메타데이터를 받아와 새로고침하는 시간
- default는 5분인데, 3분으로 줄여도 좋다.
bootstrap.servers
- 카프카 클러스터 주소
- 코드에 고정시키면 좋지 않음. 환경변수로 관리하는 것이 장애 대비에 좋음
key.deserializer, value.deserializer
브로커에 보낼 데이터를 바이트 배열로 직렬화하기 위해 사용할 클래스
- 프로듀서와 컨슈머는 동일한 직렬화 방식을 사용해야 한다
프로듀서 주요 옵션
min.insync.replicas최소 리플리케이션 팩터 수
acks
- 프로듀서가 토픽 리더에게 메시지를 보낸 후 받아야하는 승인(ack)의 수
- 작을 수록 성능이 좋고, 높을 수록 메시지 손실 가능성이 적다. (중요한 옵션)
- 예시
- acks = 0 이면 프로듀서는 서버로부터 응답을 기다리지 않는다 => 메시지 손실 가능성이 높지만 빠른 전송이 필요한 경우
- acks = 1 이면 파티션 리더가 데이터를 기록하는 것만 확인한다. => 메시지 손실 가능성이 적고 적당한 속도의 전송이 필요한 경우 (속도와 안전성을 확보할 수 있어서 가장 추천되는 설정이다)
- acks = all(-1)이면 {min.insync.replicas} 만큼의 파티션들에게 모두 응답을 받아야 한다. => 가장 강력한 보장
만약 acks=all 이면서 min.insync.replicas는 1이라면? replica가 리더파티션밖에 없으므로 acks=1인것과 다르지 않다!
아파치 카프카 문서에 의하면 손실 없는 메시지 전송을 위한 조건으로, acks=all & min.insync.replicas=2, replication factor=3 으로 권장하고 있다. 즉, min.insync.replicas 는 n개의 복제 숫자보다 1 적은 n-1으로 두어야 한다는 것이다. 그래야 1대 정도 브로커가 다운되더라도 에러 없이 메시지 전송이 계속 유지될 수 있다.
만약 acks=all 로 두면서 min.insync.replicas도 전체 replica 개수와 동일하게 두게 되면, 브로커 하나만 다운되더라도 카프카로 메시지를 보낼 수 없는 클러스터 전체 장애와 비슷한 상황이 발생하게 될지도....buffer.memory
프로듀서가 사용하는 임시 메모리 바이트
batch.size
프로듀서가 보내는 메시지 배치 사이즈 제한. 이 사이즈를 초과하면 {linger.ms}에 도달하지 못했더라도 메시지를 발송한다.
linger.ms
메시지 전송 제한 시간. 이 시간을 초과하면 {batch.size}에 도달하지 못했더라도 메시지를 발송한다.
max.request.size
실제 이 옵션을 통해 장애를 해결하였음. 메시지를 대량으로 처리해야 하는 상황이었는데 1048576(default: 1MB)에서 104857600(100MB)로 변경하여 발행속도를 기하급수적으로 증가시킬 수 있었다.
컨슈머 주요 옵션
fetch.min.bytes / fetch.max.bytes
한번에 가져올 수 있는 최소/최대 데이터 사이즈. 폴링한 데이터 사이즈가 {fetch.min.bytes}보다 작은 경우, 요청에 대해 응답하지 않고 데이터가 누적될 때까지 기다린다.
group.id
- 컨슈머가 속한 컨슈머 그룹 ID
- 코드에 고정시키기보다 배포 설정으로 빼놓는 것이 좋다. (장애 혹은 구조 변경시 변경할 일이 생길 수 있음)
enable.auto.commit
컨슈머가 메시지를 처리한 후 명시적으로 오프셋을 커밋하지 않아도 Kafka가 자동으로 커밋을 처리하는 옵션.
=> 메시지 손실 가능성을 0으로 만들려면 false 로 설정하여, 메시지 처리가 완료된 후에 컨슈머가 직접 커밋(수동 커밋)을 하도록 한다.
auto.offset.reset
- 초기 오프셋이 없거나 데이터 삭제로 오프셋이 사라진 경우, 다음 옵션으로 리셋한다
- earliest: 토픽의 0번 메시지부터 읽기 시작
- latest: 가장 최근 오프셋부터 읽기 시작
session.timeout.ms
브로커가 컨슈머가 살아있는 것으로 판단하는 타임 아웃 시간.
heartbeat.interval.ms
그룹 코디네이터에게 얼마나 자주 하트비트를 보낼 것인지 설정값. 일반적으로 {session.timeout.ms} 값의 3분의 1정도로 설정한다고 한다. default 값은 3초이다.
max.poll.interval.ms
컨슈머가 문제가 있다고 판단하는 메시지 처리 시간. 하트비트를 정상적으로 보내고 있더라도, 메시지 폴링을 처리하는 데에 이 시간 이상이 걸리면 컨슈머를 파티션에서 할당 해제하고 리밸런싱을 트리거한다. 메시지 한 건을 처리하는 시간이 아니라 폴링으로 끌어온 메시지 전체를 처리하는 시간이다. (default: 5분)
- 이것 역시 고정하는 것보다 코드에 설정값으로 빼두고, 상황에 따라 배포 옵션으로 변경할 수 있게하는 것이 좋다.
group.instance.id
Kafka에서 정적 컨슈머는 컨슈머 그룹 리밸런싱에 대한 부담을 줄이기 위해 도입된 개념이다. 일반적인 컨슈머는 그룹에 참여할 때마다 새로운 멤버로 취급되어 리밸런싱이 트리거되지만, 정적 컨슈머는 동일한 인스턴스 ID를 사용하여 그룹에 재참여할 때 이전 파티션이 그대로 할당된다.
- k8s 환경에서 컨슈머에 고유하면서 바뀌지 않는 instance id를 부여하기 위해 Statefulset를 이용한 인덱스명을 부여하는 것을 생각해 볼 수 있겠음. 예를 들어 consumer-1, consumer-2 이런 식으로 부여될 수 있도록
partition.assignment.strategy
컨슈머 파티션 할당 정책을 정할 수 있다. 리밸런싱은 몹시 비싼 작업이므로... 파티션 할당 정책으로 리밸런싱 시간을 줄인다던지 횟수를 줄여서 자원을 절약할 수 있다. 파티션 할당 정책은 1개 이상 지정하는 것도 가능하다. 카프카 코드를 보면 리스트로 받고 있다.
partition.assignment.strategy=org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.RoundRobinAssignor
예를 들어 위 코드는 파티션 할당 전략을 다중할당전략으로 설정한것이다. 그러면 컨슈머그룹은 기본적으로 파티션 할당 전략을 점진적 리밸런싱(1순위)를 채택하고, 1순위 전략이 모종의 사유(?)로 작동하지 않을 때에만 2순위로 지정한 전략을 채택한다. 여기서 2순위는 라운드 로빈이다.
기존 리밸런싱 방식은 Stop the world 방식으로 이루어져, 리밸런싱 도중에는 소비 중단이 발생한다. 모든 컨슈머가 파티션을 반환하므로 리밸런싱 시간도 길어진다. 점진적 리밸런싱은, 데이터 처리 중단없이 진행된다. 모든 파티션을 이전하는 것이 아니라, 점진적으로 파티션을 이전하는 방식이다. 그래서 메시지 컨슘이 끊기지 않는다는 장점이 있다.
커넥트 컨슈머 옵션 조정
다음은 카프카 커넥트 컨슈머에서 HDFS 스몰 파일 이슈를 해결하기 위해 사용한 옵션들이다.
rotate.schedule.interval.ms
파일 롤링 주기를 늘렸더니 스몰파일이 감소하였다.
tasks.max
파티션 별 task 개수를 줄였더니 스몰파일이 감소하였다.
성능 튜닝
파티션을 늘릴 때는 신중하게
1. 파티션이 많을 수록 더 많은 데이터를 동시에 처리할 수 있지만, 대신 브로커 복구 시간이 증가한다는 단점이 있다. 파티션들에 대해 모두 새로운 리더 선출을 해주어야 하기 때문이다. (1편 참조)
2. 하지만 파티션은 늘릴 수는 있어도 줄일 수는 없다. 파티션을 줄이려면 토픽을 삭제하는 것 외에는 방법이 없다. 따라서 처음에는 적게 운영하다가 늘리는 것이 좋다.
3. 파티션의 개수는 Broker skew를 피하기 위해 Broker의 배수만큼 지정하는 것이 좋다. 만약 Broker가 15대이고 Replica가 3인 토픽이 있다면, 파티션의 개수는 5의 배수로 설정해주어야, 모든 브로커에 균등하게 파티션이 배분될 것이다. (예: 파티션이 5일때는 리더파티션 5개와 팔로워파티션 10개, 총 15개의 파티션이 브로커 15대에 하나씩 배치될 수 있다.)
4. 파티션을 50개 이상으로 늘려서 성능 튜닝하는 것은 효과가 드라마틱하지 않다. 차라리 parallel consumer를 쓰는 것도 방법이다.
파티션 리더와 팔로워의 역할
모든 읽기와 쓰기는 파티션 리더에 의해서만 일어나고, 파티션 팔로워는 리더의 데이터를 그대로 리플리케이션만 한다.
리플리케이션 팩터의 default의 값은 3이다. 즉 파티션은 리더파티션 1개와 팔로워파티션 2개를 갖는것이 기본 값이다. 하지만 모든 토픽을 이렇게 운영하는 것보다, 데이터의 중요도에 따라 리플리케이션 팩터를 2혹은 3으로 나누어 운영하는 것이 더 효율적입니다. 모든 토픽이 완전한 데이터 정합성을 요구하는 것이 아니라면...
세션 타임아웃 설정
컨슈머 그룹 안에서 컨슈머들은 토픽의 파티션을 나눠 갖는다. 할당 받은 파티션이 바뀌기도 하는데 이것을 리밸런싱이라고 부른다. 리밸런싱이 일어날 때에는 컨슈머 그룹 전체가 메시지를 가져올 수 없다. 리밸런싱은 언제 일어날까?
1. 컨슈머 그룹 안에 새 컨슈머가 조인할 때
2. 컨슈머가 오랫동안 하트비트를 보내지 않아서 세션 타임아웃이 발생할 때. 브로커의 그룹 코디네이터는 해당 컨슈머가 다운되었다고 판단하여 리밸런스를 시작한다.
3. 컨슈머가 메시지 폴링을 너무 오래할 때
즉 세션 타임아웃은 리밸런스 빈도와 관련이 높다. 만약 타임아웃 시간을 낮게 설정한다면, 실패를 빨리 감지할 수 있지만, GC가 길어지는 등의 정상 동작에 의해서도 의도하지 않은 리밸런스가 계속 발생할 수 있다는 점도 유의해야 한다.
메시지 폴링 제한 시간은 실제 평균 메시지 처리 시간과 연관성 있게 두는 것이 좋다. polling 주기를 로그에서 살펴서 - 평소 20초라고 가정시, default 값인 5분이 아니라, 10배인 2분 정도로 변경할 수도 있다. 그 정도 시간이 걸리는 컨슈머를 그룹에서 제외하겠다는 뜻이다.
파티션 분산 임팩트
신규 브로커를 추가하는 작업을 한다고 가정해보자. 이 때 새 브로커에 토픽의 파티션들이 옮겨와야 할 것이다. (그게 브로커이니까..!) 하지만 파티션의 데이터가 10G이상 된다고 가정하면, 이동시키는 것이 네트워크 인터페이스의 사용량을 급증시키고 브로커에도 상당한 부담을 줄 수 있다. 가능한 안전하게 이 작업을 수행하려면..
1. 토픽의 사용량이 가장 적은 시간에 수행하기
2. 토픽의 보관 주기를 임시로 줄여서 사이즈를 축소시킨 후에 작업하기
jmx 모니터링 지표
Byte in rate
브로커 서버로 초당 들어오는 사이즈
Byte out rate
브로커 서버로 초당 나가는 사이즈
URP (Under replicated partitions)
현재 복제가 되고 있지 않은 파티션 수를 뜻한다 -> {min.insync.replicas} 보다 적게 복제가 되고 있으면 URP에 카운트한다.
- 0보다 크면 문제로 판단한다.
'[IT] 공부하는 개발자 > Data - ALL' 카테고리의 다른 글
Spark JAR에 대해 연구해보자... 너는 무엇을 하는 아이니 (0) 2024.12.12 데이터 레이크하우스 아키텍처 (3) 2024.12.10 [Hive] 하이브 핵심정리 (1) 2023.10.19 Zeppelin 과 Notebook 비교, 어떤 것을 고를까? (1) 2023.05.18 [Streamlit] 설치 단계에서 발생한 이슈와 해결방법 정리 (0) 2023.04.16