본문 바로가기

Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / KTable과 KStream 조인하기

반응형

 

KTable과 KStream 조인하여 데이터 추출하기

 

KTable과 KStream을 조인할 때 중요한 것은 코파티셔닝 되어 있어야 한다.

KTable로 사용할 토픽과 KStream으로 사용할 토픽을 생성할 때 동일한 파티션 개수, 파티셔닝을 사용하는 것이 중요하다.

KTable로 사용할 토픽은 address이고, KStream으로 사용할 토픽은 order이다.

조인된 데이터를 저장할 토픽은 order_join 으로 생성한다.

 

$ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --topic address
Created topic address.

$ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --topic order
Created topic order.

bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --topic order_join
Created topic order_join.

 

 

- 애플리케이션을 구성한 class 생성 후 실행

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class KStreamJoinTable {
 
    private static String APPLICATION_NAME = "order-join-application";
    private static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private static String ADDRESS_TABLE = "address";
    private static String ORDER_STREAM = "order";
    private static String ORDER_JOIN_STREAM = "order_join";
 
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        // address 토픽을 KTable로 가져올 때는 table() 메서드를 소스 프로세서로 사용하면 된다.
        KTable<StringString> addressTable = builder.table(ADDRESS_TABLE);
        // order 토픽은 KStream으로 가져올 것이므로 stream() 메서드를 소스 프로세서로 사용된다.
        KStream<StringString> orderStream = builder.stream(ORDER_STREAM);
 
        // 조인을 위해 KStream 인스턴스에 정의되어 있는 join() 메서드를 사용한다.
        // 첫 번째 파라미터로 조인을 수행할 KTable 인스턴스를 넣는다.
        // KStream과 KTable에서 동일한 메시지 키를 가진 데이터를 찾았을 경우 각각의 메시지 값을 조합해서 어떤 데이터를 만들지 정의한다.
        // 여기서는 order 토픽의 물품 이름과 address토픽의 주소를 조합하여 새로운 메시지 값을 만든다.
        orderStream.join(addressTable,
                (order, address) -> order + " send to " + address)
                .to(ORDER_JOIN_STREAM);
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
 
cs

 

 

-  KTable로 사용되는 address 토픽에 이름을 키, 주소를 메시지값으로 전송

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic address \
> --property "parse.key=true" \
> --property "key.separator=:" 
>masildog:Seoul
>masilcat:Busan

 

-  KStream으로 사용되는 order 토픽에 이름을 키, 주문 물품을 메시지값으로  전송

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic order \  
--property "parse.key=true" \
--property "key.separator=:"
>masildog:iPhone
>masilcat:Galaxy

 

-  order_join 토픽 확인하여 order, adress 각각 토픽의 조인된 데이터가 있는지 확인

$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic order_join \
> --property print.key=true \
> --property key.separator=":" \
> --from-beginning
masilcat:Galaxy send to Busan
masildog:iPhone send to Seoul

 

 

 

KTable에 존재하는 메시지 키를 기준으로 KStream이 데이터를 조인하여

order_join 토픽에서는 물품과 주소 데이터가 합쳐진 것을 볼 수 있다.

조인할 때 사용했던 메시지 키는 조인이 된 데이터의 메시지 키로 들어간다.

 

address KTable과 order KStream이 조인되어 데이터가 만들어진 모습

 

 

사용자 주소가 변경되는 경우엔 KTable은 동일한 메시지 키가 들어올 경우

가장 마지막 레코드를 유효한 데이터로 보기 때문에 가장 최근에 바뀐 주소로 조인을 수행한다.

현재 masildog 사용자가 Seoul 주소를 가지고 있는데, Jeju 로 바꾸어 추가해보자.

그리고 order 토픽에 masildog 사용자가 물품을 주문하도록 order에 레코드를 추가한다.

 

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic address \
--property "parse.key=true" \
--property "key.separator=:"
>masildog:Jeju

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic order \   
> --property "parse.key=true" \
> --property "key.separator=:"
>masildog:Tesla

 

- order_join 토픽 확인 시 신규 주소 확인

$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic order_join \
--property print.key=true \  
--property key.separator=":" \
--from-beginning
masildog:Tesla send to Jeju

 

반응형