Kafka
Setting Up Kafka in KRaft Mode
marcel
2025. 4. 10. 10:25
Overview
The basic installation process for a Kafka cluster.
Cluster Layout
- Built on an on-premises environment
- Java : openjdk version "17.0.13" 2024-10-15 LTS
- VMs : 7 total (Broker : 3, Controller : 3 / Monitoring VM : Prometheus, Grafana, UI for Kafka)
Role | Count | Hosts | Description
Controller (KRaft) | 3 | kaf04, kaf05, kaf07 | Manages cluster metadata; runs without ZooKeeper
Broker | 3 | kaf01, kaf02, kaf03 | Sends, receives, and stores messages
Monitoring | 1 | mon1 | Can host Prometheus, Grafana, UI for Kafka, etc.
Configuration
Controller VM
controller.properties
- Configuration for the Kafka controllers
vim /data/kafka_2.13-3.8.0/config/kraft/controller.properties
# config/kraft/controller.properties
############################# Server Basics #############################
# The role of this server. Setting this puts us in KRaft mode
process.roles=controller
# The node id associated with this instance's roles
node.id=4
# The connect string for the controller quorum
# controller.quorum.voters=1@localhost:9093
controller.quorum.voters=4@10.65.41.105:9093,5@10.65.41.106:9093,6@10.65.41.107:9093
############################# Socket Server Settings #############################
listeners=CONTROLLER://10.65.41.105:9093
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
############################# Log Basics #############################
log.dirs=/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################ Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
Settings
Setting | Value | Description
process.roles | controller | Runs this node as a dedicated KRaft controller
node.id | 4 | Unique node ID within the cluster
controller.quorum.voters | 4@10.65.41.105:9093,5@10.65.41.106:9093,6@10.65.41.107:9093 | Voting members of the controller quorum
listeners | CONTROLLER://10.65.41.105:9093 | Listener the controller binds to
controller.listener.names | CONTROLLER | Listener name used for controller traffic (required in KRaft mode)
listener.security.protocol.map | CONTROLLER:PLAINTEXT | Security protocol mapped to each listener name
num.network.threads | 3 | Threads for receiving requests from and sending responses to the network
num.io.threads | 8 | Threads for processing requests, including disk I/O
socket.send.buffer.bytes | 102400 | Socket send buffer (SO_SNDBUF)
socket.receive.buffer.bytes | 102400 | Socket receive buffer (SO_RCVBUF)
socket.request.max.bytes | 104857600 | Maximum request size accepted (protection against OOM)
log.dirs | /data/kafka | Directory for the metadata log
log.retention.hours | 168 | Log retention period (7 days)
log.segment.bytes | 1073741824 | Roll a new segment once a log file reaches 1 GB
log.retention.check.interval.ms | 300000 | Check retention conditions every 5 minutes and decide whether to delete
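- Note: in KRaft mode the directory in log.dirs must be formatted with a cluster ID before the first start. A minimal sketch, assuming the standard kafka-storage.sh tool bundled with Kafka 3.8 and that the same generated ID is reused on every controller and broker:
# Generate a cluster ID once (run on any node and reuse the output everywhere)
/data/kafka_2.13-3.8.0/bin/kafka-storage.sh random-uuid
# Format this controller's log.dirs with that ID (<CLUSTER_ID> is a placeholder)
/data/kafka_2.13-3.8.0/bin/kafka-storage.sh format -t <CLUSTER_ID> -c /data/kafka_2.13-3.8.0/config/kraft/controller.properties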
Adding the JMX Monitoring Agent
- Configure a JMX collection agent so that Prometheus can scrape Kafka metrics.
vim /data/kafka_2.13-3.8.0/bin/kafka-server-start.sh
# [root@kaf02 kafka_2.13-3.8.0]# vim bin/kafka-server-start.sh
...
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
export KAFKA_OPTS="-javaagent:/data/kafka_2.13-3.8.0/mon/jmx_prometheus_javaagent-1.0.1.jar=7071:/data/kafka_2.13-3.8.0/mon/kafka-2_0_0.yml" # Add the JMX monitoring agent
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
- The required files can be downloaded from:
- https://github.com/prometheus/jmx_exporter
- https://github.com/prometheus/jmx_exporter/blob/main/examples/kafka-2_0_0.yml
[root@kaf02 mon]# ls /data/kafka_2.13-3.8.0/mon
jmx_prometheus_javaagent-1.0.1.jar kafka-2_0_0.yml kafka_exporter node_exporter node_exporter-1.6.1.linux-386.tar.gz start_exporter.txt
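- Once a node has been started with the agent, the endpoint can be sanity-checked with curl (port 7071 matches the javaagent option above):
# Should return Prometheus-format metrics (jvm_*, kafka_*)
curl -s http://10.65.41.105:7071/metrics | head -n 20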
Starting the Controllers
- Start all controller nodes before the brokers (the startup order among the controllers does not matter).
/data/kafka_2.13-3.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.13-3.8.0/config/kraft/controller.properties
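- To confirm that the three controllers have formed a quorum, the metadata quorum status can be inspected. A sketch, assuming the kafka-metadata-quorum.sh tool and its --bootstrap-controller option (available since Kafka 3.7):
# Shows the current leader, the voters (4, 5, 6) and their lag
/data/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-controller 10.65.41.105:9093 describe --status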
Broker VM
Broker Properties
vim /data/kafka_2.13-3.8.0/config/server.properties
# config/server.properties
############################# Server Basics #############################
# The id of the broker. This must be set to a unique integer for each broker.
# broker.id=1
node.id=1
process.roles=broker
############################# Socket Server Settings #############################
listeners=PLAINTEXT://10.65.41.102:9092,SSL://10.65.41.102:19092
advertised.listeners=PLAINTEXT://kaf01.hjchae.com:9092,SSL://kaf01.hjchae.com:19092
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=1048576000
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
controller.quorum.voters=4@10.65.41.105:9093,5@10.65.41.106:9093,6@10.65.41.107:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
group.initial.rebalance.delay.ms=0
Setting | Value | Description
node.id | 1 | Unique ID of this broker; used instead of broker.id in KRaft mode
process.roles | broker | Runs this node as a broker only, not a controller
listeners | PLAINTEXT://10.65.41.102:9092,SSL://10.65.41.102:19092 | Listeners on which the broker accepts client requests (internal IP based)
advertised.listeners | PLAINTEXT://kaf01.hjchae.com:9092,SSL://kaf01.hjchae.com:19092 | Addresses advertised to external clients (DNS-based access)
listener.security.protocol.map | CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL | Security protocol mapped to each listener name
controller.quorum.voters | 4@10.65.41.105:9093,... | Members of the controller quorum that manages metadata; brokers do not vote here
controller.listener.names | CONTROLLER | Listener name the broker uses to reach the controllers (required in KRaft mode)
num.network.threads | 3 | Network processing threads on the broker
num.io.threads | 8 | I/O processing threads (including disk)
socket.send.buffer.bytes | 102400 | Socket send buffer size
socket.receive.buffer.bytes | 102400 | Socket receive buffer size
socket.request.max.bytes | 1048576000 | Maximum request size the broker accepts (about 1 GB)
log.dirs | /data/kafka | Kafka log (data) directory where topic data is stored
num.partitions | 1 | Default number of partitions for newly created topics
num.recovery.threads.per.data.dir | 1 | Threads per data directory used for log recovery on restart
offsets.topic.replication.factor | 1 | Replication factor for __consumer_offsets; 3 is recommended in production
transaction.state.log.replication.factor | 1 | Replication factor for the transaction state log
transaction.state.log.min.isr | 1 | Minimum ISR required for transaction log writes
log.retention.hours | 168 | Log retention period; 7 days (default)
log.retention.check.interval.ms | 300000 | Check log files every 5 minutes and delete those that meet the retention conditions
group.initial.rebalance.delay.ms | 0 | Initial consumer group rebalance delay (rebalance immediately)
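- As with the controllers, the broker's log.dirs must be formatted with the same cluster ID before its first start. A sketch, reusing the ID generated earlier:
# Run on each broker with the cluster ID used for the controllers
/data/kafka_2.13-3.8.0/bin/kafka-storage.sh format -t <CLUSTER_ID> -c /data/kafka_2.13-3.8.0/config/server.properties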
Adding the JMX Monitoring Agent
- Configure a JMX collection agent so that Prometheus can scrape Kafka metrics.
vim /data/kafka_2.13-3.8.0/bin/kafka-server-start.sh
# [root@kaf02 kafka_2.13-3.8.0]# vim bin/kafka-server-start.sh
...
COMMAND=$1
case $COMMAND in
-daemon)
EXTRA_ARGS="-daemon "$EXTRA_ARGS
shift
;;
*)
;;
esac
export KAFKA_OPTS="-javaagent:/data/kafka_2.13-3.8.0/mon/jmx_prometheus_javaagent-1.0.1.jar=7071:/data/kafka_2.13-3.8.0/mon/kafka-2_0_0.yml" # Add the JMX monitoring agent
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
- The required files can be downloaded from:
- https://github.com/prometheus/jmx_exporter
- https://github.com/prometheus/jmx_exporter/blob/main/examples/kafka-2_0_0.yml
[root@kaf02 mon]# ls /data/kafka_2.13-3.8.0/mon
jmx_prometheus_javaagent-1.0.1.jar kafka-2_0_0.yml kafka_exporter node_exporter node_exporter-1.6.1.linux-386.tar.gz start_exporter.txt
Starting the Brokers
- Start the brokers only after all controllers are up and running.
/data/kafka_2.13-3.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.13-3.8.0/config/server.properties
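- Once all three brokers are up, an end-to-end check can be done with the console tools. A sketch; the topic name smoke-test is just an example:
# Create a replicated test topic and verify leader/ISR assignment
/data/kafka_2.13-3.8.0/bin/kafka-topics.sh --bootstrap-server kaf01.hjchae.com:9092 --create --topic smoke-test --partitions 3 --replication-factor 3
/data/kafka_2.13-3.8.0/bin/kafka-topics.sh --bootstrap-server kaf01.hjchae.com:9092 --describe --topic smoke-test
# Produce one message and read it back
echo "hello kraft" | /data/kafka_2.13-3.8.0/bin/kafka-console-producer.sh --bootstrap-server kaf01.hjchae.com:9092 --topic smoke-test
/data/kafka_2.13-3.8.0/bin/kafka-console-consumer.sh --bootstrap-server kaf01.hjchae.com:9092 --topic smoke-test --from-beginning --max-messages 1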
Monitoring VM
- Describes the Prometheus and Grafana setup.
Kafka Exporter
- Installed on each Kafka cluster node to expose metrics
- An agent that collects information about Kafka topics
- Data send/receive volume, rate, and timing
- Replication factor and partition count per topic
- Consumer metrics, etc.
- https://github.com/danielqsj/kafka_exporter
[root@kaf02 mon]# ls /data/kafka_2.13-3.8.0/mon
kafka_exporter node_exporter
Run
nohup /data/kafka_2.13-3.8.0/mon/kafka_exporter &
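- kafka_exporter serves metrics on port 9308 by default and needs to know which broker to connect to. A sketch with the broker address passed explicitly via the --kafka.server flag documented in the danielqsj/kafka_exporter README:
# Point the exporter at the local broker, then verify the metrics endpoint
nohup /data/kafka_2.13-3.8.0/mon/kafka_exporter --kafka.server=10.65.41.102:9092 > /dev/null 2>&1 &
curl -s http://10.65.41.102:9308/metrics | grep -m 5 '^kafka_'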
Prometheus Configuration
# [root@tool prometheus-2.53.2.linux-amd64]# vim prometheus.yml
global:
  scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
# scrape_configs:
#   # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
#   - job_name: "prometheus"
#     # metrics_path defaults to '/metrics'
#     # scheme defaults to 'http'.
#     static_configs:
#       - targets: ["localhost:9090"]

scrape_configs:
  # The job name is added as a label job=<job_name> to any timeseries scraped from this config.
  - job_name: 'kafka'
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
    static_configs:
      - targets:
          - '10.65.41.102:9308' # kafka_exporter
          - '10.65.41.102:7071' # JMX exporter
          - '10.65.41.103:9308' # kafka_exporter
          - '10.65.41.103:7071' # JMX exporter
          - '10.65.41.104:9308' # kafka_exporter
          - '10.65.41.104:7071' # JMX exporter
Run
nohup ./prometheus --config.file=/root/prometheus-2.53.2.linux-amd64/prometheus.yml &
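- The configuration and scrape targets can be verified before building dashboards. A sketch using promtool (shipped in the same tarball) and the Prometheus HTTP API:
# Validate prometheus.yml syntax
/root/prometheus-2.53.2.linux-amd64/promtool check config /root/prometheus-2.53.2.linux-amd64/prometheus.yml
# List scrape target health (up/down)
curl -s http://localhost:9090/api/v1/targets | grep -o '"health":"[a-z]*"'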
Grafana
RPM Installation
- Download the package from the link below