반응형
KTable과 KStream 조인하여 데이터 추출하기
KTable과 KStream을 조인할 때 중요한 것은 코파티셔닝 되어 있어야 한다.
KTable로 사용할 토픽과 KStream으로 사용할 토픽을 생성할 때 동일한 파티션 개수, 파티셔닝을 사용하는 것이 중요하다.
KTable로 사용할 토픽은 address이고, KStream으로 사용할 토픽은 order이다.
조인된 데이터를 저장할 토픽은 order_join 으로 생성한다.
$ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --topic address
Created topic address.
$ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --topic order
Created topic order.
bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --topic order_join
Created topic order_join.
- 애플리케이션을 구성한 class 생성 후 실행
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 | public class KStreamJoinTable { private static String APPLICATION_NAME = "order-join-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String ADDRESS_TABLE = "address"; private static String ORDER_STREAM = "order"; private static String ORDER_JOIN_STREAM = "order_join"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); // address 토픽을 KTable로 가져올 때는 table() 메서드를 소스 프로세서로 사용하면 된다. KTable<String, String> addressTable = builder.table(ADDRESS_TABLE); // order 토픽은 KStream으로 가져올 것이므로 stream() 메서드를 소스 프로세서로 사용된다. KStream<String, String> orderStream = builder.stream(ORDER_STREAM); // 조인을 위해 KStream 인스턴스에 정의되어 있는 join() 메서드를 사용한다. // 첫 번째 파라미터로 조인을 수행할 KTable 인스턴스를 넣는다. // KStream과 KTable에서 동일한 메시지 키를 가진 데이터를 찾았을 경우 각각의 메시지 값을 조합해서 어떤 데이터를 만들지 정의한다. // 여기서는 order 토픽의 물품 이름과 address토픽의 주소를 조합하여 새로운 메시지 값을 만든다. orderStream.join(addressTable, (order, address) -> order + " send to " + address) .to(ORDER_JOIN_STREAM); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } } | cs |
- KTable로 사용되는 address 토픽에 이름을 키, 주소를 메시지값으로 전송
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic address \
> --property "parse.key=true" \
> --property "key.separator=:"
>masildog:Seoul
>masilcat:Busan
- KStream으로 사용되는 order 토픽에 이름을 키, 주문 물품을 메시지값으로 전송
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic order \
--property "parse.key=true" \
--property "key.separator=:"
>masildog:iPhone
>masilcat:Galaxy
- order_join 토픽 확인하여 order, adress 각각 토픽의 조인된 데이터가 있는지 확인
$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic order_join \
> --property print.key=true \
> --property key.separator=":" \
> --from-beginning
masilcat:Galaxy send to Busan
masildog:iPhone send to Seoul
KTable에 존재하는 메시지 키를 기준으로 KStream이 데이터를 조인하여
order_join 토픽에서는 물품과 주소 데이터가 합쳐진 것을 볼 수 있다.
조인할 때 사용했던 메시지 키는 조인이 된 데이터의 메시지 키로 들어간다.
사용자 주소가 변경되는 경우엔 KTable은 동일한 메시지 키가 들어올 경우
가장 마지막 레코드를 유효한 데이터로 보기 때문에 가장 최근에 바뀐 주소로 조인을 수행한다.
현재 masildog 사용자가 Seoul 주소를 가지고 있는데, Jeju 로 바꾸어 추가해보자.
그리고 order 토픽에 masildog 사용자가 물품을 주문하도록 order에 레코드를 추가한다.
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic address \
--property "parse.key=true" \
--property "key.separator=:"
>masildog:Jeju
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic order \
> --property "parse.key=true" \
> --property "key.separator=:"
>masildog:Tesla
- order_join 토픽 확인 시 신규 주소 확인
$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic order_join \
--property print.key=true \
--property key.separator=":" \
--from-beginning
masildog:Tesla send to Jeju
반응형
'Programming > Apache Kafka' 카테고리의 다른 글
아파치 카프카 실습 (Apache Kafka) / 프로세서 API 활용하기 (0) | 2023.01.25 |
---|---|
아파치 카프카 실습 (Apache Kafka) / GlobalKTable과 KStream 조인하기 (0) | 2023.01.25 |
스트림즈DSL로 간단하게 데이터 주고받기 - 카프카스트림즈(KafkaStreams) with stream-filter (0) | 2023.01.17 |
[Kafka 연결 오류] broker id, cluster id가 달라 연결에 실패하는 경우 (0) | 2022.12.19 |
아파치 카프카 실습 (Apache Kafka) / JAVA로 프로듀서 생성 및 데이터 보내기 (0) | 2022.12.18 |