반응형
스트림즈DSL로 데이터 주고받기 - 카프카스트림즈(KafkaStreams)
1. 필터 없이 순수 데이터 프로듀스 후 컨슘
- 스트림즈 애플리케이션 Class 생성 후 실행
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 | public class SimpleStreamApplication { private static String APPLICATION_NAME = "streams-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String STREAM_LOG = "stream_log"; private static String STREAM_LOG_COPY = "stream_log_copy"; public static void main(String[] args) { Properties props = new Properties(); /* 애플리케이션 아이디 지정, 애플리케이션 아이디 값 기준으로 병렬 처리 됨 */ props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); /* 스트림즈 애플리케이션과 연동할 카프카 클러스터 정보 */ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); /* 스트림 처리를 위해 메시지 키,값의 역직렬화, 직렬화 방식 지정, 데이터 처리 시 키, 값을 역직렬화 사용하고 최종적으로 토픽에 넣을 시 직렬화 */ props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); /* 스트림 토폴로지를 정의하기 위한 용도 */ StreamsBuilder builder = new StreamsBuilder(); /* stream_log 토픽으로부터 KStream 객체를 만들기 위해 StreamBuilder의 stream() 메서드 사용 */ KStream<String, String> streamLog = builder.stream(STREAM_LOG); /* stream_log 토픽을 담은 KStream 객체를 다른 토픽으로 전송키 위해 to() 메서드 사용 * to() 메서드는 KStream 인스턴스의 데이터들을 특정 토픽으로 저장하기 위한 용도로 사용된다. * 즉, to() 메서드는 싱크 프로세서이다.*/ streamLog.to(STREAM_LOG_COPY); /* StreamBuilder로 정의한 파라미터로 인스턴스 생성 * 인스턴스 실행하려면 start() 메서드 사용 * 이 스트림즈 애플리케이션은 stream_log 토픽의 데이터를 stream_log_copy 토픽으로 전달한다. */ KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } } | cs |
- stream_log 토픽 생성
$ kafka_2.12-2.5.0 % bin/kafka-topics.sh --create \
--bootstrap-server my-kafka:9092 \
--partitions 3 \
--topic stream_log
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic stream_log.
stream_log 토픽에 데이터 프로듀스
kafka_2.12-2.5.0 % bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log
>hello
>kafka
>stream
- stream_log_copy 토픽 조회
$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log_copy
--from-beginning
hello
kafka
stream
stream_log 토픽의 데이터를 stream_log_copy 토픽으로 보낸 것을 확인할 수 있다.
1. steam-filter를 사용하여 5자리 문자이상의 데이터만 추출하기
- 스트림즈 필터 애플리케이션 Class 생성 후 실행
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 | public class StreamsFilter { private static String APPLICATION_NAME = "streams-filter-application"; private static String BOOTSTRAP_SERVERS = "my-kafka:9092"; private static String STREAM_LOG = "stream_log"; private static String STREAM_LOG_FILTER = "stream_log_filter"; public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_NAME); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> streamLog = builder.stream(STREAM_LOG); // 메시지 값 길이가 5보다 큰 경우만 필터링 KStream<String, String> filteredStream = streamLog.filter( (key, value) -> value.length() > 5); filteredStream.to(STREAM_LOG_FILTER); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start(); } } | cs |
stream_log 토픽에 데이터 프로듀스
$ kafka_2.12-2.5.0 % bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic stream_log
>hello
>gogo
>streams
- stream_log_filter 토픽 조회
$ kafka_2.12-2.5.0 % bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic stream_log_filter --from-beginning
stream
streams
앞서 넣은 stream (6자리) 과 방금 넣은 streams (7자리) 가 출력되는 것을 확인할 수 있다.
반응형
'Programming > Apache Kafka' 카테고리의 다른 글
아파치 카프카 실습 (Apache Kafka) / GlobalKTable과 KStream 조인하기 (0) | 2023.01.25 |
---|---|
아파치 카프카 실습 (Apache Kafka) / KTable과 KStream 조인하기 (0) | 2023.01.18 |
[Kafka 연결 오류] broker id, cluster id가 달라 연결에 실패하는 경우 (0) | 2022.12.19 |
아파치 카프카 실습 (Apache Kafka) / JAVA로 프로듀서 생성 및 데이터 보내기 (0) | 2022.12.18 |
아파치 카프카 실습 (Apache Kafka) / 데이터 주고 받기 (0) | 2022.12.18 |