반응형
토픽 생성
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --create \
> --topic test \
> --partitions 3
Create topic test.
SimpleProducer 코드 작성
메시지 키 없이 전송
public class SimpleProducer {
private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
// 전송하기 위한 토픽 이름 설정
private final static String TOPIC_NAME = "test";
// 전송하고자하는 카프카 클러스터의 서버의 host, ip
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
// KafkaProducer 인스턴스를 생성하기 위한 옵션들을 key, value 값으로 선언
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
String messageValue = "testMessage";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
출력화면
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [my-kafka:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
... 생략
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1671368185049
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: FbiHTu4QQcGMFEt3Xc4ZpQ
[main] INFO com.example.simplekafkaproducer.SimpleProducer - ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
카프카 프로듀서 구동 시 설정한 옵션, 카프카 클라이언트 버전, 전송한 ProducerRecord 등이 출력된다.
ProducerRecord 인스턴스 생성 시 메시지 키를 설정하지 않았기에 null로 설정된 것을 확인할 수 있다.
컨슈머를 통해 전송된 데이터 확인
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic test \
> --from-beginning
testMessage
메시지 키를 가진 데이터 전송
public class ProducerWithKeyValue {
private final static Logger logger = LoggerFactory.getLogger(ProducerWithKeyValue.class);
private final static String TOPIC_NAME = "test";
private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");
producer.send(record);
logger.info("{}", record);
producer.flush();
producer.close();
}
}
출력화면
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic test \
--property print.key=true \
> --property key.separator="-" \
> --from-beginning
null-testMessage
Pangyo-23
반응형
'Programming > Apache Kafka' 카테고리의 다른 글
스트림즈DSL로 간단하게 데이터 주고받기 - 카프카스트림즈(KafkaStreams) with stream-filter (0) | 2023.01.17 |
---|---|
[Kafka 연결 오류] broker id, cluster id가 달라 연결에 실패하는 경우 (0) | 2022.12.19 |
아파치 카프카 실습 (Apache Kafka) / 데이터 주고 받기 (0) | 2022.12.18 |
아파치 카프카 실습 (Apache Kafka) / 토픽 생성 및 조회 (0) | 2022.12.18 |
아파치 카프카 실습 (Apache Kafka) / EC2 서버 구축, 카프카, 주키퍼 설치하기 (0) | 2022.09.01 |