본문 바로가기

Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / 데이터 주고 받기

반응형

프로듀서 (kafka-console-producer.sh)

생성된 토픽에 데이터를 넣을 수 있는 kafka-console-producer.sh 명령어로 데이터를 넣어보겠다.

토픽에 넣는 데이터는 레코드(record), 메시지 key와 value 값으로 이루어져 있다.

메시지 키 없이 메시지 값만 보내게 될 경우 자바의 null로 기본 설정되어 브로커로 전송된다.

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka
>hello
>kafka
>0
>1
>2
>3
>4
>5

kafka-console-producer.sh로 전송되는 레코드 값은 UTF-8을 기반으로 Byte변환되고 

ByteArraySerializer로만 직렬화된다.

String이 아닌 타입으로는 직렬화하여 전송할 수 없다.

그러므로 텍스트 목적으로 문자열만 전송할 수 있고, 다른 타입으로 직렬화하여 데이터를 브로커에 전송하고 싶다면

카프카 프로듀서 애플리케이션을 직접 개발해야 한다.

 

kafka-console-producer.sh가 카프카 브로커의 파티션으로 메시지를 전송

 

메시지 키 포함하여 레코드 전송 (로컬)

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
> --property "parse.key=true" \
> --property "key.separator=:"
>key1:no1
>key2:no2
>key3:no3

parse.key=true 는 레코드를 전송할 때 메시지 키를 추가할 수 있다.

key.separator를 선언하지 않으면 기본설정은 Tab delimiter(\t)이다.

separator를 선언하지 않고 메시지를 보내려면 메시지 키 다음 탭을 누른 뒤 값을 작성한다.

 

메시지 키와 메시지 값으로 묶인 레코드가 토픽의 파티션에 전송됨

메시지 키가 null인 경우 프로듀서가 파티션으로 전송할 때 레코드 배치 단위로 라운드로빈으로 전송한다.

메시지 키가 존재하는 경우 키의 해시값을 작성하여 존재하는 파티션 중 한 개에 할당된다.

메시지 키가 동일한 경우 동일한 파티션으로 전송된다.

이런 메시지 키와 파티션 할당은 프로듀서에서 설정된 파티셔너에 의해 결정되는데,

기본 파티셔너의 경우 이와 같은 동작을 보장한다.

 

컨슈머 (kafka-console-consumer.sh)

토픽으로 전송한 데이터는 kafka-console-consumer.sh 명령어로 확인할 수 있다.

필수 옵션으로 클러스터 정보, 토픽이름이 필요하다.

추가로 --from-beginning 옵션을 주면 토픽에 저장된 가장 처음 데이터부터 출력한다.

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic test.kafka \
> --from-beginning
kafka
2
3
0
1
no2
hello

 

메시지 키오 메시지 값을 확인하고 싶다면 --property 옵션을 사용하면 된다.

bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 \
--topic test.kafka \
--property print.key=true \
--property key.separator="-" \
--group test-group \
--from-beginning
null-kafka
null-2
null-3
null-0
null-1
key2-no2
null-hello
null-
null-
key1-no1
null-4
null-5
key3-no3

메시지 키를 확인하기 위해 print.key=true로 설정했다. 기본 설정값은 false이다.

separator를 활용하여 키와 값 사이에 - 가 표시된다. 기본 설정은 tab이다.

--group 옵션을 통해 신규 컨슈머 그룹을 생성했다.

컨슈머 그룹은 1개 이상의 컨슈머로 이루어져 있고, 이 컨슈머 그룹을 통해 가져간 토픽의 메시지는 가져간 메시지에 대해 커밋을 한다.

커밋이란 컨슈머가 특정 레코드까지 처리를 완료했다고 레코드의 오프셋 번호를 카프카 브로커에 저장하는 것이다.

커밋 정보는 _consumer_offsets 이름의 내부 토픽에 저장된다.

메시지 키를 넣지 않은 데이터는 null과 함께 값이 보인다.

여기서 특이한 점은 producer.sh 로 전송했던 데이터의 순서가 현재 출력되는 순서와 다르다는 것이다.

이는 카프카의 핵심인 파티션 개념 대문에 생기는 현상이다.

kafka-console-consumer.sh 명령어를 통해 토픽의 데이터를 가져가게 되면 파티션으로 부터 동일한 중요도로 데이터를 가져간다.

만약 데이터의 순서를 보장하고 싶다면 가장 좋은 방법은 파티션 1개로 구성된 토픽을 만드는 것이다.

한 개의 파티션에서는 데이터 순서를 보장한다.

 

kafka-console-consumer.sh가 파티션으로 메시지를 가져간다

 

 

컨슈머 그룹(kafka-consumer-groups.sh)

위의 예제코드로 test-group 컨슈머 그룹으로 생성된 컨슈머로 test.kafka 토픽의 데이터를 가져갔다.

컨슈머 그룹은 따로 생성하는 명령어 필요없이 컨슈머 동작 시 그룹이름을 지정하면 생성된다.

생성된 컨슈머 그룹의 리스트는 아래 명령어로 확인할 수 있다.

bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 --list
test-group

컨슈머 그룹 이름을 토대로 컨슈머 그룹이 어떤 토픽의 데이터를 가져가는지 확인할 때 쓰인다.

bin/kafka-consumer-groups.sh --bootstrap-server my-kafka:9092 \
--group test-group \
--describe

Consumer group 'test-group' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID             -               -               -
test-group      test.kafka      2          3               3               0               -               -               -
test-group      test.kafka      3          3               3               0               -               -               -
test-group      test.kafka      0          4               4               0               -               -               -
test-group      test.kafka      1          3               3               0               -               -               -
...생략

