티스토리 뷰

*본 포스팅은 인프런 [데브원영]님의 아파치 카프카 프로그래밍 강의 세션 5를 수강하고 이를 참고해 작성했습니다.

 

[아파치 카프카 애플리케이션 프로그래밍] 개념부터 컨슈머, 프로듀서, 커넥트, 스트림즈까지! |

데브원영 DVWY | 실전 환경에서 사용하는 아파치 카프카 애플리케이션 프로그래밍 지식들을 모았습니다! 데이터 파이프라인을 구축하는데 핵심이 되는 아파치 카프카의 각종 기능들을 살펴보고

www.inflearn.com

 


프로듀서 애플리케이션 개발

0. 사용 환경

- IntelliJ IDEA 

- JAVA 11

- Gradle

 

1. build.gradle 파일 설정

plugins {
    id 'java'
}

group = 'org.example'
version = '1.0-SNAPSHOT'

repositories {
    mavenCentral()
}

dependencies {
    testImplementation platform('org.junit:junit-bom:5.9.1')
    testImplementation 'org.junit.jupiter:junit-jupiter'
    implementation 'org.apache.kafka:kafka-clients:2.5.0'
    implementation 'org.slf4j:slf4j-simple:1.7.30'
}

test {
    useJUnitPlatform()
}

 

프로듀서를 구현하기 위해 카프카 클라이언트 라이브러리를 build.gradle에 implementation 명령어를 사용해 추가한다. 

 

기본적인 프로듀서 애플리케이션

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();

        // 필수값: bootstrap server, 메시지 키와 값 직렬화
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 프로듀서 인스턴스 생성
        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";

        // [1] 레코드 생성 ( 토픽 이름, 메시지 값 )
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);

        // [2] 레코드 전송
        producer.send(record);
        logger.info("{}", record);
        producer.flush(); // accumulator 데이터 강제 전송

        // [3] 프로듀서 종료
        producer.close();

    }

}

- StringSerializer로 직렬화

- KafkaProducer를 사용해 프로듀서 인스턴스를 생성한다. 

- "testMessage"라는 메시지 값을 전송한다. 

- producer.flush()를 사용해 데이터를 전송하고, close()를 통해 종료한다. 

 

다음과 같이 코드를 작성하고 코드를 실행하면 아래와 같이 Build Successful 로그가 뜨고, acks, batch.size 등 프로듀서 정보가 나온다.

22:52:18: Executing ':SimpleProducer.main()'...

> Task :compileJava
> Task :processResources NO-SOURCE
> Task :classes

> Task :SimpleProducer.main()

Deprecated Gradle features were used in this build, making it incompatible with Gradle 9.0.

You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins.

For more on this, please refer to https://docs.gradle.org/8.4/userguide/command_line_interface.html#sec:command_line_warnings in the Gradle documentation.

BUILD SUCCESSFUL in 2s
2 actionable tasks: 2 executed
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [my-kafka:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	security.providers = null
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLSv1.2
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1716299540586
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE}
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: -HgV9-voQBK2Qh6Phk-G0w
[main] INFO SimpleProducer - ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

 

이후 콘솔 컨슈머를 실행시켜 토픽에 저장된 값을 확인해 보면, 

 

우측에 콘솔에서 우리가 보낸 메시지가 실시간으로 뜨는 것을 확인할 수 있다.

이제 기본적인 프로듀서가 아닌, 다양한 커스텀 기능을 사용해 프로듀서를 생성해 보자.

 

메시지 키를 가진 프로듀서 애플리케이션

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class ProducerWithKeyValue {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
        producer.send(record);
        ProducerRecord<String, String> record2 = new ProducerRecord<>(TOPIC_NAME, "Busan", "Busan");
        producer.send(record2);
        producer.flush();
        producer.close();
    }
}

 

메시지 키와 값을 Pangyo, Busan으로 갖는 두 개의 레코드를 모두 전송해 보자. 

 

프로듀서를 실행하면 다음과 같이 콘솔에 레코드가 잘 저장되고 그 값을 콘솔 컨슈머로 확인할 수 있다. 

--property print.key=true를 사용해 키와 밸류를 모두 출력해 볼 수 있다.


파티션 번호를 지정한 프로듀서 애플리케이션

        int partitionNo = 0;
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, partitionNo, "Pangyo", "Pangyo");
        producer.send(record);

파티션을 직접 지정하고 싶다면 토픽 이름, 파티션 번호, 메시지 키, 메시지 값을 순서대로 파라미터로 넣고 생성하면 된다. 

이때 파티션 번호는 토픽에 존재하는 번호로 설정해야 한다. 


