본문 바로가기

Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / GlobalKTable과 KStream 조인하기

반응형

* 코파티셔닝되어 있지 않은 토픽을 조인해야 할 때

  • 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리 
  • KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용 

 

GlobalKTable 선언하여 실습해보기

 

1.  파티션 2개로 이루어진 address_v2 토픽 생성

(KStream으로 사용하는 order 토픽은 파티션이 3개이기 대문에 코파티셔닝 되지 않은 상태)

$ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
--partitions 2 \
--topic address_v2
Created topic address_v2.

 

 

2.  스트림 프로세싱을 위한 코드를 스트림즈DSL로 작성후 코드 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class KStreamJoinGlobalKTable {
 
    private static String APPLICATION_NAME = "global-table-join-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String ADDRESS_GLOBAL_TABLE = "address_v2";
    private static String ORDER_STREAM =  "order";
    private static String ORDER_JOIN_STREAM = "order_join";
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        GlobalKTable<String,String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE);
        KStream<String,String> orderStream = builder.stream(ORDER_STREAM);
 
        // 이전 예제에서 KTable을 조인할 때 썼던 join() 메서드와 이름이 동일하며 오버로딩 사용한다.
        orderStream.join(addressGlobalTable,
                (orderKey, orderValue) -> orderKey,
                (order, address) -> order + " send to " + address)
                .to(ORDER_JOIN_STREAM);
 
        KafkaStreams streams;
        streams = new KafkaStreams(builder.build(), props);
        streams.start();
 
    }
}
 
cs

 

 

3.  address_v2 토픽에 이름을 메시지 키, 주소를 메시지 값으로 데이터 넣기

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic address_v2 \
> --property "parse.key=true" \
> --property "key.separator=:"
>masildog:Seoul
>masilcat:Busan

 

 

4.  KStream으로 사용되는 order 토픽에 이름을 메시지 키, 주문 물품을 메시지 값으로 데이터 넣기

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic order \
> --property "parse.key=true" \
> --property "key.separator=:"
>masilcat:Porshe
>masildog:Benz

 

 

5.  order_join 토픽을 확인하여 order 토픽과 address 토픽의 조인된 데이터 확인

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic order_join \
> --property print.key=true \
> --property key.separator=":" \
> --from-beginning
masildog:Tesla send to Jeju
masildog:Benz send to Seoul

 

 

GlobalKTable에 존재하는 메시지 키를 기준으로 KStream이 제대로 데이터를 조인하여 order_join 토픽에서는 물품과 주소 데이터가 합쳐진 것을 볼 수 있다.

 

결과물만 보면 KTable과 크게 다르지 않아 보이지만 GlobalKTable로 선언한 토픽은 토픽에 존재하는 모든 데이터를 태스크마다 저장하고 조인 처리를 수행하는 점이 다르다.

그리고 조인을 수행할 때 Stream의 메시지 키뿐만 아니라 메시지 값을 기준으로 매칭하여 조인할 수 있다는 점도 다르다.

 

 

 

 

해당 내용 출처 아파치 카프카 애플리케이션 프로그래밍 with 자바

 

반응형