본문 바로가기

Programming/Apache Kafka

아파치 카프카 실습 (Apache Kafka) / 카프카 컨슈머 - 상세 개념 (멀티스레드 컨슈머)

반응형

멀티 스레드 컨슈머

파티션을 여러 개로 운영하는 경우 병렬처리를 위해 파티션 개수와 컨슈머 개수를 동일하게 맞추는 것이 좋은 방법이다.

토픽의 파티션은 1개 이상으로 이루어져 있으며 1개의 파티션은 1개 컨슈머가 할당되어 데이터를 처리할 수 있다.

파티션 개수가 n개라면 동일 컨슈머 그룹으로 묶인 컨슈머 스레드를 최대 n개 운영할 수 있다.

컨슈머를 멀티 스레드로 활용하는 방식은 크게 두 가지로 나눈다.

 

  • 멀티 워커 스레드
  • 컨슈머 멀티 스레드 전략

 

 

1. 카프카 컨슈머 멀티 워커 스레드 전략

데이터를 for 반복구문으로 처리할 경우 이전 레코드의 처리가 끝날 때까지 다음 레코드는 기다리게 된다.

만약 레코드별로 처리해야 하는 시간이 길 경우 더욱 오래 기다리게 디므로 처리 속도는 느려진다.

멀티 스레드를 사용하면 각기 다른 레코드들의 데이터 처리를 동시에 실행할 수 있기 때문에 처리 시간을 현저히 줄일 수 있다.

멀티 스레드를 생성하는 ExecutorService 자바 라이브러리를 사용하면 레코드를 병렬처리하는 스레드를 효율적으로 생성하고 관리할 수 있다.

ExcutorService를 사용하여 스레드 개수를 제어하는 스레드 풀을 생성할 수 있다.

작업 이후 스레드가 종료되어야 한다면 CachedThreadPool을 사용하여 스레드를 실행한다.

 

# 사용자 지정 스레드 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
public class ConsumerWorker implements Runnable{
    private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private String recordValue;
 
    ConsumerWorker(String recordValue) {
        this.recordValue = recordValue;
    }
 
    @Override
    public void run() {
        logger.info("thread:{}\trecord:{}", Thread.currentThread().getName(), recordValue);
    }
}
cs

 

Runnable 인터페이스로 구현한 ConsumerWorker 클래스는 스레드로 실행되며, 오버라이드된 run() 메서드가 실행된다.

run() 메서드에는 데이터를 처리할 구문이 들어가면 되는데, 여기선 thread이름과 record 클래스를 로그로 출력하도록 작성하였다.

 

# 스레드 호출하는 구문 생성 후 작동

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class ConsumerWithMultiWorkerThread {
    private final static Logger logger = LoggerFactory.getLogger(ConsumerWithMultiWorkerThread.class);
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";
 
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
 
        KafkaConsumer<StringString> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
        ExecutorService executorService = Executors.newCachedThreadPool();
        while (true) {
            ConsumerRecords<StringString> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<StringString> record : records) {
                ConsumerWorker worker = new ConsumerWorker(record.value());
                executorService.execute(worker);
            }
        }
    }
}
cs

 

 

# 프로듀서로 데이터 전송

$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic test
>hello
>kafka
>world

 

 

# 콘솔 확인

 

 

단어별로 전송하여 여러 개의 스레드가 데이터를 처리한 것을 로그를 통해 확인할 수 있다.

 

스레드를 사용하면 한번 poll()을 통해 받은 데이터를 병렬처리함으로써 속도의 이점을 확실히 얻을 수 있다.

그러나 주의해야 하는 사항이 있다.

첫째는 스레드를 사용함으로써 데이터 처리가 끝나지 않았음에도 불구하고 커밋을 하기 때문에 리밸런싱, 컨슈머 장애 시에 데이터 유실이 발생할 수 있다.

이번에 구현한 코드는 각 레코드의 데이터 처리가 끝났음을 스레드로부터 리턴받지 않고 바로 그 다음 poll() 메서드를 호출한다.

오토 커밋일 경우 데이터 처리가 스레드에서 진행 중임에도 불구하고 다음 poll() 메서드 호출 시 커밋을 할 수 있기 때문에 발생하는 현상이다. 

두 번째는 레코드 처리의 역전현상이다. 

for 반복구문으로 스레드를 생성하므로 레코드별로 스레드의 생성은 순서대로 진행된다.

그러나 스레드의 처리 시간은 다를 수 있다.

나중에 생성된 스레드의 레코드 처리 시간이 더 짧을 경우 이전 레코드가 다음 레코드보다 나중에 처리될 수 있다.

이로 인해 레코드의 순서가 뒤바뀌는 현상이 발생할 수 있다.

중복 및 역전현상이 발생해도 매우 빠른 처리속도가 필요한 데이터 처리에 적합하다.

서버 리소스(CPU, 메모리 등) 모니터링 파이프라인, IoT 서비스의 센서 데이터 수집 파으프라인이 그 예이다.

 

 

1. 카프카 컨슈머 멀티 스레드 전략

하나의 파티션은 동일 컨슈머 중 최대 1개까지 할당된다.

그리고 하나의 컨슈머는 여러 파티션에 할당될 수 있다.

1개의 애플리케이션에 구독하고자 하는 토픽의 파티션 개수만큼 컨슈머 스레드 개수를 늘려서 운영하는 것이 이 특징이다.

컨슈머 스레드를 늘려서 운영하면 각 스레드에 각 파티션이 할당되며, 파티션의 레코드들을 병렬처리할 수 있다.

 

 

# 컨슈머 스레드 클래스 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ConsumerWorker implements Runnable {
    private final static Logger logger = LoggerFactory.getLogger(ConsumerWorker.class);
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<StringString> consumer;
 
    // KafkaConsumer 인스턴스를 생성하기 위해 필요한 변수를 컨슈머 스레드 생성자 변수로 받는다.
    // 카프카 컨슈머 옵션을 담는 Properties 클래스와 토픽이름, 스레드 번호를 변수로 받았다.
    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }
 
    @Override
    public void run() {
        // KafkaConsumer 클래스는 스레드 세이프하지 않다.
        // 스레드별로 KafkaConsumer 인스턴스를 별개로 만들어야 한다.
        consumer = new KafkaConsumer<>(prop);
        // 생성자 변수로 받은 토픽을 명시적으로 구독하기 시작한다.
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            // poll() 메서드를 통해 리턴받은 레코드들을 처리한다.
            ConsumerRecords<StringString> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<StringString> record : records) {
                logger.info("{}", record);
            }
        }
    }
}
cs

 

 

# 멀티 컨슈머 스레드를 가진 애플리케이션 생성 및 실행

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MultiConsumerThread {
 
    private final static String TOPIC_NAME = "test";
    private final static String BOOTSTRAP_SERVERS = "my-kafka:9092";
    private final static String GROUP_ID = "test-group";
    // 스레드 개수를 지정하기 위해 int 변수에 생성할 스레드 개수를 담았다.
    private final static int CONSUMER_COUNT = 3;
 
    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
 
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            // 3개 컨슈머 스레드를 execute() 메서드를 통해 실행한다.
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            executorService.execute(worker);
        }
    }
}
cs

 

 

# 프로듀서로 데이터 전송

bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 \
--topic test
> hi
>goodnight
>kafka
>bro

 

 

로그를 통해 스레드 3개가 각기 다른 파티션으로부터 레코드를 받은 내역을 확인할 수 있다.

 

 

해당 내용 출처 아파치 카프카 애플리케이션 프로그래밍 with 자바

반응형