티스토리 뷰
[Apache Kafka] 카프카 프로듀서와 컨슈머 직접 구현하기 | 대용량 처리 및 반복적인 데이터 파이프라인 생성 | Streams | Connect
YouJungJang 2024. 4. 18. 17:41*본 포스팅은 인프런 [데브원영]님의 아파치 카프카 for beginners 강의 세션 1,2를 수강하고 이를 참고해 작성했습니다.
Kafka Producer Application
- 카프카 프로듀서는 토픽에 데이터를 퍼블리시 생성하는 역할을 한다.
- 카프카 브로커로 데이터 전송 시 전송 성공 여부를 알 수 있고, 실패 시 재시도도 가능하다.
Kafka Producer Application 코드 예제
🔹 라이브러리 추가
- 카프카의 프로듀서와 컨슈머를 사용하려면 아파치 카프카 라이브러리를 추가해야 한다.
- 라이브러리는 Gradle, Maven을 사용해서 가져올 수 있다.
- 이때, 브로커 버전과 클라이언트 버전의 하위 호환성이 완벽하게 지원되지 않으므로 서로 지원되는 버전을 맞춰서 사용해야 한다.
🔹카프카 프로듀서 작성 코드
[1] Properties 설정
- 부트스트랩 서버 설정을 로컬 호스트의 카프카를 바라보도록 설정한다.
- 카프카 브로커의 주소 목록은 되도록 두 개 이상의 IP와 Port를 설정하도록 권장한다: 둘 중 한 개가 비정상일 경우 다들 정상적인 브로커에 연결돼 사용 가능하기 때문!
- Key, Value는 String Serializer로 직렬화 설정한다. 여기서 Key는 메시지를 보내면 토픽의 파티션이 지정될 때 사용된다.
[2] 프로듀서 인스턴스 생성
- 설정한 프로퍼티를 사용해서 카프카 프로듀서 인스턴스를 생성한다.
[3] 전송 객체 생성
- ProducerRecord: 인스턴스 생성할 때 어떤 토픽에 넣을 것인지, 어떤 key, value에 담을 건지 설정 가능하다.
- 현재 예시에는 토픽으로 "click_log", value로 "login"을 보낸다.
- 파라미터 개수에 따라 자동으로 오버로딩 되어 인스턴스가 생성되므로 Key를 담고 싶다면 파라미터를 세 개로 지정하면 된다.
[4] 전송과 종료
- 데이터 전송 send 하면 click_log 토픽에 login 밸류가 들어간다.
- 전송 완료되면 close()하여 프로듀서를 종료한다.
key 유무에 따른 send() 동작 차이
[1] key가 null인 경우
- 두 파티션에 Round Robin으로 두 개의 파티션에 쌓인다.
[2] key가 존재하는 경우
- 각 key는 각 해시값으로 변환되어 대응되는 파티션에 일대일 매칭되어 들어간다.
- 각 파티션에 동일 키의 동일 value만 쌓이게 되는 것!
+ 이때 파티션을 추후에 하나 더 생성하는 경우
- 새로운 파티션에 추가되는 순간 키와 파티션의 매칭이 깨지므로 둘의 연결 매칭이 보장되지 않는다.
- 그러므로 키를 사용하는 경우 이 점에 유의해서 초반 파티션 개수를 설정해야 한다. 추후에 생성하지 않는 것을 추천!
Kafka Consumer Application
- 다른 메시징 시스템과 달리 컨슈머가 데이터를 가져가도 데이터 큐에서 데이터가 사라지지 않는다 -> 이는 카프카 컨슈머를 데이터 파이프라인으로 운영하는 것의 핵심!
- 카프카 컨슈머는 기본적으로 토픽의 데이터를 가져온다. 데이터는 토픽 내부의 파티션에 저장되는데 컨슈머는 이렇게 파티션에 저장된 데이터를 가져온다 = 폴링 Polling
✅ 컨슈머의 주요 특징 세 가지
1. Polling: 컨슈머의 역할은 토픽 내부의 파티션에서 메시지 가져오기: 메시지를 가져와서 특정 DB에 저장하거나 또 다른 파이프라인에 전달할 수 있다.
2. Partition Offset(= 파티션에 있는 데이터 번호) 위치를 커밋할 수 있다.
3. Consumer Group : 컨파티션 개수에 따라 컨슈머 여러 개를 만들면 병렬 처리가 가능하다 -> 더 빠른 속도
Kafka Producer Application 코드 예제
🔹Kafka 라이브러리 추가
- 프로듀서와 마찬가지로 그래들, 메이븐 도구를 사용해 라이브러리를 추가한다.
-이전에도 언급했듯이 카프카 브로커와의 버전 차이로 인해 정상 작동이 안 될 수 있으니 호환 가능한 버전인지 확인한다.
🔹카프카 consumer 작성 코드
[1] properties 설정
- Bootstrap.servers 옵션으로 카프카 브로커를 설정한다. 브로커 중 하나에 이슈가 생기면 다른 브로커가 붙을 수 있도록 여러 브로커를 설정한다.
- 그룹 아이디를 지정한다.
- producer에서 설정한 바와 같이 key, value 직렬화 설정을 추가한다.
[2] Consumer 인스턴스 생성
- KafkaConsumer 클래스를 사용해서 이전에 선언한 properties를 매개변수로 컨슈머 인스턴스를 생성한다. 이를 통해 데이터를 읽고 처리하는 것이 가능하다.
[3] Subscribe: 토픽 지정
- subscribe를 사용해서 어느 토픽을 대상으로 데이터를 가져올지 선언한다.
- 만약 특정 토픽의 전체 파티션이 아닌 일부 파티션에서 가져오고 싶다면 assign 메서드를 사용한다: 키가 존재한다면 이 방식으로 데이터 순서를 보장하는 처리가 가능하다.
[4] Polling: 데이터를 가져오자
- Poll 매서드가 포함된 무한 루프: 핵심은 브로커로부터 연속적으로 그리고 컨슈머가 허락하는 한 많은 데이터를 Poll을 사용해서 읽어온다.
- poll에 매개변수로 숫자를 넣을 수 있는데 이는 데이터를 기다리는 시간을 설정하는 것이다. 500 = 500 밀리초
- 예) 0.5초 동안 데이터가 안 들어오면 빈 값의 레코드 변수를 반환한다. / 있다면 데이터가 들어있는 레코드 값을 반환한다.
- Records 변수는 데이터 배치로서 레코드의 묶음 리스트이다 -> 실제로 카프카에서 데이터를 처리할 때 가장 작은 단위인 레코드로 나누어 처리한다.
- 레코드 변수를 for 루프로 반복 처리하면서 실질적으로 처리하는 데이터를 가져오게 한다.
실제로 데이터가 컨슈머에게 전달되는 과정
1. 프로듀서가 키를 null로 설정하고 데이터를 전송한다
- 라운드 로빈으로 파티션에 골고루 데이터가 쌓임
2. 컨슈머가 데이터를 가져갈 때마다 오프셋 정보가 갱신된다.
- 각 데이터는 파티션 내에서 고유 번호인 오프셋 Offset을 가진다.
- 오프셋은 토픽별, 파티션별로 별개로 지정된다.
- 이 오프셋을 사용해서 컨슈머가 어디까지 읽었는지 확인할 수 있는 것!
- 컨슈머가 데이터를 읽기 시작하면 offset을 commit 하게 되는데 이렇게 가져간 내용에 대한 정보는 카프카의 __consumer_offsets 토픽에 저장된다.
+ 만약, 불의의 사고로 consumer가 중지됐다면?
각 파티션별로 읽은 데이터의 오프셋이 저장되어 있으므로 컨슈머가 재실행하여 중지 시점부터 복구해 데이터 처리가 가능하다.
즉 컨슈머 이슈가 발생해도 데이터 처리 시점을 복구할 수 있는 고가용성의 특징을 구현한다!
컨슈머는 몇 개가 적당할까?
- 만약 컨슈머가 파티션 개수보다 적거나 같을 경우 각 컨슈머에 각 파티션을 할당해서 데이터 처리를 할 수 있다.
- 하지만 파티션보다 컨슈머의 개수가 많으면 더 이상 할당할 수 있는 파티션이 없기 때문에 동작하지 않는다 -> 병렬처리를 하고 싶다면 컨슈머 개수는 파티션보다 적거나 같아야 한다.
컨슈머 그룹이 다른 컨슈머들의 동작
- 서로 다른 그룹은 서로 영향을 주지 않는다.
예시) Elastic Search에 데이터를 저장하는 consumer group1과 Hadoop에 저장하는 consumer group2가 있다고 하자.
만약 elastic에 저장하는 그룹이 각 파티션의 특정 오프셋을 읽고 있어도 hadoop 컨슈머 그룹에게 영향을 주지 않는다.
-> 컨슈머 오프셋 토픽은 그룹별, 토픽별로 오프셋을 나누어 저장하기 때문!
이런 특징을 토대로 하나의 토픽에 들어온 데이터는 다양한 역할을 하는 여러 consumer들에게 각자 원하는 데이터로 처리될 수 있다.
Kafka Streams Application
- 카프카 스트림즈: 컨슈머를 사용해서 데이터를 처리하는 것보다 더 안전하고 빠르고 다양한 기술을 사용할 수 있다.
- 카프카가 공식적으로 제공하는 자바 라이브러리
- 토픽의 데이터를 낮은 지연과 빠른 속도로 처리가 가능하다.
- 스트림즈는 라이브러리로 제공되므로 자바, 스칼라, 코틀린 같은 JVM 기반 언어 중 하나로 개발해야 한다.
- Springboot, Java app에 라이브러리를 추가해서 동작하게 배포하는 것도 가능하다.
✅ Kafka Streams 장점 4가지
1. 카프카와 완벽하게 호환된다.
- 대부분의 기업에서 카프카를 이벤트 저장소로 사용하고 저장된 데이터를 스파크, 로그스태시 같은 외부 툴로 연동하는데, 외부 오픈소스 툴의 문제는 빠르게 발전하는 오픈소스 카프카 버전을 따라오지 못한다는 점이다.
- 반면에 스트림즈는 매번 카프카가 릴리즈 될 때마다 카프카 클러스터와 완벽 호환되면서 최신 기능을 가지고 있다. 즉 성능 개선이 빠르다.
- 또한 카프카 보안 기능이나 ACL이 붙어있어도 완벽하게 호환되어 처리가 가능하다.
-유실, 중복 처리 없는 강력한 기능은 카프카 연동 이벤트 프로세싱 도구 중 거의 유일하다.
2. 스케줄링 도구가 필요 없다.
- 기존: 스카프 스트리밍, 스파크 구조적 스트림 사용 시 카프카와 연동해서 마이크로 배치 처리를 하는 이벤트 데이터 앱을 만들 수 있는데, 문제는 스파크 운영을 위해 yarn, mesos와 같이 클러스터 관리자, 리소스 매니저 같은 것이 필요하다 + 클러스터 운영을 위한 대규모 장비 구축도 필요함
- Streams: 스케줄링 도구가 전혀 필요 없다! 내가 원하는 만큼 스트림즈 앱을 띄워서 배포하면 된다.
3. 스트림즈 DSL과 프로세서 API를 제공한다.
스트림즈를 구현하는 방법 두 가지 : 스트림즈 DSL, 프로세서 API
- 스트림즈 DSL: 이벤트 기반 데이터를 처리할 때 필요한 다양한 기능들(map, join, window 같은 메서드들)을 제공하므로 사용하기 편리하다. 거의 대부분의 기능을 제공하는데 특히 KStream, Ktable, GlocalKTable은 새로운 독특한 스트림 처리 개념이다.
- 프로세서 API: DSL에 없는 기능들은 프로세서 API에 직접 로직 작성해서 사용하면 된다.
4. 로컬 상태 저장소로 사용한다.
실시간으로 데이터를 처리하는 방식 두 가지: 비상태 기반 처리 Stateless, 상태 기반 처리 Stateful
- 비상태 기반 처리 Stateless: 필터링이나 데이터를 변환하는 처리를 한다. 데이터가 들어오는 족족 바로 처리하고 프로듀스 하면 되므로 유실이나 중복 발생 염려가 적고 쉽게 개발할 수 있다.
- 상태 기반 처리 Stateful: 직접 구현하려면 무척 어려운데 이런 어려운 처리를 돕는 것이 스트림즈이다 -> 로컬에 rocksdb를 사용해서 상태를 저장하고, 상태 변환 정보는 카프카의 변경 로그 토픽에 저장한다. 프로세스 장애가 발생해도 그 상태가 모두 안전하게 저장되므로 자연스럽게 장애 복구가 될 수 있다.
Kafka Connect
카프카에서 공식적으로 제공하는 컴포넌트로 반복적인 데이터 파이프라인을 효과적으로 배포하고 관리하는 방법이다.
- 커넥트(Connect): 커넥터를 동작하도록 실행해 주는 프로세스. 파이프라인으로 동작하는 커넥터를 동작하기 위해선 반드시 커넥트를 실행시켜야 한다.
- 커넥터(Connector): 실질적으로 데이터를 처리하는 코드가 담긴 jar 패키지. 커넥터는 일련의 템플릿 같은 특정 동작을 하는 코드 뭉치다. 커넥터 안에는 파이프라인에 필요한 동작들과 설정, 실행 메서드가 포함돼 있다.
커넥터(Connector)의 종류
- 싱크 커넥터(Sink Connector): 특정 토픽에 있는 데이터를 오라클, 즉 특정 저장소에 저장하는 컨슈머 같은 역할 / 토픽의 데이터를 오라클 특정 테이블에 넣고 싶다면 싱크 커넥터 글자 앞에 어떤 DB인지 명시하면 된다. 예) OracleSinkConnector
- 소스 커넥터(Source Connector): DB로부터 데이터를 가져와서 토픽에 넣는 프로듀서 역할.
커넥트(Connect)의 종류
- 단일 실행 모드 커넥트: 간단한 데이터 파이프라인을 구성하거나 개발용으로 주로 사용한다.
- 분산모드 커넥트: 여러 개의 프로세스를 하나의 클러스터로 묶어서 운영하는 방식으로 2개 이상의 커넥트가 하나의 클러스터로 묶인다. 일부 커넥트에 장애가 발생하더라도 파이프라인을 자연스럽게 failover 해서 나머지 실행 중인 커넥트에서 데이터를 지속적으로 처리할 수 있게 도와준다.
커넥트에서 커넥터를 실행하는 방법
- 커넥트를 실행할 때 커넥터가 어디에 위치하는지 컨피그 파일에 위치를 지정해야 한다. 커넥터 jar 패키지가 있는 디렉터리를 config 파일에 지정한다.
- 커넥트를 실행하면 jar 파일이 커넥터를 함께 모아서 실행할 수 있도록 준비 상태에 돌입한다.
- 실행 중인 커넥트에서 Rest API를 사용해 커넥터를 실행할 수 있다.
만약, 커넥트를 사용하지 않으면 파이프라인 반복 생성에 시간이 많이 소요된다. 개발하고 배포하고 모니터링을 구축하는 일련의 과정들이 커넥트에서는 템플릿 형태로 커넥터를 개발하고 Rest API로 파이프라인을 반복 생성하여 효율이 뛰어나다.
그러므로 여러 개의 파이프라인이 필요하다면 컨슈머를 여러 개 만드는 것보다 Connect를 구축해서 반복적으로 커넥터들을 실행하는 방식으로 진행하는 것이 좋다.
카프카의 미래
- 카프카는 링크드인에서 개발되어 2011년에 오픈소스로 공개되었다. 기존 데이터 기반 분석 툴들은 각각의 특징이 뚜렷해서 데이터 처리에 있어서 파편화가 심각했다. 이는 유지보수의 어려움으로 직결되고 비즈니스에 막대한 영향을 끼친다.
- 카프카는 데이터 처리를 각각 여러 앱에서 처리하는 것이 아닌 중앙의 한 곳에서 처리할 수 있도록 중앙 집중화한 것으로 한 곳에서 실시간 관리가 가능해졌다.
- 기업의 대용량 데이터를 수집하고 이를 사용자들이 실시간 스트림으로 소비할 수 있게 중추 신경으로 작동한다.
카프카의 중요한 세 가지 특징
- High throughput message capacity: 짧은 시간 내에 엄청난 양의 데이터를 컨슈머까지 전달전달할 수 있다. 파티션을 통한 분산처리가 가능하므로 데이터 양이 많아질수록 컨슈머컨슈머 개수를 늘려서 병렬처리가 가능하다. 즉 데이터 처리를 빠르게 할 수 있다.
- Scalability Fault Tolerant: 확장성이 뛰어나다. 이미 사용되고 있는 카프카 브로커가 있더라도 신규 브로커 서버를 추가해서 수평 확장이 가능하다. 늘어난 브로커 중 몇 대가 죽어도 레플리카로 복제된 데이터는 안전하게 보관되어 있으므로 복구하여 처리 가능.
- Undelelted Log: 컨슈머가 데이터를 가지고 가더라도 데이터가 사라지지 않는다. 카프카에서는 컨슈머 그룹아이디가 다르면 동일한 데이터도 각각 다른 형태로 처리할 수 있음