스프링 카프카
스프링 카프카는 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리다.
기존 카프카 클라이언트 라이브러리를 래핑하여 마든 스프링 카프카 라이브러리는 카프카 클라이언트에서 사용하는 여러 가지 패턴을 미리 제공한다.
예를 들어, 컨슈머를 멀티 스레드로 운영하기 위한 스레드 풀 로직은 스프링 카프카를 사용하면 concurrency 옵션 하나만 추가하면 어렵지 않게 구현할 수 있다.
스프링 카프카 라이브러리를 스프링 부트 프레임워크와 함께 사용하기 위해서는 다음과 같이 build.gradle에 디펜던시를 추가한다.
dependencies {
// 스프링 카프카 라이브러리 디펜던시
// 스프링 카프카 라이브러리를 추가하면 카프카 클라이언트 라이브러리도 같이 포함되어 빌드된다.
implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
// 스프링 부트를 활용하기 위해 스프링 부트 스타터를 추가한다.
implementation 'org.springframework.boot:spring-boot-starter:2.4.0'
}
스프링 카프카 라이브러리는 어드민, 컨슈머, 프로듀서, 스트림즈 기능을 제공한다.
1. 스프링 카프카 프로듀서
스프링 카프카 프로듀서는 '카프카 템플릿(Kafka Template)' 라고 불리는 클래스를 사용하여 데이터를 전송할 수 있다.
카프카 템플릿은 프로듀서 팩토리(ProducerFactory) 클래스를 통해 생성할 수 있다.
카프카 템플릿 사용법은 2가지가 있다.
- 스프링 카프카에서 제공하는 기본 카프카 템플릿 사용
- 직접 사용자가 카프카 템플릿을 프로듀서 팩토리로 생성하여 사용
1.1 기본 카프카 템플릿
기본 카프카 템플릿은 기본 프로듀서 팩토리를 통해 생성된 카프카 템플릿을 사용한다.
기본 카프카 템플릿 사용 시 application.yaml에 프로듀서 옵션을 넣고 사용할 수 있다.
application.yaml에 설정한 프로듀서 옵션값은 애플리케이션이 실행될 때 자동으로 오버라이드되어 설정된다.
옵션을 설정하지 않으면 bootstrap-servers는 localhost:9092, key-serializer와 value-serializer는 StringSerializer로 자동 설정된다.
연결하고픈 대상 서버와 acks옵션을 all로 설정하고 싶다면 application.yaml 에 아래와 같이 설정한다.
spring:
kafka:
producer:
bootstrap-servers: my-kafka:9092
acks: all
스프링 부트 애플리케이션을 실행하고 test0 부터 test9까지 메시지 값을 클러스터로 보내는 프로듀서 애플리케이션을 작성한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | @SpringBootApplication public class SpringProducerApplication implements CommandLineRunner { private static String TOPIC_NAME = "test"; // KafkaTemplate을 @Autowired 어노테이션으로 주입받아서 사용한다. // 사용자가 직접 선언하지 않은 bean객체지만 스프링 카프카에서 제공하는 기본 KafkaTemplate 객체로 주입된다. // application.yaml에 선언한 옵션값은 자동으로 주입된다. @Autowired private KafkaTemplate<Integer, String> template; public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringProducerApplication.class); application.run(args); } @Override public void run(String... args) { for (int i = 0; i < 10; i++) { // send() 메서드를 사용해 토픽 이름과 메시지 값을 넣어 전송한다. template.send(TOPIC_NAME, "test" + i); } // 프로듀서를 데이터 전송이 완료되면 종료한다. System.exit(0); } } | cs |
KafkaTemplate은 send(String topic, V data) 이외에도 여러 가지 데이터 전송 메서드들을 오버로딩하여 제공한다.
다음은 send()를 오버로딩하는 메서드들이다.
- send(String topic, K key, V data) : 메시지 키, 값을 포함하여 특정 토픽으로 전달
- send(String topic, Integer partition, K key, V data) : 메시지 키, 값이 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
- send(String topic, Integer partition, Long timestamp K key, V data) : 메시지 키, 값, 타임스탬프가 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
- send(ProducerRecord<K,V> record) : 프로듀서 레코드 객체를 전송
1.2 커스텀 카프카 템플릿
커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것이다.
프로듀서에 필요한 각종 옵션을 선언하여 사용할 수 있으며 한 스프링 카프카 애플리케이션 내부에 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 된다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | // KafkaTemplate 빈 객체 등록을 위해 Configuration 어노테이션을 선언한 클래스를 만든다. // 이 클래스에서 KafkaTemplate 빈 객체가 등록된다. @Configuration public class KafkaTemplateConfiguration { // KafkaTemplate 객체를 리턴하는 빈 객체. @Bean public KafkaTemplate<String, String> customKafkaTemplate() { // ProducerFactory를 사용하여 KafkaTemplate 객체를 만들 때는 프로듀서 옵션을 직접 넣는다. // 카프카 기본 템플릿과 다르게 직접 프로듀서 옵션들을 선언해야 한다. Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.ACKS_CONFIG, "all"); // KafkaTemplate 객체를 만들기 위한 ProducerFactory를 초기화한다. ProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(props); // 빈 객체로 사용할 KafkaTemplate 인스턴스를 초기화하고 리턴한다. return new KafkaTemplate<>(pf); } } | cs |
KafkaTemplate 빈 객체를 등록하였다.
이제 메인 애플리케이션에서 기본 카프카 템플릿에서 사용했던 것처럼 Autowired 어노테이션을 붙여 선언하면 된다.
다른 점은 빈 객체 이름과 동일하게 customKafkaTemplate을 변수명으로 선언해야 한다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | @SpringBootApplication public class springProducerApplication implements CommandLineRunner { private static String TOPIC_NAME = "my-kafka:9092"; // 빈 객체로 등록한 customKafkaTemplate을 주입받도록 메서드 이름과 동일한 변수명 선언. @Autowired private KafkaTemplate<String, String> customKafkaTemplate; public static void main(String[] args) { SpringApplication application = new SpringApplication(SpringProducerApplication.class); application.run(args); } @Override public void run(String... args) { // 커스텀 카프카 템플릿을 주입받은 이후에 KafkaTemplate 인스턴스를 사용하는 것은 // 기본 카프카 템플릿을 사용하는 것과 동일하다. // send() 메서드를 사용하여 특정 토픽으로 데이터를 전송할 수 있다. // 만약 전송한 이후에 정상 적재됐는지 여부를 확인하고 싶다면 ListenableFuture 메서드를 사용하면 된다. ListenableFuture<SendResult<String, String>> future = customKafkaTemplate.send(TOPIC_NAME, "test"); // ListenableFutrue 인스턴스에 addCallback 함수를 붙여 프로듀서가 보낸 데이터의 브로커 적재여부를 비동기로 확인할 수 있다. // 만약 브로커에 정상 적재되었다면 onSuccess메서드가 호출된다. // 적재되지 않고 이슈가 발생했다면 OnFailure 메서드가 호출된다. future.addCallback(new KafkaSendCallback<String, String>() { @Override public void onSuccess(SendResult<String, String> result) { } @Override public void onFailure(KafkaProducerException ex) { } }); System.exit(0); } } | cs |
'Programming > Apache Kafka' 카테고리의 다른 글
아파치 카프카 실습 (Apache Kafka) / 스프링 카프카(Spring Kafka) - 스프링 카프카 컨슈머 (0) | 2023.02.09 |
---|---|
아파치 카프카 실습 (Apache Kafka) / 컨슈머 랙 - 상세 개념 (0) | 2023.02.06 |
아파치 카프카 실습 (Apache Kafka) / 카프카 컨슈머 - 상세 개념 (멀티스레드 컨슈머) (0) | 2023.02.01 |
아파치 카프카 실습 (Apache Kafka) / 카프카 프로듀서 - 상세 개념 (acks, 멱등성 프로듀서, 트랜잭션 프로듀서) (0) | 2023.01.31 |
아파치 카프카 실습 (Apache Kafka) / 토픽과 파티션 - 상세 개념 (적정 파티션 개수, 메시지 키 사용 여부, 토픽 정리 정책, ISR) (0) | 2023.01.30 |