Kafka

Kafka Kraft Mode 구축

marcel 2025. 4. 10. 10:25

개요

Kafka Cluster의 기본적인 설치 과정

구성

  • 온프레미스 환경을 기준으로 구성
  • Java : openjdk version "17.0.13" 2024-10-15 LTS
  • VM : 7 대 (Broker : 3, Controller : 3 / Monitoring VM : prometheus, grafana, UI For Kafka )
Controller (KRaft) 3대 kaf04,kaf05,kaf07 메타데이터 관리. ZooKeeper 없이 운영
Broker 3대 kaf01,kaf02,kaf03 메시지 송수신 및 저장
Monitoring 1대 mon1 Prometheus, Grafana, UI for kafka 등 구성 가능

설정 

Controller VM

controller.properties

  • 카프카 컨트롤러를 위한 설정

vim /data/kafka_2.13-3.8.0/config/kraft/controller.properties

# config/kraft/controller.properties
############################# Server Basics #############################
 
# The role of this server. Setting this puts us in KRaft mode
process.roles=controller
 
# The node id associated with this instance's roles
node.id=4
 
# The connect string for the controller quorum
# controller.quorum.voters=1@localhost:9093
controller.quorum.voters=4@10.65.41.105:9093,5@10.65.41.106:9093,6@10.65.41.107:9093
 
############################# Socket Server Settings #############################
listeners=CONTROLLER://10.65.41.105:9093
 
# This is required if running in KRaft mode.
controller.listener.names=CONTROLLER
 
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
listener.security.protocol.map=CONTROLLER:PLAINTEXT
 
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
 
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
 
############################# Log Basics #############################
log.dirs=/data/kafka
 
# The default number of log partitions per topic. More partitions allow greater
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=1
 
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
############################ Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
 
# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
#log.retention.bytes=1073741824
 
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
 
# The interval at which log segments are checked to see if they can be deleted according
log.retention.check.interval.ms=300000
설정들
process.roles controller
  • 이 노드는 KRaft 기반의 컨트롤러로 동작함
node.id 4
  • 이 컨트롤러 노드의 고유 ID 
  • Controller, Broker 구분 없이 전체 Cluster에서의 ID
controller.quorum.voters 4@10.65.41.105:9093,5@10.65.41.106:9093,6@10.65.41.107:9093
  • KRaft 모드의 컨트롤러 클러스터(Quorum) 구성
  • 여기서는 총 3대 컨트롤러 노드를 구성하며, node.id@ip:port 형식
listeners CONTROLLER://10.65.41.105:9093
  • 컨트롤러 REST 또는 내부 통신용 Listener 설정
  • CONTROLLER라는 이름으로 바인딩
controller.listener.names CONTROLLER
  • 컨트롤러가 어떤 Listener를 사용할 것인지 명시
listener.security.protocol.map CONTROLLER:PLAINTEXT
  • CONTROLLER Listener는 암호화되지 않은 PLAINTEXT 프로토콜 사용
  • SSL이나 SASL 안 쓰는 경우 기본값
num.network.threads=
num.io.threads
3
8
  • Kafka 내부에서 네트워크 및 디스크 처리용 스레드 수
  • 일반적으로 컨트롤러는 큰 부하가 없기 때문에 기본값 유지해도 됨
socket.send.buffer.bytes
102400  
socket.receive.buffer.bytes
102400  
socket.request.max.bytes 104857600
  • 소켓 통신 시 사용되는 버퍼 크기 및 요청 최대 크기
log.dirs /data/kafka
  • 로그 및 내부 메타데이터 저장 경로 (데이터 Dir)
  • KRaft 모드에서는 **메타데이터 로그 (metadata.log)**도 이 경로에 저장됨
log.retention.hours 168 로그 보존 시간 (7일)
log.segment.bytes 1073741824 로그 파일이 1GB를 넘으면 새 segment 생성
log.retention.check.interval.ms 300000 5분마다 로그 보존 조건 확인 후 삭제 여부 판단

JMX Monitoring Agent 추가

  • Prometheus로 Kafka 지표를 모니터링 하기 위해 JMX 수집 Agent를 구성한다. 

vim /data/kafka_2.13-3.8.0/bin/kafka-server-start.sh

# [root@kaf02 kafka_2.13-3.8.0]# vim bin/kafka-server-start.sh
,,,
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
 
export KAFKA_OPTS="-javaagent:/data/kafka_2.13-3.8.0/mon/jmx_prometheus_javaagent-1.0.1.jar=7071:/data/kafka_2.13-3.8.0/mon/kafka-2_0_0.yml" # JMX Monoitoring 추가
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
[root@kaf02 mon]# ls /data/kafka_2.13-3.8.0/mon
jmx_prometheus_javaagent-1.0.1.jar  kafka-2_0_0.yml  kafka_exporter  node_exporter  node_exporter-1.6.1.linux-386.tar.gz start_exporter.txt

Controller 실행

  • Broker보다 먼저 설정한 노드를 전부 기동 (컨트롤러 내에서는 순서 상관X)

/data/kafka_2.13-3.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.13-3.8.0/config/kraft/controller.properties

Broker VM

Broker Properties

vim /data/kafka_2.13-3.8.0/config/server.properties

# config/server.properties
############################# Server Basics #############################
 
# The id of the broker. This must be set to a unique integer for each broker.
# broker.id=1
node.id=1
 
process.roles=broker
 
############################# Socket Server Settings #############################
listeners=PLAINTEXT://10.65.41.102:9092,SSL://10.65.41.102:19092
advertised.listeners=PLAINTEXT://kaf01.hjchae.com:9092,SSL://kaf01.hjchae.com:19092
 
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
 
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8
 
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400
 
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400
 
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=1048576000
############################# Log Basics #############################
 
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka
 
