반응형
* 코파티셔닝되어 있지 않은 토픽을 조인해야 할 때
- 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리
- KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용
GlobalKTable 선언하여 실습해보기
1. 파티션 2개로 이루어진 address_v2 토픽 생성
(KStream으로 사용하는 order 토픽은 파티션이 3개이기 대문에 코파티셔닝 되지 않은 상태)
$ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \
--partitions 2 \
--topic address_v2
Created topic address_v2.
2. 스트림 프로세싱을 위한 코드를 스트림즈DSL로 작성후 코드 실행
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | public class KStreamJoinGlobalKTable { private static String APPLICATION_NAME = "global-table-join-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String ADDRESS_GLOBAL_TABLE = "address_v2"; private static String ORDER_STREAM = "order"; private static String ORDER_JOIN_STREAM = "order_join"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); GlobalKTable<String,String> addressGlobalTable = builder.globalTable(ADDRESS_GLOBAL_TABLE); KStream<String,String> orderStream = builder.stream(ORDER_STREAM); // 이전 예제에서 KTable을 조인할 때 썼던 join() 메서드와 이름이 동일하며 오버로딩 사용한다. orderStream.join(addressGlobalTable, (orderKey, orderValue) -> orderKey, (order, address) -> order + " send to " + address) .to(ORDER_JOIN_STREAM); KafkaStreams streams; streams = new KafkaStreams(builder.build(), props); streams.start(); } } | cs |
3. address_v2 토픽에 이름을 메시지 키, 주소를 메시지 값으로 데이터 넣기
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic address_v2 \
> --property "parse.key=true" \
> --property "key.separator=:"
>masildog:Seoul
>masilcat:Busan
4. KStream으로 사용되는 order 토픽에 이름을 메시지 키, 주문 물품을 메시지 값으로 데이터 넣기
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic order \
> --property "parse.key=true" \
> --property "key.separator=:"
>masilcat:Porshe
>masildog:Benz
5. order_join 토픽을 확인하여 order 토픽과 address 토픽의 조인된 데이터 확인
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic order_join \
> --property print.key=true \
> --property key.separator=":" \
> --from-beginning
masildog:Tesla send to Jeju
masildog:Benz send to Seoul
GlobalKTable에 존재하는 메시지 키를 기준으로 KStream이 제대로 데이터를 조인하여 order_join 토픽에서는 물품과 주소 데이터가 합쳐진 것을 볼 수 있다.
결과물만 보면 KTable과 크게 다르지 않아 보이지만 GlobalKTable로 선언한 토픽은 토픽에 존재하는 모든 데이터를 태스크마다 저장하고 조인 처리를 수행하는 점이 다르다.
그리고 조인을 수행할 때 Stream의 메시지 키뿐만 아니라 메시지 값을 기준으로 매칭하여 조인할 수 있다는 점도 다르다.
반응형
'Programming > Apache Kafka' 카테고리의 다른 글
아파치 카프카 실습 (Apache Kafka) / 카프카 커넥트란 (0) | 2023.01.26 |
---|---|
아파치 카프카 실습 (Apache Kafka) / 프로세서 API 활용하기 (0) | 2023.01.25 |
아파치 카프카 실습 (Apache Kafka) / KTable과 KStream 조인하기 (0) | 2023.01.18 |
스트림즈DSL로 간단하게 데이터 주고받기 - 카프카스트림즈(KafkaStreams) with stream-filter (0) | 2023.01.17 |
[Kafka 연결 오류] broker id, cluster id가 달라 연결에 실패하는 경우 (0) | 2022.12.19 |