아파치 카프카 실습 (Apache Kafka) / 토픽 생성 및 조회
카프카 커맨드 라인 툴
카프카에서 제공하는 카프카 커맨드 라인 툴들은 카프카 운영 시 가장 많이 접하는 도구다.
이를 이용해 카프카 브로커 운영에 필요한 다양한 명령을 내릴 수 있다.
카프카 클라이언트 애플리케이션을 운영할 때는 클러스터와 연동하여 데이터를 주고 받는 것도 중요하지만
토픽이나 파티션 개수 변경과 같은 명령을 실행해야 하는 경우도 자주 발생한다.
그러므로 카프카 명령어를 손에 익히자
토픽 (kafka-topics.sh)
토픽이란 카프카에서 데이터를 구분하는 가장 기본적인 개념이다.
예를들어 RDBMS에서 사용하는 테이블과 유사하다고 볼 수 있다.
클러스터에 토픽은 여러 개 존재할 수 있다.
토픽 내 파티션은 최소 1개 이상이다.
파티션을 통해 한 번에 처리할 수 있는 데이터양을 늘릴 수 있고
토픽 내부에서도 파티션을 통해 데이터의 종류를 나누어 처리할 수 있다.
토픽 생성 (로컬)
카프카 설치 경로에서 해당 명령어로 토픽을 생성하자.
bin/kafka-topics.sh \
> --create \
> --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
>
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test.kafka.
1. --create 옵션으로 토픽 생성 명령어라는 것을 명시
2. 클러스트를 구성하는 브로커들의 ip와 port 기입
3. --topic에서 토픽 이름을 작성한다. 토픽 이름은 내부 데이터가 무엇이 있는지 유추가 가능할 정도로 자세히 적는다.
4. 클러스터 정보와 토픽 이름은 토픽을 만들기 위한 필수 값이다.
토픽 옵션 설정 후 생성 (로컬)
bin/kafka-topics.sh \
> --create \
> --bootstrap-server my-kafka:9092 \
> --partitions 3 \
> --replication-factor 1 \
> --config retention.ms=172800000 \
> --topic test.kafka.2
WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
Created topic test.kafka.2.
1. --partitions는 파티션 개수를 지정할 수 있다. (만약 이 옵션을 사용하지 않으면 카프카 브로커 설정파일에 있는 num.partitions 옵션값을 따라 생성한다.
2. --replication-factor에는 토픽의 파티션을 복제할 복제 개수를 적는다. 1은 복제를 하지 않고 사용한다는 의미다. 2면 1개의 복제본을 사용하겠다는 의미이다. 복제 설정을 해두면 한 개의 브로커에 장애가 발생하더라도 나머지 한 개 브로커에 저장된 데이터를 사용하여 안전하게 데이터 처리를 지속적으로 할 수 있다.
3. --config를 통해 kafka-topics.sh 명령에 포함되지 않은 추가적인 설정이 가능하다. retension.ms는 토픽 데이터 유지 기간을 뜻한다. 위와 같은 ms 설정 시 2일 단위로 나타낸 것이며, 2일이 지난 토픽 데이터는 삭제된다.
토픽 리스트 조회 (로컬)
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --list
test.kafka
test.kafka.2
토픽 상세 조회 (로컬)
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --describe --topic test.kafka.2
Topic: test.kafka.2 PartitionCount: 3 ReplicationFactor: 1 Configs: segment.bytes=1073741824,retention.ms=172800000
Topic: test.kafka.2 Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test.kafka.2 Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test.kafka.2 Partition: 2 Leader: 0 Replicas: 0 Isr: 0
--describe 옵션을 사용하여 파티션 개수, 복제 파티션의 위치한 브로커 번호, 기타 구성하는 설정들을 출력한다.
Leader가 0으로 표시되는건 0번 브로커에 위치하고 있음을 뜻한다.
클러스터 운영 시 토픽 리더 파티션이 일부 브로커에 몰려있을 수 있는데, 이를 확인하기 위해 --describe 옵션을 사용할 수 있다.
토픽 옵션 수정
토픽에 설정된 옵션을 변경하기 위해서는 kafka-topics.sh 또는 kafka-configs.sh 두 개를 사용해야 한다.
파티션 개수 변경은 kafka-topics.sh
토픽 삭제 정책인 리텐션 기간 변경은 kafka-confgs.sh
--alter옵션과 --partitions옵션을 사용하여 파티션 개수 변경 (로컬)
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
> --alter \
> --partitions 4
파티션을 늘릴 수는 있지만 줄일 수는 없으니 파티션 개수를 늘릴 때는 반드시 늘려야 하는 상황인지 판단이 중요하다.
bin/kafka-topics.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
> --describe
Topic: test.kafka PartitionCount: 4 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: test.kafka Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: test.kafka Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: test.kafka Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: test.kafka Partition: 3 Leader: 0 Replicas: 0 Isr: 0
0부터 시작하는 파티션이 4개로 변경되어 3번 파티션이 생성되었다.
--retentions.ms 변경 (로컬)
bin/kafka-configs.sh --bootstrap-server my-kafka:9092 \
> --entity-type topics \
> --entity-name test.kafka \
> --alter --add-config retention.ms=86400000
Completed updating config for topic test.kafka.
retention.ms 수정하기 위해 kafka-configs.sh 와 --alter, --add-config 옵션을 사용해야한다.
--add-config 옵션을 사용하면 이미 존재하는 설정값은 변경하고 존재하지 않는 설정값은 신규로 추가된다.
다음 포스팅에선 프로듀서 및 컨슈머를 통해 토픽에 데이터 전송 및 받아오는 예제 코드를 작성해보자.