명령어

--group 을 통해 어떤 그룹을 조회 할 것인지 지정한다.

--describe 옵션을 통해 컨슈머 그룹의 상세 내용을 확인할 수 있다.

 

조회결과

GROUP / TOPIC / PARTITION : 조회한 컨슈머 그룹이 마지막으로 커밋한 토픽과 파티션을 나타낸다.

test-group 컨슈머 그룹이 test.kafka 토픽의 2번 파티션의 레코드가 마지막으로 커밋된 것을 알 수 있다.

CURRENT-OFFSET : 컨슈머 그룹이 가져간 파티션에 가장 최신 오프셋이 몇 번인지 나타낸다.

오프셋이란 파티션의 각 레코드에 할당된 번호다.

이번호는 데이터가 파티션에 들어올때마다 1씩 증가한다.

LOG-END-OFFSET : 해당 컨슈머 그룹의 컨슈머가 어느 오프셋까지 커밋했는지 알 수 있다.

LAG : 컨슈머 그룹이 데이터를 가져가는데 지연 발생 지표

CONSUMER-ID : 컨슈머의 토픽 할당을 카프카 내부적으로 구분하기 위해 사용하는 id이다.

이 값은 client에 uuid 값을 붙여 자동 할당되어 유니크한 값으로 설정된다.

HOST : 컨슈머가 동작하는 host명을 출력한다.

CLIENT-ID : 컨슈머에 할당된 id이다. 사용자가 지정할 수 있으며 지정하지 않으면 자동 생성 된다.

 

 

kafka-verifiable-producer

kafka-verifiable로 시작하는 2개의 스크립트를 사용하면 String 타입 메시지 값을 코드 없이 주고받을 수 있다.

bin/kafka-verifiable-producer.sh --bootstrap-server my-kafka:9092 \
> --max-message 10 \
> --topic verify-test
{"timestamp":1671365107411,"name":"startup_complete"}
{"timestamp":1671365108504,"name":"producer_send_success","key":null,"value":"0","offset":10,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"1","offset":11,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"2","offset":12,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"3","offset":13,"topic":"verify-test","partition":0}
{"timestamp":1671365108520,"name":"producer_send_success","key":null,"value":"4","offset":14,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"5","offset":15,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"6","offset":16,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"7","offset":17,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"8","offset":18,"topic":"verify-test","partition":0}
{"timestamp":1671365108521,"name":"producer_send_success","key":null,"value":"9","offset":19,"topic":"verify-test","partition":0}
{"timestamp":1671365108536,"name":"shutdown_complete"}
{"timestamp":1671365108539,"name":"tool_data","sent":10,"acked":10,"target_throughput":-1,"avg_throughput":8.802816901408452}
sangoh@jeonsang-oui-MacBookPro kafka_2.12-2.5.0 %

 

명령어

--max-messages는 kafka-verifiable-producer.sh로 보내는 데이터 개수를 지정한다.

만약 -1을 옵션값으로 입력하면 kafka-verifiable-producer.sh가 종료될 때까지 계속 데이터를 토픽으로 보낸다.

--topic 명령어를 통해 데이터를 받을 대상 토픽을 입력한다.

 

조회결과

메시지별로 보낸 시간과 메시지 키, 메시지 값, 토픽, 저장된 파티션, 저장된 오프셋 번호가 출력된다.

나는 이미 10번을 넣은 후 포스팅 하며 10번을 더 넣어 offset이 10부터 시작되었다.

 

kafka-verifiable-consumer

bin/kafka-verifiable-consumer.sh --bootstrap-server my-kafka:9092 \
> --topic verify-test \
> --group-id test-group
{"timestamp":1671365408817,"name":"startup_complete"}
{"timestamp":1671365410373,"name":"partitions_assigned","partitions":[{"topic":"verify-test","partition":0}]}
{"timestamp":1671365411152,"name":"records_consumed","count":10,"partitions":[{"topic":"verify-test","partition":0,"count":10,"minOffset":10,"maxOffset":19}]}
{"timestamp":1671365411331,"name":"offsets_committed","offsets":[{"topic":"verify-test","partition":0,"offset":20}],"success":true}

명령어

--topic 명령어를 통해 데이터를 가져올 토픽을 지정한다.

--group-id 명령어를 통해 컨슈머 그룹을 지정한다.

 

조회결과

컨슈머는 한 번에 다수의 메시지를 가져와서 처리하므로 한 번에 10개의 메시지를 정상적으로 받았음을 알 수 있다.

메시지 수신 이후 10번 오프셋 커밋 여부도 확인할 수 있다.

 

kafka-delete-records.sh

이미 적재된 토픽 데이터를 지우는 방법으로 kafka-delete-records.sh를 사용할 수 있다.

적재된 토픽의 가장 오래된 데이터부터 특정 시점의 오프셋까지 삭제할 수 있다.

vi delete-topic.json 
{"partitions": [{"topic": "test", "partition": 0, "offset": 50}], "version":1 }

삭제하고자 하는 데이터에 대한 정보를 파일로 저장하여 사용한다.

 

bin/kafka-delete-records.sh --bootstrap-server my-kafka:9092 \
> --offset-json-file delete-topic.json
Executing records delete operation
Records delete operation completed:
partition: test-0 low_watermark: 50

삭제 토픽,파티션,오프셋에 대한 정보를 담은 위 json 파일은 옵션값으로 입력하면 파일을 읽어서 데이터 삭제를 진행한다.

파티션에 가장 오래된 오프셋부터 지정한 오프셋까지 삭제된다.

 

 

 

 

해당 내용 출처 아파치 카프카 애플리케이션 프로그래밍 with 자바

 

반응형