반응형
1. 단일 모드 커넥트
단일 모드 커넥트를 실행하기 위해서는 단일 모드 커넥트를 참조하는 설정 파일인 connect-standalone.properties 파일을 수정해야 한다.
해당 파일은 카프카 바이너리 디렉토리의 config 디렉토리에 있다.
# 커넥트와 연동할 카프카 클러스터의 호스트와 포트 설정
# 2개 이상일땐 콤마로 구분하여 설정
bootstrap.servers=my-kafka:9092
# 데이터를 카프카에 저장할 때나 가져올 때 변환할 때 사용
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# 스키마 형태를 사용하고 싶지 않다면 enable 옵션을 false로 설정
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 태스크가 처리 완료한 오프셋을 커밋하는 주기를 설정
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
# 플러그인 형태로 추가할 커넥터의 디렉토리 주소를 입력
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=
단일 모드 커넥트는 커넥트 설정파일과 함께 커넥터 설정파일도 정의하여 실행해야 한다.
# 파일 소스 커넥트
파일 소스 커넥터는 특정 위치에 있는 파일을 읽어서 토픽으로 데이터를 저장하는 데에 사용하는 커넥터이다.
파일 소스 커넥터 설정파일은 카프카 바이너리가 설치된 디렉토리의 config 디렉토리에 connect-file-source-properties로 저장되어 있다.
#커넥터의 이름 지정, 유일한 값을 가져야 한다.
name=local-file-source
#사용할 커넥터의 클래스 이름 지정
connector.class=FileStreamSource
#커넥터로 실행할 태스크 개수 지정
tasks.max=1
#읽을 파일의 위치 지정
file=/tmp/test.txt
#읽은 파일의 데이터를 저장할 토픽의 이름 지정
topic=connect-test
# 단일 모드 커넥트 실행 (로컬)
단일 모드 커넥트를 실행 시에 파라미터로 커넥트 설정파일과 커넥터 설정파일을 차례로 넣어 실행한다.
$ bin/connect-standalone.sh config/connect-standalone.properties \
config/connect-file-source.properties
2. 분산 모드 커넥트
분산 모드 커넥트는 단일 모드 커넥트와 다르게 2개 이상의 프로세스가 1개의 그룹으로 묶여 운영된다.
1개의 커넥트 프로세스에 이슈 발생하여도 살아있는 나머지 1개 커넥트 프로세스가 이어받아 파이프라인을 지속적으로 실행할 수 있다.
connect-distributed.properties 를 아래와 같이 설정한다.
bootstrap.servers=my-kafka:9092
# 다수의 커넥트 프로세스들을 묶을 그룹 이름 지정.
# 동일한 group.id로 지정된 커넥트들은 같은 그룹으로 인식한다.
# 같은 그룹으로 지정된 커넥트들에서 커넥터가 실행되면 커넥트들에 분산되어 실행된다.
# 한 대에 이슈가 발생하더라도 나머지 커넥트가 커넥터를 안전하게 실행할 수 있다.
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
# 분산 모드 커넥트는 카프카 내부 토픽에 오프셋 정보를 저장한다.
# 오프셋 정보는 소스 커넥터 또는 싱크 커넥터가 데이터 처리 시점을 저장하기 위해 사용한다.
# 해당 정보는 데이터를 처리하는 데에 있어 중요한 역할을 하므로 실제로 운영할 때는 복제 개수를 3보다 큰 값으로 설정하는 것이 좋다.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
# 태스크가 처리 완료한 오프셋을 커밋하는 주기를 설정한다.
offset.flush.interval.ms=10000
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# 분산 모드 커넥트 실행 (로컬)
$ bin/connect-distributed.sh config/connect-distributed.properties
반응형
'Programming > Apache Kafka' 카테고리의 다른 글
아파치 카프카 실습 (Apache Kafka) / 카프카 프로듀서 - 상세 개념 (acks, 멱등성 프로듀서, 트랜잭션 프로듀서) (0) | 2023.01.31 |
---|---|
아파치 카프카 실습 (Apache Kafka) / 토픽과 파티션 - 상세 개념 (적정 파티션 개수, 메시지 키 사용 여부, 토픽 정리 정책, ISR) (0) | 2023.01.30 |
아파치 카프카 실습 (Apache Kafka) / 카프카 커넥트란 (0) | 2023.01.26 |
아파치 카프카 실습 (Apache Kafka) / 프로세서 API 활용하기 (0) | 2023.01.25 |
아파치 카프카 실습 (Apache Kafka) / GlobalKTable과 KStream 조인하기 (0) | 2023.01.25 |