본문 바로가기

Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / 스프링 카프카(Spring Kafka) - 스프링 카프카 프로듀서

반응형

스프링 카프카

스프링 카프카는 카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리다.

기존 카프카 클라이언트 라이브러리를 래핑하여 마든 스프링 카프카 라이브러리는 카프카 클라이언트에서 사용하는 여러 가지 패턴을 미리 제공한다.

예를 들어, 컨슈머를 멀티 스레드로 운영하기 위한 스레드 풀 로직은 스프링 카프카를 사용하면 concurrency 옵션 하나만 추가하면 어렵지 않게 구현할 수 있다.

 

스프링 카프카 라이브러리를 스프링 부트 프레임워크와 함께 사용하기 위해서는 다음과 같이 build.gradle에 디펜던시를 추가한다.

dependencies {
    // 스프링 카프카 라이브러리 디펜던시
    // 스프링 카프카 라이브러리를 추가하면 카프카 클라이언트 라이브러리도 같이 포함되어 빌드된다.
    implementation 'org.springframework.kafka:spring-kafka:2.5.10.RELEASE'
    // 스프링 부트를 활용하기 위해 스프링 부트 스타터를 추가한다.
    implementation 'org.springframework.boot:spring-boot-starter:2.4.0'
}

 

스프링 카프카 라이브러리는 어드민, 컨슈머, 프로듀서, 스트림즈 기능을 제공한다.

 

1. 스프링 카프카 프로듀서

스프링 카프카 프로듀서는 '카프카 템플릿(Kafka Template)' 라고 불리는 클래스를 사용하여 데이터를 전송할 수 있다.

카프카 템플릿은 프로듀서 팩토리(ProducerFactory) 클래스를 통해 생성할 수 있다.

카프카 템플릿 사용법은 2가지가 있다.

 

  • 스프링 카프카에서 제공하는 기본 카프카 템플릿 사용
  • 직접 사용자가 카프카 템플릿을 프로듀서 팩토리로 생성하여 사용

 

1.1 기본 카프카 템플릿

기본 카프카 템플릿은 기본 프로듀서 팩토리를 통해 생성된 카프카 템플릿을 사용한다.

기본 카프카 템플릿 사용 시 application.yaml에 프로듀서 옵션을 넣고 사용할 수 있다.

application.yaml에 설정한 프로듀서 옵션값은 애플리케이션이 실행될 때 자동으로 오버라이드되어 설정된다.

옵션을 설정하지 않으면 bootstrap-servers는 localhost:9092, key-serializer와 value-serializer는 StringSerializer로 자동 설정된다.

연결하고픈 대상 서버와 acks옵션을 all로 설정하고 싶다면 application.yaml 에 아래와 같이 설정한다.

spring:
      kafka:
        producer:
          bootstrap-servers: my-kafka:9092
          acks: all

 

 

 

스프링 부트 애플리케이션을 실행하고 test0 부터 test9까지 메시지 값을 클러스터로 보내는 프로듀서 애플리케이션을 작성한다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@SpringBootApplication
public class SpringProducerApplication implements CommandLineRunner {
 
    private static String TOPIC_NAME = "test";
 
    // KafkaTemplate을 @Autowired 어노테이션으로 주입받아서 사용한다.
    // 사용자가 직접 선언하지 않은 bean객체지만 스프링 카프카에서 제공하는 기본 KafkaTemplate 객체로 주입된다.
    // application.yaml에 선언한 옵션값은 자동으로 주입된다.
    @Autowired
    private KafkaTemplate<Integer, String> template;
 
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringProducerApplication.class);
        application.run(args);
    }
 
    @Override
    public void run(String... args) {
        for (int i = 0; i < 10; i++) {
            // send() 메서드를 사용해 토픽 이름과 메시지 값을 넣어 전송한다.
            template.send(TOPIC_NAME, "test" + i);
        }
        // 프로듀서를 데이터 전송이 완료되면 종료한다.
        System.exit(0);
    }
}
cs

 

KafkaTemplate은 send(String topic, V data) 이외에도 여러 가지 데이터 전송 메서드들을 오버로딩하여 제공한다.

