프로듀서 (kafka-console-producer.sh)
생성된 토픽에 데이터를 넣을 수 있는 kafka-console-producer.sh 명령어로 데이터를 넣어보겠다.
토픽에 넣는 데이터는 레코드(record), 메시지 key와 value 값으로 이루어져 있다.
메시지 키 없이 메시지 값만 보내게 될 경우 자바의 null로 기본 설정되어 브로커로 전송된다.
bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka
>hello
>kafka
>0
>1
>2
>3
>4
>5
kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8을 기반으로 Byte변환되고
ByteArraySerializer로만 직렬화된다.
String이 아닌 타입으로는 직렬화하여 전송할 수 없다.
그러므로 텍스트 목적으로 문자열만 전송할 수 있고, 다른 타입으로 직렬화하여 데이터를 브로커에 전송하고 싶다면
카프카 프로듀서 애플리케이션을 직접 개발해야 한다.
메시지 키 포함하여 레코드 전송 (로컬)
bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
> --property "parse.key=true" \
> --property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3
parse.key=true 는 레코드를 전송할 때 메시지 키를 추가할 수 있다.
key.separator를 선언하지 않으면 기본설정은 Tab delimiter(\t)이다.
separator를 선언하지 않고 메시지를 보내려면 메시지 키 다음 탭을 누른 뒤 값을 작성한다.
메시지 키가 null인 경우 프로듀서가 파티션으로 전송할 때 레코드 배치 단위로 라운드로빈으로 전송한다.
메시지 키가 존재하는 경우 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당된다.
메시지 키가 동일한 경우 동일한 파티션으로 전송된다.
이런 메시지 키와 파티션 할당은 프로듀서에서 설정된 파티셔너에 의해 결정되는데,
기본 파티셔너의 경우 이와 같은 동작을 보장한다.
컨슈머 (kafka-console-consumer.sh)
토픽으로 전송한 데이터는 kafka-console-consumer.sh 명령어로 확인할 수 있다.
필수 옵션으로 클러스터 정보, 토픽이름이 필요하다.
추가로 --from-beginning 옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력한다.
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
> --from-beginning
kafka
2
3
0
1
no2
hello
메시지 키오 메시지 값을 확인하고 싶다면 --property 옵션을 사용하면 된다.
bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic test.kafka \
--property print.key=true \
--property key.separator="-" \
--group test-group \
--from-beginning
null-kafka
null-2
null-3
null-0
null-1
key2-no2
null-hello
null-
null-
key1-no1
null-4
null-5
key3-no3
메시지 키를 확인하기 위해 print.key=true로 설정했다. 기본 설정값은 false이다.
separator를 활용하여 키와 값 사이에 - 가 표시된다. 기본 설정은 tab이다.
--group 옵션을 통해 신규 컨슈머 그룹을 생성했다.
컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있고, 이 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋을 한다.
커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다.
커밋 정보는 _consumer_offsets 이름의 내부 토픽에 저장된다.
메시지 키를 넣지 않은 데이터는 null과 함께 값이 보인다.
여기서 특이한 점은 producer.sh 로 전송했던 데이터의 순서가 현재 출력되는 순서와 다르다는 것이다.
이는 카프카의 핵심인 파티션 개념 대문에 생기는 현상이다.
kafka-console-consumer.sh 명령어를 통해 토픽의 데이터를 가져가게 되면 파티션으로 부터 동일한 중요도로 데이터를 가져간다.
만약 데이터의 순서를 보장하고 싶다면 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것이다.
한 개의 파티션에서는 데이터 순서를 보장한다.
컨슈머 그룹(kafka-consumer-groups.sh)
위의 예제코드로 test-group 컨슈머 그룹으로 생성된 컨슈머로 test.kafka 토픽의 데이터를 가져갔다.
컨슈머 그룹은 따로 생성하는 명령어 필요없이 컨슈머 동작 시 그룹이름을 지정하면 생성된다.
생성된 컨슈머 그룹의 리스트는 아래 명령어로 확인할 수 있다.
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list
test-group
컨슈머 그룹 이름을 토대로 컨슈머 그룹이 어떤 토픽의 데이터를 가져가는지 확인할 때 쓰인다.
bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \
--group test-group \
--describe
Consumer group 'test-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID - - -
test-group test.kafka 2 3 3 0 - - -
test-group test.kafka 3 3 3 0 - - -
test-group test.kafka 0 4 4 0 - - -
test-group test.kafka 1 3 3 0 - - -
...생략
명령어
--group 을 통해 어떤 그룹을 조회 할 것인지 지정한다.
--describe 옵션을 통해 컨슈머 그룹의 상세 내용을 확인할 수 있다.
조회결과
GROUP / TOPIC / PARTITION : 조회한 컨슈머 그룹이 마지막으로 커밋한 토픽과 파티션을 나타낸다.
test-group 컨슈머 그룹이 test.kafka 토픽의 2번 파티션의 레코드가 마지막으로 커밋된 것을 알 수 있다.
CURRENT-OFFSET : 컨슈머 그룹이 가져간 파티션에 가장 최신 오프셋이 몇 번인지 나타낸다.
오프셋이란 파티션의 각 레코드에 할당된 번호다.
이번호는 데이터가 파티션에 들어올때마다 1씩 증가한다.
LOG-END-OFFSET : 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있다.
LAG : 컨슈머 그룹이 데이터를 가져가는데 지연 발생 지표
CONSUMER-ID : 컨슈머의 토픽 할당을 카프카 내부적으로 구분하기 위해 사용하는 id이다.
이 값은 client에 uuid 값을 붙여 자동 할당되어 유니크한 값으로 설정된다.
HOST : 컨슈머가 동작하는 host명을 출력한다.
CLIENT-ID : 컨슈머에 할당된 id이다. 사용자가 지정할 수 있으며 지정하지 않으면 자동 생성 된다.
kafka-verifiable-producer
kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다.
bin/kafka-verifiable-producer.sh --bootstrap-server my-kafka:9092 \
> --max-message 10 \
> --topic verify-test
{"timestamp":1671365107411,"name":"startup_complete"}
{"timestamp":1671365108504,"name":"producer_send_success","key":null,"value":"0","offset":10,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"1","offset":11,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"2","offset":12,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"3","offset":13,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"4","offset":14,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"5","offset":15,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"6","offset":16,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"7","offset":17,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"8","offset":18,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"9","offset":19,"topic":"verify-test","partition":0}
{"timestamp":1671365108536,"name":"shutdown_complete"}
{"timestamp":1671365108539,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":8.802816901408452}
sangoh@jeonsang-oui-MacBookPro kafka_2.12-2.5.0 %
명령어
--max-messages는 kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다.
만약 -1을 옵션값으로 입력하면 kafka-verifiable-producer.sh가 종료될 때까지 계속 데이터를 토픽으로 보낸다.
--topic 명령어를 통해 데이터를 받을 대상 토픽을 입력한다.
조회결과
메시지별로 보낸 시간과 메시지 키, 메시지 값, 토픽, 저장된 파티션, 저장된 오프셋 번호가 출력된다.
나는 이미 10번을 넣은 후 포스팅 하며 10번을 더 넣어 offset이 10부터 시작되었다.
kafka-verifiable-consumer
bin/kafka-verifiable-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic verify-test \
> --group-id test-group
{"timestamp":1671365408817,"name":"startup_complete"}
{"timestamp":1671365410373,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1671365411152,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":10,"maxOffset":19}]}
{"timestamp":1671365411331,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":20}],"success":true}
명령어
--topic 명령어를 통해 데이터를 가져올 토픽을 지정한다.
--group-id 명령어를 통해 컨슈머 그룹을 지정한다.
조회결과
컨슈머는 한 번에 다수의 메시지를 가져와서 처리하므로 한 번에 10개의 메시지를 정상적으로 받았음을 알 수 있다.
메시지 수신 이후 10번 오프셋 커밋 여부도 확인할 수 있다.
kafka-delete-records.sh
이미 적재된 토픽 데이터를 지우는 방법으로 kafka-delete-records.sh를 사용할 수 있다.
적재된 토픽의 가장 오래된 데이터부터 특정 시점의 오프셋까지 삭제할 수 있다.
vi delete-topic.json
{"partitions": [{"topic": "test", "partition": 0, "offset": 50}], "version":1 }
삭제하고자 하는 데이터에 대한 정보를 파일로 저장하여 사용한다.
bin/kafka-delete-records.sh --bootstrap-server my-kafka:9092 \
> --offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 50
삭제 토픽,파티션,오프셋에 대한 정보를 담은 위 json 파일은 옵션값으로 입력하면 파일을 읽어서 데이터 삭제를 진행한다.
파티션에 가장 오래된 오프셋부터 지정한 오프셋까지 삭제된다.
'Programming > Apache Kafka' 카테고리의 다른 글
스트림즈DSL로 간단하게 데이터 주고받기 - 카프카스트림즈(KafkaStreams) with stream-filter (0) | 2023.01.17 |
---|---|
[Kafka 연결 오류] broker id, cluster id가 달라 연결에 실패하는 경우 (0) | 2022.12.19 |
아파치 카프카 실습 (Apache Kafka) / JAVA로 프로듀서 생성 및 데이터 보내기 (0) | 2022.12.18 |
아파치 카프카 실습 (Apache Kafka) / 토픽 생성 및 조회 (0) | 2022.12.18 |
아파치 카프카 실습 (Apache Kafka) / EC2 서버 구축, 카프카, 주키퍼 설치하기 (0) | 2022.09.01 |