커스텀 파티셔너 프로듀서 애플리케이션

프로듀서 사용 환경에 따라 특정 데이터를 가지는 레코드를 특정 파티션으로 보내야 할 때가 있다. 기본 설정 파티셔너를 사용할 경우 메시지 키의 해시값을 파티션에 매칭해 데이터를 전송하므로 어느 파티션에 들어가는지 알 수 없다. 이때 Partitioner 인터페이스를 사용해 사용자 정의 파티셔너를 생성하면 사용자 마음대로 메시지 키와 파티션을 매칭할 수 있다.

 

CustomPartitioner

public class CustomPartitioner  implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {

        if (keyBytes == null) {
            throw new InvalidRecordException("Need message key");
        }
        if (((String)key).equals("Pangyo"))
            return 0;

        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }


    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public void close() {}
}

다음 커스텀 파티셔너는 키가 '판교'일 경우에는 0번 파티션으로, 이외에는 해시값으로 매칭하도록 커스텀한 사용자 정의 파티셔너이다.

또한 메시지 키가 없는 경우 Exception을 던지는 예외처리도 구현되어 있다. 

 

ProducerWithCustomPartitioner

public class ProducerWithCustomPartitioner {
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
        producer.send(record);
        producer.flush();
        producer.close();
    }
}

레코드 전송 결과를 확인하는 프로듀서 애플리케이션

KafkaProducer의 send() 메서드는 Future객체를 반환한다. 이 객체는 RecordMetadata의 비동기 결과를 표현하는 것으로 ProducerRecord가 카프카 브로커에 정상적으로 적재되었는지에 대한 데이터가 포함되어 있다. 

 

아래와 같이 get() 메서드를 사용하면 프로듀서로 보낸 데이터의 결과를 동기적으로 가져올 수 있다. 

	// configs.put(ProducerConfig.ACKS_CONFIG, "0");

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "Pangyo");
        
        try {
            RecordMetadata metadata = producer.send(record).get();
            logger.info(metadata.toString());
            
        } catch (Exception e) {
            logger.error(e.getMessage(),e);
            
        } finally {
            producer.flush();
            producer.close();
            
        }

 

우선 ACK는 기본값인 1일 때 그 결과를 확인해 보자. 

 

결과 로그 중 드래그되어 있는 줄을 확인해 보면, 다음과 같이 적혀있다. 

[main] INFO ProducerWithSyncCallback - test-0@6

 

여기서 'test-0@6'이란, 0번 파티션의 6번 오프셋에 레코드가 저장되어 있다는 응답을 받았다는 의미이다.

 

그러면 결과를 응답받지 않는 'ack 0' 옵션을 설정해서 레코드를 전송해 보면 어떻게 다를까?

이번에는 'test-0@-1 ' 이렇게 출력된 것을 볼 수 있다. 이때 오프셋은 0부터 시작하므로 -1이라는 값은 존재할 수 없다.

즉, 응답을 받지 않았다는 것을 의미한다.


프로듀서 애플리케이션의 안전한 종료

프로듀서를 안전하게 종료하기 위해서는 close() 메서드를 사용하여 accumulator에 저장되어 있는 모든 데이터를 카프카 클러스터로 전송해야 한다.

producer.flush();
producer.close();

 


OX 퀴즈

1) 프로듀서에서 데이터를 전송할 때 반드시 배치로 묶어서 전송한다.

2) 메시지 키를 지정하지 않으면 RoundRobinPartitioner로 파티셔너가 지정된다.

3) ISR은 복제 개수가 2 이상일 경우에만 존재한다.

4) acks가 all(-1)인 경우 데이터 전송 속도가 가장 빠르다.

5) min.insync.replicas=3, 복제 개수가 2일 때 가장 신뢰도 높게 데이터를 전송할 수 있다. 

 


OX 퀴즈 해설

1) X

용량이 크거나 배치 사이즈가 작을 때는 굳이 배치로 묶지 않는다.

 

2) X

메시지 키 지정 여부와 상관없이 파티셔너 옵션 값에 따라 달라진다. 또한 기본값은 RoundRobin이 아닌 UniformSticky파티셔너다.

 

3) X

ISR은 복제 개수가 1 즉 리더 파티션만 있는 경우에도 존재한다. 

 

4) X

acks = 0일 때가 가장 빠르다. -1일 때는 가장 느림

 

5) X

전제 조건이 잘못 됐다. min.insync.replicas=3 이면 리더 1개, 팔로워 2개인데, 복제 개수가 2면 리더 1, 팔로워 1 임. 

공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/01   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함