다음은 send()를 오버로딩하는 메서드들이다.

 

  • send(String topic, K key, V data) : 메시지 키, 값을 포함하여 특정 토픽으로 전달
  • send(String topic, Integer partition, K key, V data) : 메시지 키, 값이 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
  • send(String topic, Integer partition, Long timestamp K key, V data) : 메시지 키, 값, 타임스탬프가 포함된 레코드를 특정 토픽의 특정 파티션으로 전달
  • send(ProducerRecord<K,V> record) : 프로듀서 레코드 객체를 전송

 

1.2 커스텀 카프카 템플릿

커스텀 카프카 템플릿은 프로듀서 팩토리를 통해 만든 카프카 템플릿 객체를 빈으로 등록하여 사용하는 것이다.

프로듀서에 필요한 각종 옵션을 선언하여 사용할 수 있으며 한 스프링 카프카 애플리케이션 내부에 다양한 종류의 카프카 프로듀서 인스턴스를 생성하고 싶다면 이 방식을 사용하면 된다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// KafkaTemplate 빈 객체 등록을 위해 Configuration 어노테이션을 선언한 클래스를 만든다.
// 이 클래스에서 KafkaTemplate 빈 객체가 등록된다.
@Configuration
public class KafkaTemplateConfiguration {
 
    // KafkaTemplate 객체를 리턴하는 빈 객체.
    @Bean
    public KafkaTemplate<StringString> customKafkaTemplate() {
 
        // ProducerFactory를 사용하여 KafkaTemplate 객체를 만들 때는 프로듀서 옵션을 직접 넣는다.
        // 카프카 기본 템플릿과 다르게 직접 프로듀서 옵션들을 선언해야 한다.
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "my-kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.ACKS_CONFIG, "all");
 
        // KafkaTemplate 객체를 만들기 위한 ProducerFactory를 초기화한다.
        ProducerFactory<StringString> pf = new DefaultKafkaProducerFactory<>(props);
 
        // 빈 객체로 사용할 KafkaTemplate 인스턴스를 초기화하고 리턴한다.
        return new KafkaTemplate<>(pf);
    }
 
}
cs

 

KafkaTemplate 빈 객체를 등록하였다.

 

 

이제 메인 애플리케이션에서 기본 카프카 템플릿에서 사용했던 것처럼 Autowired 어노테이션을 붙여 선언하면 된다.

다른 점은 빈 객체 이름과 동일하게 customKafkaTemplate을 변수명으로 선언해야 한다.

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@SpringBootApplication
public class springProducerApplication implements CommandLineRunner {
 
    private static String TOPIC_NAME = "my-kafka:9092";
 
    // 빈 객체로 등록한 customKafkaTemplate을 주입받도록 메서드 이름과 동일한 변수명 선언.
    @Autowired
    private KafkaTemplate<StringString> customKafkaTemplate;
 
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringProducerApplication.class);
        application.run(args);
    }
 
    @Override
    public void run(String... args) {
        // 커스텀 카프카 템플릿을 주입받은 이후에 KafkaTemplate 인스턴스를 사용하는 것은
        // 기본 카프카 템플릿을 사용하는 것과 동일하다.
        // send() 메서드를 사용하여 특정 토픽으로 데이터를 전송할 수 있다.
        // 만약 전송한 이후에 정상 적재됐는지 여부를 확인하고 싶다면 ListenableFuture 메서드를 사용하면 된다.
        ListenableFuture<SendResult<StringString>> future = customKafkaTemplate.send(TOPIC_NAME, "test");
        // ListenableFutrue 인스턴스에 addCallback 함수를 붙여 프로듀서가 보낸 데이터의 브로커 적재여부를 비동기로 확인할 수 있다.
        // 만약 브로커에 정상 적재되었다면 onSuccess메서드가 호출된다.
        // 적재되지 않고 이슈가 발생했다면 OnFailure 메서드가 호출된다.
        future.addCallback(new KafkaSendCallback<StringString>() {
            @Override
            public void onSuccess(SendResult<StringString> result) {
            }
 
            @Override
            public void onFailure(KafkaProducerException ex) {
            }
        });
        System.exit(0);
    }
}
cs

 

반응형