스프링 카프카 컨슈머
스프링 카프카의 컨슈머는 기존 컨슈머를 2개의 타입으로 나누고 커밋을 7가지로 나누어 세분화했다.
타입은 레코드 리스너(MessageListener)와 배치 리스터(BatchMessageListener)가 있다.
리스너 종류에 따라 한번 호출하는 메서드에서 처리하는 레코드의 개수가 달라진다.
레코드 리스너
- 단 1개의 레코드를 처리한다.
- 스프링 카프카 컨슈머의 기본 리스너 타입이다.
배치 리스너
- 한 번에 여러개 레코드들을 처리할 수 있다.
두 리스너 외에도 각 리스너에서 파생된 형태가 존재한다.
- AcknowledgingMessageListener
- ConsumerAwareMessageListener
- AcknowledgingConsumerAwareMessageListener
- BatchAcknowledgingMessageListener
- BatchConsumerAwareMessageListener
- BatchAcknowledgingConsumerAwareMessageListener
메뉴얼 커밋을 사용할 경우에는 Acknowledging이 붙은 리스터를 사용하고,
KafkaConsumer 인스턴스에 직접 접근하여 컨트롤하고 싶다면 ConsumerAware가 붙은 리스너를 사용하면 된다.
기존 카프카 클라이언트 라이브러리에서 컨슈머를 구현할 때 가장 어려운 부분이 커밋을 구현하는 것이다.
컨슈머에서 커밋을 직접 구현할 때는 오토 커밋, 동기 커밋, 비동기 커밋 3가지로 나뉘지만 실제 운영환경에서는 다양한 종류의 커밋을 구현해서 사용하기 때문이다.
그러나 스프링 카프카에서는 사용자가 사용할 만한 커밋의 종류를 7가지로 세분화하여 제공한다.
스프링 카프카에서는 커밋이라고 부르지 않고 AckMode라고 부른다.
스프링 카프카 컨슈머의 AckMode 기본값은 BATCH이고 컨슈머의 enable.auto.commit 옵션은 false로 지정된다.
AcksMode 종류
AckMode | 설명 | |
RECORD | 레코드 단위로 프로세싱 이후 커밋 | |
BATCH | poll() 메서드로 호출된 레코드가 모두 처리된 이후 커밋 스프링 카프카 컨슈머의 AckMode 기본값 |
|
TIME | 특정 시간 이후에 커밋 이 옵션을 상용할 경우 시간 간격을 선언하는 AckTime 옵션을 설정해야 한다. |
|
COUNT | 특정 개수만큼 레코드가 처리된 이후에 커밋 이 옵션을 사용할 경우 레코드 개수를 선언하는 AckCount 옵션을 설정해야 한다. |
|
COUNT_TIME | TIME, COUNT 옵션 중 맞는 조건이 하나라도 나올 경우 커밋 | |
MANUAL | Acknowledement.acknowledge() 메서드가 호출되면 다음번 poll() 때 커밋을 한다. 매번 acknowledge() 메서드를 호출하면 BATCH 옵션과 동일하게 동작한다. 이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스너로 사용해야 한다. | |
MNUAL_IMMEDIATE | Acknowledgement.acknowledge() 메서드를 호출한 즉시 커밋한다. 이 옵션을 사용할 경우 AcknowledgingMessageListener 또는 BatchAcknowledgingMessageListener를 리스터로 사용해야 한다. |
리스너를 생성하고 사용하는 방식
- 기본 리스너 컨테이너
- 컨테이너 팩토리를 사용하여 직접 구현
* 기본 리스너 컨테이너
기본 리스너 컨테이너는 기본 리스너 컨테이너 팩토리를 통해 생성된 리스너 컨테이너를 사용한다.
기본 리스너 컨테이너를 사용할 때는 apllication.yaml에 컨슈머와 리스너 옵션을 넣고 사용할 수 있다.
application.yaml에 설정한 컨슈머와 리스너 옵션값은 애플리케이션이 실행될 때 자동으로 오버라이드되어 설정된다.
1. 레코드 리스너 (message Listener)
1.1 application.yaml 설정
spring:
kafka:
consumer:
bootstrap-servers: my-kafka:9092
listener:
type: RECORD
1.2 레코드 리스너를 활용한 코드 작성
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | @SpringBootApplication public class SpringConsumerApplication { public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringConsumerApplication.class); application.run(args); } // 가장 기본적인 리스너 선언. // 어노테이션 옵션으로 topics, groupId를 설정하여 지정한다. // poll()이 호출되어 가져온 레코드들은 차례대로 개별 레코드의 메시지 값을 파라미터로 받게 된다. // 파라미터로 컨슈머 레코드를 받기 때문에 메시지 키, 값에 대한 처리를 이 메서드 안에서 처리하면 된다. @KafkaListener(topics = "test", groupId = "test-group-00") public void recordListener(ConsumerRecord<String, String> record) { logger.info(record.toString()); } // 메시지 값을 파라미터로 받는 리스너이다. // 스프링 카프카의 역직렬화 클래스의 기본값인 StringDeserializer를 사용했으므로 String 클래스로 메시지 값을 받았다. @KafkaListener(topics = "test", groupId = "test-group-01") public void singleTopicListener(String messageValue) { logger.info(messageValue); } // 개별 리스너에 카프카 컨슈머 옵션값을 부여하고 싶다면 KafkaListener 어노테이션의 properties 옵션을 사용하면 된다. @KafkaListener(topics = "test", groupId = "test-group-02", properties = {"max.poll.interval.ms:60000","auto.offset.reset:earliest"}) public void singleTopicWithPropertiesListener(String messageValue) { logger.info(messageValue); } // 2개 이상의 카프카 컨슈머 스레드를 사용하고 싶다면 concurrency 옵션을 사용하면 된다. // concurrency 옵션값에 해당하는 만큼 컨슈머 스레드를 만들어서 병렬처리한다. @KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3") public void concureentTopicListener(String messageValue) { logger.info(messageValue); } // 특정 토픽의 특정 파티션만 구독하고 싶다면 topicPartitions 파라미터를 사용한다. // PartitionOffset 어노테이션을 활용하면 특정 파티션의 특정 오프셋까지 지정할 수 있다. // 이 경우 그룹 아이디에 관계없이 항상 설정한 오프셋의 데이터부터 가져온다. @KafkaListener(topicPartitions = { @TopicPartition(topic = "test01", partitions = {"0", "1"}), @TopicPartition(topic = "test02", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "3")) }, groupId = "test-group-04") public void listenSpecificPartition(ConsumerRecord<String, String> record) { logger.info(record.toString()); } } | cs |
2. 배치 리스너 (BatchMessageListener)
2.1 application.yaml 설정
spring:
kafka:
consumer:
bootstrap-servers: my-kafka:9092
listener:
type: BATCH
2.2 배치 리스너를 활용한 코드 작성
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 | @SpringBootApplication public class SpringConsumerApplication { public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringConsumerApplication.class); application.run(args); } // 컨슈머 레코드의 묶음(ConsumerRecords)을 파라미터로 받는다. // 카프카 클라이언트 라이브러리에서 poll() 메서드로 리턴받은 ConsumerRecords를 리턴받아 사용하는 것과 동일하다. @KafkaListener(topics = "test", groupId = "test-group-01") public void batchListener(ConsumerRecords<String, String> records) { records.forEach(record -> logger.info(record.toString())); } // 메시지 값들을 List 자료구조로 받아서 처리한다. @KafkaListener(topics = "test", groupId = "test-group-02") public void batchListener(List<String> list) { list.forEach(recordValue -> logger.info(recordValue)); } // 2개 이상의 컨슈머 스레드로 배치 리스너를 운영할 경우 concurrency 옵션을 함께 선언하용 사용하면 된다. // 3개의 컨슈머 스레드가 생성된다. @KafkaListener(topics = "test", groupId = "test-group-03", concurrency = "3") public void concurrentBatchListener(ConsumerRecords<String, String> records) { records.forEach(record -> logger.info(record.toString())); } } | cs |
3. 배치 컨슈머 리스너(BatchConsumerAwareMessageListener) / 배치 커밋 리스너(BatchAcknowledgingMessageListener)
둘 다 배치 타입으로 선언된 리스너이다.
배치 컨슈머 리스너
- 컨슈머를 직접 사용하기 위해 컨슈머 인스턴스를 파라미터로 받는다.
- 컨슈머 인스턴스를 사용하면 동기 커밋과 비동기 커밋을 사용할 수 있다.
배치 커밋 리스너
- 컨테이너에서 관리하는 AckMode를 사용하기 위해 Acknowledgement 인스턴스를 파라미터로 받는다.
- Acknowledgement 인스턴스는 커밋을 수행하기 위한 한정적인 메서드만 제공한다.
만약 AckMode도 사용하고 컨슈머도 사용하고 싶다면 배치 커밋 컨슈머 리스너(BatchAcknowledgingConsumerAwareMessageListener)를 사용하면 된다.
3.1 application.yaml 설정
spring:
kafka:
consumer:
bootstrap-servers: my-kafka:9092
listener:
type: BATCH
ack-mode: MANUAL_IMMEDIATE
3.2 배치 컨슈머 리스너와 배치 커밋 리스너를 활용한 코드 작성
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 | @SpringBootApplication public class SpringConsumerApplication { public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringConsumerApplication.class); application.run(args); } // AckMode를 MANUAL 또는 MANUAL_IMMEDIATE로 사용할 경우 수동커밋을 하기 위해 파라미터로 Acknowledgment를 받아야 한다. // acknowledge() 메서드를 호출함으로써 커밋을 수행할 수 있다. @KafkaListener(topics = "test", groupId = "test-group-01") public void commitListener(ConsumerRecords<String, String> records, Acknowledgment ack) { records.forEach(record -> logger.info(record.toString())); ack.acknowledge(); } // 동기 커밋, 비동기 커밋을 사용하고 싶다면 컨슈머 인스턴스를 파라미터로 받아 사용할 수 있다. // consumer 인스턴스의 commitSync(), commitAsync() 메서드를 호출하면 // 사용자가 원하는 타이밍에 커밋할 수 있도록 로직을 추가할 수 있다. // 다만, 리스너가 커밋을 하지 않도록 AckMode는 MANUAL 또는 MANUAL_IMMEDIATE로 설정해야한다. @KafkaListener(topics = "test", groupId = "test-group-02") public void consumerCommitListener(ConsumerRecords<String, String> records, Consumer<String, String> consumer) { records.forEach(record -> logger.info(record.toString())); consumer.commitSync(); } } | cs |
* 커스텀 리스너 컨테이너
서로 다른 설정을 가진 2개 이상의 리스너를 구현하거나 리밸런스 리스너를 구현하기 위해서는 커스텀 리스너 컨테이너를 사용해야 한다.
커스텀 리스너 컨테이너를 만들기 위해서는 스프링 카프카에서 카프카 리스너 컨테이너 팩토리 인스턴스를 생성해야 한다.
카프카 리스너 컨테이너 팩토리를 빈으로 등록하고 KafkaListener 어노테이션에서 커스텀 리스너 컨테이너 팩토리를 등록하면 커스텀 리스너 컨테이너를 사용할 수 있다.
ListenerContainerConfiguration 작성
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 | @Configuration public class ListenerContainerConfiguration { // KafkaListenerContainerFactory 빈 객체를 리턴하는 메서드를 생성한다. // 이 메서드 이름은 커스텀 리스너 컨테이너 팩토리로 선언할 때 사용된다. @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> customContainerFactory() { // 컨슈머를 실행할 때 필요한 옵션값 세팅 // group.id는 리스너 컨테이너에도 선언하므로 생략가능 Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // DefaultKafkaConsumerFactory 인스턴스 생성. // 이것은 리스너 컨테이너 팩토리를 생성할 때 컨슈머 기본 옵션을 설정하는 용도이다. DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(props); // 2개 이상의 컨슈머 리스너를 만들 때 사용되며 concurrency를 1로 설정할 경우 1개 컨슈머 스레드로 실행된다. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 리밸런스 리스너를 선언하기 위해 setConsumerRebalanceListener 메서드를 호출한다. // 이 메서드는 스프링 카프카에서 제공하는 메서드로 기존에 사용되는 카프카 컨슈머 리밸런스 리스너에 2개의 메서드를 호출한다. factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() { // 커밋이 되기 전에 리밸런스가 발생 했을 때 호출 @Override public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { } // 커밋이 일어난 이후 리밸런스가 발생 했을 때 호출 @Override public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { } @Override public void onPartitionsLost(Collection<TopicPartition> partitions) { } }); // 레코드 리스너를 명시하기 위해 해당 메서드에 false를 파라미터로 넣는다. // 만약 배치 리스너를 사용하고 싶다면 true로 설정하면 된다. factory.setBatchListener(false); // ackMode를 설정한다. 레코드 단위로 커밋하기 위해 RECORD로 설정했다. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.RECORD); // 컨슈머 설정값을 가지고 있는 DefaultKafkaConsumerFactory 인스턴스를 ConcurrentKafkaListenerContainerFactory의 컨슈머 팩토리에 설정한다. factory.setConsumerFactory(cf); return factory; } } | cs |
SrpingConsumerApplication 작성
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 | @SpringBootApplication public class SpringConsumerApplication { public static Logger logger = LoggerFactory.getLogger(SpringConsumerApplication.class); public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringConsumerApplication.class); application.run(args); } // KafkaListener 어노테이션의 containerFactory 옵션을 커스텀 컨테이너 팩토리로 설정한다. // 빈 객체로 등록한 이름인 customContainerFactory를 옵션값으로 설정하면 커스텀 컨테이너 팩토리로 생성된 커스텀 리스너 컨테이너를 사용할 수 있다. @KafkaListener(topics = "test", groupId = "test-group", containerFactory = "customContainerFactory") public void customListener(String data) { logger.info(data); } } | cs |
'Programming > Apache Kafka' 카테고리의 다른 글
아파치 카프카 실습 (Apache Kafka) / 스프링 카프카(Spring Kafka) - 스프링 카프카 프로듀서 (0) | 2023.02.07 |
---|---|
아파치 카프카 실습 (Apache Kafka) / 컨슈머 랙 - 상세 개념 (0) | 2023.02.06 |
아파치 카프카 실습 (Apache Kafka) / 카프카 컨슈머 - 상세 개념 (멀티스레드 컨슈머) (0) | 2023.02.01 |
아파치 카프카 실습 (Apache Kafka) / 카프카 프로듀서 - 상세 개념 (acks, 멱등성 프로듀서, 트랜잭션 프로듀서) (0) | 2023.01.31 |
아파치 카프카 실습 (Apache Kafka) / 토픽과 파티션 - 상세 개념 (적정 파티션 개수, 메시지 키 사용 여부, 토픽 정리 정책, ISR) (0) | 2023.01.30 |