본문 바로가기

Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / JAVA로 프로듀서 생성 및 데이터 보내기

반응형

토픽 생성

bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --create \
> --topic test \
> --partitions 3
Create topic test.

 

SimpleProducer 코드 작성

메시지 키 없이 전송

public class SimpleProducer {
    private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class);
    // 전송하기 위한 토픽 이름 설정
    private final static String TOPIC_NAME = "test";
    // 전송하고자하는 카프카 클러스터의 서버의 host, ip
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {

        // KafkaProducer 인스턴스를 생성하기 위한 옵션들을 key, value 값으로 선언
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        String messageValue = "testMessage";
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, messageValue);
        producer.send(record);
        logger.info("{}", record);
        producer.flush();
        producer.close();
    }
}

 

출력화면

[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [my-kafka:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = producer-1
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
	... 생략

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 2.5.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 66563e712b0b9f84
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1671368185049
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: FbiHTu4QQcGMFEt3Xc4ZpQ
[main] INFO com.example.simplekafkaproducer.SimpleProducer - ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=testMessage, timestamp=null)
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

카프카 프로듀서 구동 시 설정한 옵션, 카프카 클라이언트 버전, 전송한 ProducerRecord 등이 출력된다.

ProducerRecord 인스턴스 생성 시 메시지 키를 설정하지 않았기에 null로 설정된 것을 확인할 수 있다.

 

 

컨슈머를 통해 전송된 데이터 확인

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic test \
> --from-beginning
testMessage

 

 

메시지 키를 가진 데이터 전송

public class ProducerWithKeyValue {
    private final static Logger logger = LoggerFactory.getLogger(ProducerWithKeyValue.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";

    public static void main(String[] args) {

        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "Pangyo", "23");
        producer.send(record);
        logger.info("{}", record);
        producer.flush();
        producer.close();
    }   
}

 

출력화면

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic test \
--property print.key=true \
> --property key.separator="-" \
> --from-beginning
null-testMessage
Pangyo-23

 

 

 

 

 

해당 내용 출처 아파치 카프카 애플리케이션 프로그래밍 with 자바

 

반응형