본문 바로가기

Programming/Apache Kafka

스트림즈DSL로 간단하게 데이터 주고받기 - 카프카스트림즈(KafkaStreams) with stream-filter

반응형

스트림즈DSL로 데이터 주고받기 - 카프카스트림즈(KafkaStreams)  

 

1. 필터 없이 순수 데이터 프로듀스 후 컨슘

 

- 스트림즈 애플리케이션 Class 생성 후 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class SimpleStreamApplication {
 
    private static String APPLICATION_NAME = "streams-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_COPY = "stream_log_copy";
 
    public static void main(String[] args) {
        Properties props = new Properties();
 
        /* 애플리케이션 아이디 지정, 애플리케이션 아이디 값 기준으로 병렬 처리 됨 */
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        /* 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보 */
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        /* 스트림 처리를 위해 메시지 키,값의 역직렬화, 직렬화 방식 지정, 데이터 처리 시 키, 값을 역직렬화 사용하고 최종적으로 토픽에 넣을 시 직렬화 */
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                Serdes.String().getClass());
 
        /* 스트림 토폴로지를 정의하기 위한 용도 */
        StreamsBuilder builder = new StreamsBuilder();
        /* stream_log 토픽으로부터 KStream 객체를 만들기 위해 StreamBuilder의 stream() 메서드 사용 */
        KStream<StringString> streamLog = builder.stream(STREAM_LOG);
        /* stream_log 토픽을 담은 KStream 객체를 다른 토픽으로 전송키 위해 to() 메서드 사용
        * to() 메서드는 KStream 인스턴스의 데이터들을 특정 토픽으로 저장하기 위한 용도로 사용된다.
        * 즉, to() 메서드는 싱크 프로세서이다.*/
        streamLog.to(STREAM_LOG_COPY);
 
        /* StreamBuilder로 정의한 파라미터로 인스턴스 생성
        * 인스턴스 실행하려면 start() 메서드 사용
        * 이 스트림즈 애플리케이션은 stream_log 토픽의 데이터를 stream_log_copy 토픽으로 전달한다. */
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
cs

 

 

- stream_log 토픽 생성

$ kafka_2.12-2.5.0 % bin/kafka-topics.sh --create \
--bootstrap-server my-kafka:9092 \
--partitions 3 \
--topic stream_log
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic stream_log.

 

 

stream_log 토픽에 데이터 프로듀스

kafka_2.12-2.5.0 % bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log                      
>hello
>kafka
>stream

 

 

- stream_log_copy 토픽 조회

$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log_copy 
--from-beginning
hello
kafka
stream

 

stream_log 토픽의 데이터를 stream_log_copy 토픽으로 보낸 것을 확인할 수 있다.

 

 

1. steam-filter를 사용하여 5자리 문자이상의 데이터만 추출하기

 

- 스트림즈 필터 애플리케이션 Class 생성 후 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class StreamsFilter {
 
    private static String APPLICATION_NAME = "streams-filter-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String STREAM_LOG = "stream_log";
    private static String STREAM_LOG_FILTER = "stream_log_filter";
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<StringString> streamLog = builder.stream(STREAM_LOG);
        // 메시지 값 길이가 5보다 큰 경우만 필터링
        KStream<StringString> filteredStream = streamLog.filter(
                (key, value) -> value.length() > 5);
        filteredStream.to(STREAM_LOG_FILTER);
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
cs

 

 

stream_log 토픽에 데이터 프로듀스

$ kafka_2.12-2.5.0 % bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic stream_log
>hello
>gogo
>streams

 

 

- stream_log_filter 토픽 조회

$ kafka_2.12-2.5.0 % bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log_filter --from-beginning
stream
streams

 

 

앞서 넣은 stream (6자리) 과 방금 넣은 streams (7자리) 가 출력되는 것을 확인할 수 있다.

 

 

 

 

해당 내용 출처 아파치 카프카 애플리케이션 프로그래밍 with 자바

 

반응형