일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Ajax
- 6603
- 따라하기
- 구슬탈출2
- 17136
- 미세먼지 안녕!
- 연산자 끼워넣기
- django
- 인스타
- 9095
- 부분수열의 합
- 색종이 붙이기
- 알고리즘
- 로또
- 다리 만들기2
- 17472
- 인스타그램
- 14888
- 백준
- 재귀
- 좋아요
- 16637
- 17143
- 괄호추가하기
- 장고
- 1182
- 14502
- 17144
- 댓글
- Java
- Today
- Total
Be a developer
4. Consumer 본문
Consumer
- Kafka Consumer들은 Consumer Group에 속한다.
- Consumer Group에 속한 consumer들은 같은 topic을 소비하고, 해당 topic의 서로 다른 partition을 분담해서 message를 소비한다.
- 4개의 partition을 소비하는 Consumer Group
- 한 Consumer Group의 두 Consumer가 분담하는 4개의 partition
- 각각 한 partition을 담당하는 4개의 Consumer
- partition 개수보다 더 많은 Consumer
- 새로운 Consumer Group 추가
Rebalancing
- 한 Consumer로부터 다른 Consumer로 partition 소유권을 이전하는 것.
- rebalancing을 통해 쉽고 안전하게 consumer를 추가하고 삭제할 수 있다.
- 하지만 rebalancing을 진행하는 동안 consumer들은 message를 읽을 수 없으므로 해당 Consumer Group 자체가 해당 시간 동안 사용 불가능한 상태가 된다.
- Group coordinator로 지정된 kafka broker에게 Consumer가 heartbeat을 전송하면, 자신이 속한 Consumer Group의 membership과 자신에게 할당된 partition 소유권을 유지할 수 있다.
- Consumer가 일정 시간 간격으로 heartbeat을 전송한다면, 그 Consumer는 살아있고, 잘 동작하며, 자신의 partition message를 처리 가능한 것으로 간주한다.
- 만일 Consumer가 session timeout이 경과될 때까지 heartbeat 전송을 중단하면, GroupCoordinator가 해당 Consumer를 중단된 것으로 간주하고 rebalancing을 시작시킨다.
- 또는 Consumer에 문제가 생겨 중단되어도 마찬가지로 rebalancing을 진행한다.
- Consumer가 정상적으로 종료될 때는 GroupCoordinator에게 떠난다는 것을 알려주면 되며, 이 때 GroupCoordinator는 처리 공백을 줄이기 위해 곧바로 rebalancing을 시작시킨다.
우선 예제를 따라서 가볍게 Consumer를 만들어서 돌려보았다.
consumer를 먼저 돌려 놓고(왜 먼저 돌렸는지는 아래 테스트에서 나온다.), producing을 하면 console에 로그가 찍힌다.
그리고 producing할 때마다 offset이 commit되는 것을 볼 수 있다.
kafka manager에서도 consumer를 볼 수 있다.
Commit과 offset
- poll() 메서드는 호출될 때마다 group의 consumer들이 아직 읽지 않은 record들을 반환
- consumer는 partition별로 자신이 읽는 record의 offset을 추적 관리할 수 있다.
- commit이란 partition 내부의 현재 위치를 변경하는 것.
- consumer가 offset을 commit하면 kafka는 내부적으로 __consumer_offsets라는 이름의 특별한 topic에 message로 쓴다.
- 해당 topic은 모든 consumer의 offset을 갖는다.
- consumer group의 모든 consumer들이 정상적으로 실행 중일 때는 offset을 commit해도 아무런 영향을 주지않음.
- 그러나 기존 consumer가 비정상적으로 종료되거나, 새로운 consumer가 consumer group에 추가된다면 offset commit은 rebalancing을 유발한다.
- rebalancing이 끝나면 각 consumer는 종전과 다른 partition들을 할당받게 될 수 있다.
- 만일 마지막으로 commit된 offset이 consumer가 가장 최근에 읽고 처리한 message의 offset보다 작으면,
- 마지막으로 commit된 offset과 최근에 읽고 처리된 offset 사이의 message들이 두 번 처리된다.
- 아래 그림을 보면 마지막으로 commit된 offset은 2번이다.
- 그리고 poll이 여러번 호출되었고, 그 중 마지막 호출된 poll의 마지막 offset은 11이다.
- 10번 offset의 message를 처리하다가 rebalancing이 발생하는 경우 2~10번 message들은 재처리된다.
- 만일 마지막으로 commit된 offset이 consumer가 가장 최근에 읽고 처리한 message의 offset보다 크면,
- 최근에 읽고 처리한 offset과 마지막으로 commit된 offset 사이의 message들이 consumer 그룹에서 누락된다.
- 아래 그림에서 마지막으로 commit된 offset은 11번이다.
- 마지막 poll의 offset도 11이다. 즉, poll하여 가져온 message들을 다 처리하지 않은 상태로 commit을 경우이다.
- 5번 offset을 처리하다가 rebalancing이 발생하는 경우 5~11번 message들은 누락된다.
commit
1. 자동 commit
- enable.auto.commit=true로 설정
- poll() 메서드에서 받은 offset 중 가장 큰 값을 5초마다 한 번씩 commit
- auto.commit.interval.ms를 통해 자동 commit 간격을 조절할 수 있다.
- polling loop 내에서 poll할 때마다 KafkaConsumer 객체가 commit할 시간이 되었는지 확인하며, 시간이 되었다면 마지막으로 호출된 poll() 메서드에서 반환된 offset을 commit한다.
2. 직접 commit
- enable.auto.commit=false로 설정
- commitSync() 메서드 호출
- poll() 메서드에서 반환된 마지막 offset을 commit한다.
3. 비동기 commit
- commitSync()의 단점은 broker가 commit 요청에 응답할 때까지 application이 일시 중지된다는 것.
- 따라서 처리량이 줄어든다.
- commit을 자주 하지 않게 할 수 있지만, rebalancing시 중복 처리 record 수가 늘어난다.
- commitAsync()를 통해서 응답을 기다리지 않고, commit 요청 후 처리를 계속할 수 있다.
4. 각 record를 처리한 후 commit하기(특정 offset을 commit)
- commitSync()나 commitAsync()는 항상 마지막으로 반환된 offset을 커밋하기 때문에 batch 처리도중 문제가 발생하면 rebalancing 후 중복처리 문제가 발생할 수 있다.
- 따라서 이를 해결하기 위해 commitSync()나 commitAsync() 메서드의 인자에 commit하고자하는 partition과 offset을 가지는 Map을 전달하여 특정 offset을 commit할 수 있다.
5. rebalancing listener
- record의 중복 처리를 막기 위해서는 consumer가 종료되기 전 또는 rebalancing이 시작되기 전에 consumer가 처리했던 마지막 message의 offset을 commit해야 한다.
- ConsumerRebalanceListener 인터페이스를 구현하는 객체를 subscribe() 메서드에 인자로 전달하여, partition이 추가되거나 제거될 때 해당 코드가 실행되게 할 수 있다.
테스트
1. auto.offset.reset 옵션 테스트
: commit된 offset이 없는 partition을 consumer가 읽기 시작할 때, 또는 commit된 offset이 있지만 유효하지 않을 때 cosumer가 어떤 record를 읽게 할 것인지 제어하는 옵션.
1) 3000개의 message를 test3 topic에 producing해 놓는다.
2) 해당 옵션을 설정하지 않은 상태로 consumer를 실행시켜 본다.(default는 latest)
- current-offset이 가장 최신 offset으로 찍혀 있다.
- 하지만 실제로는 하나도 소비하지 않았다.
- console에 topic 명을 출력하도록 했지만 아무것도 출력되지 않았다.
3) 해당 옵션을 earliest로 설정하고 consumer를 실행시켜 본다.
- topic명이 제대로 출력되는 것이 보인다.
4) 다시 1000개의 message를 producing한 후에 earliest와 latest로 놓고 테스트한다.
- 이미 해당 topic의 partition마다 해당 group의 offset이 기록되었기 때문에, 옵션에 상관없이 current-offset부터 소비하기 시작한다.
2. max.poll.records 옵션 테스트
1) 3000개의 message를 producing한 후 consume해본다.
- 최대 500개까지 cosume하는 것을 볼 수 있다.
- default가 500임을 알 수 있다.
2) max.poll.records 옵션을 1000으로 변경한 후 다시 consume해본다.
- 최대 1000개까지 consume하는 것을 볼 수 있다.
'Kafka' 카테고리의 다른 글
3. Producer (0) | 2021.04.07 |
---|---|
2. 간단한 사용 및 개념 정리 (0) | 2021.04.04 |
1. 설치 (0) | 2021.03.19 |