카프카 클라이언트
카프카 클라이언트 라이브러리를 사용하면 프로듀서 컨슈머 어드민 클라이언트로 카프카 클러스터에 명령을 내리거나 데이터를 송수신할 수 있다.
프로듀서 API
프로듀서 어플리케이션은 카프카에 필요한 데이터를 선언하고 브로커의 특정 토픽의 파티션에 직접 전송(직접 통신)한다.
프로듀서는 데이터를 직렬화하여 카프카 브로커로 보내기 때문에 자바에서 선언 가능한 모든 형태를 브로커로 전송할 수 있다.
💡 여기서 직렬화란 자바 또는 외부 시스템에서 사용 가능하도록 바이트 형태로 데이터를 변환하는 기술을 말한다.
인텔리제이에서 개발하기 위해선 일단
카프카 클라이언트 라이브러리를 gradle에 추가해야한다.
plugins {
id 'org.springframework.boot' version '2.7.3'
id 'io.spring.dependency-management' version '1.0.13.RELEASE'
id 'java'
}
group = 'com.example'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '17'
configurations {
compileOnly {
extendsFrom annotationProcessor
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.apache.kafka:kafka-streams'
implementation 'org.springframework.kafka:spring-kafka'
compileOnly 'org.projectlombok:lombok'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'
implementation 'org.apache.kafka:kafka-clients'
implementation 'org.slf4j:slf4j-simple'
}
tasks.named('test') {
useJUnitPlatform()
}
package com.example.kafkastudy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties configs = new Properties();
//카프카 클러스터의 서버 host와 ip 설정
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
//메시지 키 , 메시지 값 직렬화하기 위한 클래스 설정
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//프로듀서 인스턴스 생성
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String message = "TestFirst";
//생성자에 들어가는 변수는 두가지 말고 더 존재. 토픽 이름, 메시지키, 메시지 값 여기선 메시지 키를 설정하지 않았으므로 default null
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME,message);
//send한다고 즉각적으로 보내진 않음. 프로듀서 내부에 있다가 배치 형태로 전송된다.
producer.send(record);
logger.info("{}", record);
//프로듀서 내부 버퍼에 있는 레코드 배치를 브로커로 전송
producer.flush();
producer.close();
}
}
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 3
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
먼저 주키퍼랑 카프카 서버를 킨 후 토픽을 생성해주고 위 자바 코드를 돌린다음 consuming을 해주면 message가 잘 출력되는걸 확인할 수 있다.
ProducerRecord 클래스는 토픽이나 메시지 값 이외에 추가 파라미터를 오버로딩해서 메시지 키, 파티션 번호 지정, 타임스탬프 설정을 추가적으로 할 수 있다.
위에서 KafkaProducer가 send 메소드를 호출하면 record가 topic의 어느 파티션으로 전송될 것인지 정해진다. 파티셔너를 따로 설정하지 않았다면 DefaultPartitioner로 설정되어 파티션이 결정된다. 이렇게 구분된 record는 전송전에 accumulator에 데이터를 버포로 쌓아놓고 발송한다.(프로듀서 내부에서 일어남.)
💡 파티셔너에는 UniformStickyPartitioner와 RoundRobinPartitioner 두개가 존재. default는 스티키 파티셔너! 둘다 메시지 키가 있을 때 메시지 키의 해시값과 파티션을 매칭하여 데이터를 전송함. UniformStickyPartitioner는 프로듀서 동작에 특화되어 높은 처리량과 낮은 리소스 사용률을 가지고 있으며 어큐뮬레이터에서 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터를 모두 동일한 파티션에 전송한다.
필수 옵션
- bootstrap.servers: 전송할 카프카 클러스터에 속한 브로커의 호스트 이름
- key.serializer: 레코드의 메시지 키 직렬화하는 클래스를 지정
- value.serializer: 레코드의 메시지 값을 직렬화하는 클래스를 지정
선택 옵션
- acks: 데이터가 브로커들에 정상적으로 저장되었는지 성공 여부. 0, 1, -1(all)이 있고 디폴트는 -1
- buffer.memory: 버퍼 메모리의 양 지정. 기본값은 33554432
- retries: 프로듀서가 브로커로부터 에러를 받고 재전송을 시도하는 횟수
- batch.size: 배치로 전송할 레코드의 최대 용량
- linger.ms: 배치를 전송하기 전까지의 기다리는 최소 시간 기본값은 0
- partitioner.class: 파티셔너 클래스 지정
- enable.idempotence: 멱등성 프로듀서 동작 여부
- transactional.id: 전송 시 레코드를 트랜잭션 단위로 묶을지 여부
linger.ms 시간이 배치 버퍼에 쌓이기 까지의 시간보다 길면 배치 버퍼에 있는 메모리를 시간이 남아있어도 무시하고 플러쉬한다.
메시지 키를 가진 데이터 전송
ProducerRecord<String, String> record = new ProducerRecord<>(*TOPIC_NAME*,"Key_test",message);
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --property print.key=true --property key.separator="-" --from-beginning
파티션 지정해서 데이터 전송
int partition = 0;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME,partition,"Key_test",message+partition);
커스텀파티셔너 만들기
package com.example.kafkastudy.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (keyBytes == null) {
throw new InvalidRecordException("need message key");
}
if (((String) key).equals("Key_test")) {
return 0;
}
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int size = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % size;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
configs.put(ProducerConfig.*PARTITIONER_CLASS_CONFIG*, CustomPartitioner.class);
브로커 정상 전송 여부 확인
KafkaProducer의 send 메소드는 Future 객체를 반환하는데 이 객체는 RecordMetadata의 결과를 표현한다. 여기에는 카프카 브로커에 정상적으로 record가 적재되었는지에 대한 데이터도 포함되어있다.
RecordMetadata mt = null;
try {
mt = producer.send(record).get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
logger.info("{}",mt.toString());
위에 방법이 동기로 전송 결과를 반환받는 방법이었다면 아래는 비동기 방식으로 받는 방법이다.
package com.example.kafkastudy.producer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerCallback implements Callback {
private final static Loggerlogger= LoggerFactory.getLogger(ProducerCallback.class);
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
logger.error(exception.getMessage(), exception);
} else {
logger.info(metadata.toString());
}
}
}
producer.send(record,new ProducerCallback());
비동기로 받으면 더 빨리 처리할 수 있지만 전송하는 데이터의 순서가 중요한 경우에는 사용하면 안된다.
'Infra > Apache Kafka' 카테고리의 다른 글
Kafka consumer API (0) | 2022.09.18 |
---|---|
kafka의 기본 개념 정리 (2) | 2022.09.05 |
Kafka mac에서 설치 및 테스트 (0) | 2022.08.27 |
Kafka의 등장 배경 및 특징 (0) | 2022.08.26 |