컨슈머 API
package com.example.kafkastudy.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
private final static Loggerlogger= LoggerFactory.getLogger(SimpleConsumer.class);
private final static StringTOPIC_NAME= "test";
private final static StringBOOTSTRAP_SERVERS= "localhost:9092";
private final static StringGROUP_ID= "testGroup";
public static void main(String[] args) {
Properties configs = new Properties();
//카프카 클러스터의 서버 host와 ip 설정
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BOOTSTRAP_SERVERS);
//메시지 키 , 메시지 값 직렬화하기 위한 클래스 설정
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
configs.put(ConsumerConfig.GROUP_ID_CONFIG,GROUP_ID);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
//Collection 타입의 String 값들을 받음.
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
}
}
}
}
이제 이 파일을 실행시켜주고 console 창을 띄워서 컨슈머가 잘 데이터를 받아가는지 확인하면 된다.
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
>testMs
>^C%
토픽의 파티션으로부터 컨슈머가 데이터를 가져가는 방법은 크게 2가지로 나뉜다.
첫번째는 1개 이상의 컨슈머로 이루어진 컨슈머 그룹을 운영하는 것이고 나머지 하나는 토픽의 특정 파티션만 구독하는 컨슈머를 운영하는 것이다.
컨슈머를 그룹으로 운영하는 방식은 다른 컨슈머 그룹과 독립된 환경을 제공하기 때문에 서로 영향을 받지 않을 수 있고 1개의 파티션의 최대 1개의 컨슈머에 할당이 가능하다.(즉, 1개의 컨슈머가 여러개의 파티션엘 할당될 수 있다)
그렇다면 컨슈머 그룹 내에서 장애가 발생하면 어떻게 될까?
한 컨슈머가 장애가 발생하게 된다며 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어가게 된다. 이 과정을 리밸런싱이라고 하고, 리밸런싱은 이렇게 컨슈머를 빼는 작업이나 새로운 컨슈머가 추가될 때 발생한다. 컨슈머를 빼고 넣는 작업은 그룹 내에 있는 코디네이터가 담당한다.
이러한 기능 덕분에 이슈가 발생해도 컨슈머 그룹은 지속적으로 데이터를 처리할 수 있다.(가용성을 높임)
하지만 이런 리밸런싱도 자주 일어나면 안되는데 그 이유는 파티션의 소유권이 넘어가는 동안 해당 컨슈머 그룹은 토픽의 데이터들을 읽어올 수 없기 때문이다.
필수 옵션
- bootstrap.servers
- key.deserializer
- value.deserializer
선택 옵션
- group.id
- auto.offset.reset
- enable.auto.commit
- auto.commit.interval.ms
- max.poll.records
- session.timeout.ms
- hearbeat.interval.ms
- max.poll.interval.ms
- isolation.level
동기 오프셋 커밋
configs.put(ConsumerConfig.*ENABLE_AUTO_COMMIT_CONFIG* , false);
consumer.commitSync(); → while 문 안에 poll다하고 마지막 줄에 추가
commitSync는 poll메소드로 받은 가장 마지막 레코드의 오프셋을 기준으로 커밋
레코드 단위로 동기 오프셋 커밋을 할 경우
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currOffset = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
logger.info("{}",record);
currOffset.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()+1,null)
);
consumer.commitSync(currOffset);
}
}
비동기 오프셋 커밋
poll마지막 줄에 consumer.commitAsync(); 을 추가만 하면 된다.
파라미터로 callback 함수를 넣어서 응답의 결과를 얻을 수도 있다.
# consumer 안전한 종료하기
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
Map<TopicPartition, OffsetAndMetadata> currOffset = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
logger.info("{}", record);
currOffset.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1, null)
);
consumer.commitSync(currOffset);
}
// consumer.commitAsync(new OffsetCommitCallback() {
// @Override
// public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
// if (exception != null) {
// System.err.println("commit failed");
// } else {
// System.out.println("commit succeed");
// }
// if (exception != null) {
// logger.error("commit failed for offsets {}", offsets, exception);
// }
// }
// });
// consumer.commitSync();
}
} catch (WakeupException e) {
logger.warn("wakeup consumer");
} finally {
consumer.close();
}
}
'Infra > Apache Kafka' 카테고리의 다른 글
kafka producer API (0) | 2022.09.18 |
---|---|
kafka의 기본 개념 정리 (2) | 2022.09.05 |
Kafka mac에서 설치 및 테스트 (0) | 2022.08.27 |
Kafka의 등장 배경 및 특징 (0) | 2022.08.26 |