# The default number of log partitions per topic. More partitions allow greater
num.partitions=1
 
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
num.recovery.threads.per.data.dir=1
 
############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
 
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168
 
# The interval at which log segments are checked to see if they can be deleted according
log.retention.check.interval.ms=300000
 
controller.quorum.voters=4@10.65.41.105:9093,5@10.65.41.106:9093,6@10.65.41.107:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL
 
############################# Group Coordinator Settings #############################
# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
group.initial.rebalance.delay.ms=0
항목설정 값설명
node.id 1 이 브로커의 고유 ID. KRaft 모드에서 broker.id 대신 사용
process.roles broker 이 노드를 브로커 역할로 지정. 컨트롤러는 아님
listeners PLAINTEXT://10.65.41.102:9092,SSL://10.65.41.102:19092 브로커가 클라이언트 요청을 수신할 Listener (내부 IP 기반)
advertised.listeners PLAINTEXT://kaf01.hjchae.com:9092,SSL://kaf01.hjchae.com:19092 외부 클라이언트에게 브로커를 노출할 주소 (DNS 기반으로 접근 유도)
listener.security.protocol.map CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL 각 Listener 이름별 보안 프로토콜 지정
controller.quorum.voters 4@10.65.41.105:9093,... 메타데이터를 관리하는 컨트롤러 클러스터의 구성. 브로커는 여기 참여하지 않음
controller.listener.names CONTROLLER 브로커가 연결할 컨트롤러 listener 이름 (KRaft 모드 필수)
num.network.threads 3 Kafka 브로커의 네트워크 처리 스레드 수
num.io.threads 8 Kafka 브로커의 I/O 처리용 스레드 수 (디스크 포함)
socket.send.buffer.bytes 102400 소켓 전송 버퍼 크기
socket.receive.buffer.bytes 102400 소켓 수신 버퍼 크기
socket.request.max.bytes 1048576000 수신 가능한 최대 요청 크기 (약 1GB)
log.dirs /data/kafka Kafka 로그 (데이터) 저장 경로. 토픽 데이터가 저장됨
num.partitions 1 토픽 생성 시 기본 파티션 수
num.recovery.threads.per.data.dir 1 Kafka 재시작 시 로그 복구용 스레드 수
offsets.topic.replication.factor 1 __consumer_offsets 토픽 복제 수. 운영 환경에서는 3 권장
transaction.state.log.replication.factor 1 트랜잭션 상태 로그의 복제 수
transaction.state.log.min.isr 1 트랜잭션 로그 처리 시 필요한 최소 ISR 수
log.retention.hours 168 로그 보존 시간. 7일 (기본)
log.retention.check.interval.ms 300000 5분마다 로그 파일을 검사하여 삭제 조건 만족 시 제거
group.initial.rebalance.delay.ms 0 컨슈머 그룹의 리밸런싱 초기 지연 시간 (즉시 리밸런싱)

JMX Monitoring Agent 추가

  • Prometheus로 Kafka 지표를 모니터링 하기 위해 JMX 수집 Agent를 구성한다. 

vim /data/kafka_2.13-3.8.0/bin/kafka-server-start.sh

# [root@kaf02 kafka_2.13-3.8.0]# vim bin/kafka-server-start.sh
,,,
COMMAND=$1
case $COMMAND in
  -daemon)
    EXTRA_ARGS="-daemon "$EXTRA_ARGS
    shift
    ;;
  *)
    ;;
esac
 
export KAFKA_OPTS="-javaagent:/data/kafka_2.13-3.8.0/mon/jmx_prometheus_javaagent-1.0.1.jar=7071:/data/kafka_2.13-3.8.0/mon/kafka-2_0_0.yml" # JMX Monoitoring 추가
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
[root@kaf02 mon]# ls /data/kafka_2.13-3.8.0/mon
jmx_prometheus_javaagent-1.0.1.jar  kafka-2_0_0.yml  kafka_exporter  node_exporter  node_exporter-1.6.1.linux-386.tar.gz start_exporter.txt

Broker 실행

  • Controller가 모두 정상적으로 기동된 후 기동

/data/kafka_2.13-3.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.13-3.8.0/config/server.properties

Monitoring VM

  • Prometheus, Grafana 구성에 대한 설명

Kafka Exporter

  • 각 카프카 클러스터 노드에 설치하여 지표 탐색

[root@kaf02 mon]# ls /data/kafka_2.13-3.8.0/mon
 kafka_exporter node_exporter 

실행

nohup /data/kafka_2.13-3.8.0/mon/kafka_exporter &

 

Prometheus 설정

# [root@tool prometheus-2.53.2.linux-amd64]# vim prometheus.yml
global:
  scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).
 
# Alertmanager configuration
alerting:
  alertmanagers:
    - static_configs:
        - targets:
          # - alertmanager:9093
 
# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"
 
# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
# scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  #  - job_name: "prometheus"
    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.
 
        #    static_configs:
        #      - targets: ["localhost:9090"]
scrape_configs:
    # The job name is added as a label job=<job_name> to any timeseries scraped from this config.
  - job_name: 'kafka'
 
      # metrics_path defaults to '/metrics'
      # scheme defaults to 'http'.
 
    static_configs:
      - targets:
        - '10.65.41.102:9308' # kafka_exporter
        - '10.65.41.102:7071' # JMX exporter
        - '10.65.41.103:9308' # kafka_exporter
        - '10.65.41.103:7071' # JMX exporter
        - '10.65.41.104:9308' # kafka_exporter
        - '10.65.41.104:7071' # JMX exporter

 

실행

nohup ./prometheus --config.file=/root/prometheus-2.53.2.linux-amd64/prometheus.yml &

 

Grafana

RPM 설치

  • 아래 경로에서 파일 다운로드