본문 바로가기

Programming/Apache Kafka

(17)
아파치 카프카 실습 (Apache Kafka) / 프로세서 API 활용하기 * 프로세서 API 프로세서 API는 스트림즈DSL보다 투박한 코드를 가지지만 토폴로지를 기준으로 데이터를 처리한다는 관점에서는 동일한 역할을 한다. 스트림즈DSL은 데이터 처리, 분기, 조인을 위한 다양한 메서드들을 제공하지만 추가적인 상세 로직의 구현이 필요하다면 프로세서 API를 활용할 수 있다. 프로세서 API에서는 스트림즈DSL에서 사용했던 KStream, KTable, GlobalKTable 개념이 없다는 점을 주의해야 한다. 문자열 5자리 이상 데이터 필터링 실습 스트림즈DSL에서는 filter() 메서드를 스트림 프로세서로 사용해서 구현할 수 있었지만 프로세스API에서 동일한 로직을 구현하기 위해서는 스트림 프로세서 역할을 하는 클래스를 생성해야 한다. 1. 필터링 역할을 위한 스트림 프로..
아파치 카프카 실습 (Apache Kafka) / GlobalKTable과 KStream 조인하기 * 코파티셔닝되어 있지 않은 토픽을 조인해야 할 때 리파티셔닝을 수행한 이후에 코파티셔닝이 된 상태로 조인 처리 KTable로 사용하는 토픽을 GlobalKTable로 선언하여 사용 GlobalKTable 선언하여 실습해보기 1. 파티션 2개로 이루어진 address_v2 토픽 생성 (KStream으로 사용하는 order 토픽은 파티션이 3개이기 대문에 코파티셔닝 되지 않은 상태) $ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \ --partitions 2 \ --topic address_v2 Created topic address_v2. 2. 스트림 프로세싱을 위한 코드를 스트림즈DSL로 작성후 코드 실행 HTML 삽입 미리보기할 수 ..
아파치 카프카 실습 (Apache Kafka) / KTable과 KStream 조인하기 KTable과 KStream 조인하여 데이터 추출하기 KTable과 KStream을 조인할 때 중요한 것은 코파티셔닝 되어 있어야 한다. KTable로 사용할 토픽과 KStream으로 사용할 토픽을 생성할 때 동일한 파티션 개수, 파티셔닝을 사용하는 것이 중요하다. KTable로 사용할 토픽은 address이고, KStream으로 사용할 토픽은 order이다. 조인된 데이터를 저장할 토픽은 order_join 으로 생성한다. $ bin/kafka-topics.sh --create --bootstrap-server my-kafka:9092 \ > --partitions 3 \ > --topic address Created topic address. $ bin/kafka-topics.sh --create -..
스트림즈DSL로 간단하게 데이터 주고받기 - 카프카스트림즈(KafkaStreams) with stream-filter 스트림즈DSL로 데이터 주고받기 - 카프카스트림즈(KafkaStreams) 1. 필터 없이 순수 데이터 프로듀스 후 컨슘 - 스트림즈 애플리케이션 Class 생성 후 실행 HTML 삽입 미리보기할 수 없는 소스 - stream_log 토픽 생성 $ kafka_2.12-2.5.0 % bin/kafka-topics.sh --create \ --bootstrap-server my-kafka:9092 \ --partitions 3 \ --topic stream_log WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to u..
[Kafka 연결 오류] broker id, cluster id가 달라 연결에 실패하는 경우 카프카 실행 후 컨슈머로 데이터를 받아 보려는데 오류가 났다.[2022-12-19 21:00:52,642] WARN [Consumer clientId=consumer-console-consumer-64001-1, groupId=console-consumer-64001] Bootstrap broker my-kafka:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)[2022-12-19 21:00:53,880] WARN [Consumer clientId=consumer-console-consumer-64001-1, groupId=console-consumer-64001] Connection to node -1 (my-..
아파치 카프카 실습 (Apache Kafka) / JAVA로 프로듀서 생성 및 데이터 보내기 토픽 생성 bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --create \ > --topic test \ > --partitions 3 Create topic test. SimpleProducer 코드 작성 메시지 키 없이 전송 public class SimpleProducer { private final static Logger logger = LoggerFactory.getLogger(SimpleProducer.class); // 전송하기 위한 토픽 이름 설정 private final static String TOPIC_NAME = "test"; // 전송하고자하는 카프카 클러스터의 서버의 host, ip private final static Stri..
아파치 카프카 실습 (Apache Kafka) / 데이터 주고 받기 프로듀서 (kafka-console-producer.sh) 생성된 토픽에 데이터를 넣을 수 있는 kafka-console-producer.sh 명령어로 데이터를 넣어보겠다. 토픽에 넣는 데이터는 레코드(record), 메시지 key와 value 값으로 이루어져 있다. 메시지 키 없이 메시지 값만 보내게 될 경우 자바의 null로 기본 설정되어 브로커로 전송된다. bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \ > --topic test.kafka >hello >kafka >0 >1 >2 >3 >4 >5 kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8을 기반으로 Byte변환되고 ByteArraySeriali..
아파치 카프카 실습 (Apache Kafka) / 토픽 생성 및 조회 카프카 커맨드 라인 툴 카프카에서 제공하는 카프카 커맨드 라인 툴들은 카프카 운영 시 가장 많이 접하는 도구다. 이를 이용해 카프카 브로커 운영에 필요한 다양한 명령을 내릴 수 있다. 카프카 클라이언트 애플리케이션을 운영할 때는 클러스터와 연동하여 데이터를 주고 받는 것도 중요하지만 토픽이나 파티션 개수 변경과 같은 명령을 실행해야 하는 경우도 자주 발생한다. 그러므로 카프카 명령어를 손에 익히자 토픽 (kafka-topics.sh) 토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념이다. 예를들어 RDBMS에서 사용하는 테이블과 유사하다고 볼 수 있다. 클러스터에 토픽은 여러 개 존재할 수 있다. 토픽 내 파티션은 최소 1개 이상이다. 파티션을 통해 한 번에 처리할 수 있는 데이터양을 늘릴 수 있..

반응형