Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / 프로세서 API 활용하기

마실개 2023. 1. 25. 21:03
반응형

* 프로세서 API

프로세서 API는 스트림즈DSL보다 투박한 코드를 가지지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서는 동일한 역할을 한다.

스트림즈DSL은 데이터 처리, 분기, 조인을 위한 다양한 메서드들을 제공하지만

추가적인 상세 로직의 구현이 필요하다면 프로세서 API를 활용할 수 있다.

프로세서 API에서는 스트림즈DSL에서 사용했던 KStream, KTable, GlobalKTable 개념이 없다는 점을 주의해야 한다.

 

 

문자열 5자리 이상 데이터 필터링 실습

스트림즈DSL에서는 filter() 메서드를 스트림 프로세서로 사용해서 구현할 수 있었지만

프로세스API에서 동일한 로직을 구현하기 위해서는 스트림 프로세서 역할을 하는 클래스를 생성해야 한다.

 

1. 필터링 역할을 위한 스트림 프로세서 FilterProcessor.java 클래스 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
 
/* 스트림 프로세서 클래스를 생성하기 위해선 kafka-streams 라이브러리에서 제공하는
*  Processor 또는 Transformer 인터페이스를 사용한다.*/
public class FilterProcessor implements Processor<StringString> {
 
    /* ProcessorContext 클래스는 프로세서에 대한 정보를 담고 있다.
    * 생성된 인스턴스로 현재 스트림 처리 중인 토폴로지의 토픽 정보, 애플리케이션 아이디를 조회할 수 있다. */
    private ProcessorContext context;
 
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }
 
    @Override
    public void process(String key, String value) {
        if (value.length() > 5) {
            context.forward(key, value);
        }
        context.commit();
    }
 
    @Override
    public void close() {
 
    }
}
 
cs

 

 

2. FilterProcessor 클래스를 사용하는 SimpleKafkaProcessor.java 클래스 생성 후 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class SimpleKafkaProcessor {
 
    private static String APPLICATION_NAME = "processor-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_FILTER = "stream_log_filter";
 
    public static void main(String[] args) {
 
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        // Topology 클래스는 프로세서 API를 사용한 토폴로지를 구성하기 위해 사용된다.
        Topology topology = new Topology();
        // Stream_log 토픽을 소스 프로세서로 가져오기 위해 addSource() 메서드를 사용했다.
        topology.addSource("Source", STREAM_LOG)
                // 스트림 프로세서를 사용하기 위해 addProcessor() 메서드를 사용했다.
                // 부모 노드는 Source, 다음 프로세서는 Process 스트림 프로세스이다.
                .addProcessor("Process",
                        () -> new FilterProcessor(),"Source")
                // stream_log_filter를 싱크 프로세서로 사용하여 데이터를 저장하기 위해 addSink() 메서드를 사용했다.
                // 부모 노드는 Process 이다.
                .addSink("Sink",STREAM_LOG_FILTER,
                        "Process");
 
        KafkaStreams streaming = new KafkaStreams(topology, props);
        streaming.start();
    }
}
 
cs

 

 

3. stream_log 토픽에 다양한 길이의 메시지 값 입력

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic stream_log
>masildog
>hello
>streams

 

 

4. stream_log_filter 토픽 확인

$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log_filter --from-beginning
masildog
streams

 

 

5자리 길이 초과 데이터만 필터링 되어 stream_log_filter 토픽에 저장된 것을 볼 수 있다.

 

 

 

 

해당 내용 출처 아파치 카프카 애플리케이션 프로그래밍 with 자바

 

반응형