Kafka

Kafka: From ZooKeeper to KRaft

marcel 2025. 4. 10. 10:50

Goal

Convert a ZooKeeper + Kafka (ZK broker) cluster into a KRaft controller + Kafka (KRaft broker) cluster.

If you set up a completely new KRaft controller and register it together with the ZK brokers in migration mode, the ZooKeeper metadata can be migrated to the KRaft controller.

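Benefits

  • No ZooKeeper dependency: KRaft mode lets you run a Kafka cluster without ZooKeeper.
  • Single-cluster management: the Kafka cluster keeps its own metadata store and runs as a single system, which reduces management and operational complexity.
  • Better performance and scalability: because metadata is kept in Kafka's own Raft log instead of ZooKeeper, the limit on the number of partitions per cluster rises substantially, with millions of partitions as the design target.
  • Single security model: after migrating to KRaft mode, the security settings of the Kafka cluster are unified, so a single security model can be used.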

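Procedure

0. Caveats

  • With the version used here (3.5), migration in production environments is not recommended.
  • Migration is not supported when KRaft runs in combined mode, where one node is both broker and controller.
  • It is therefore recommended to first bring up dedicated controllers on the ZooKeeper servers.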
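The combined-mode caveat can be checked on a config file before anything is started. The sketch below is a hypothetical helper, not part of Kafka: `parse_properties` and `is_combined_mode` are names introduced here, and the parser assumes plain `key=value` lines with `#` comment lines only.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and '#' comment lines."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def is_combined_mode(props: dict[str, str]) -> bool:
    """True when process.roles lists both 'broker' and 'controller'."""
    roles = {r.strip() for r in props.get("process.roles", "").split(",") if r.strip()}
    return {"broker", "controller"} <= roles


combined = parse_properties("process.roles=broker,controller\nnode.id=1")
dedicated = parse_properties("## KRaft controller\nprocess.roles=controller\nnode.id=3")
print(is_combined_mode(combined), is_combined_mode(dedicated))
```

A node whose config makes `is_combined_mode` return true would need to be split into a dedicated controller and a dedicated broker before attempting the migration.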

1. Check the ZK cluster configuration

  • Look up the cluster ID
  • [root@kafka-1 kafka_2.13-3.5.0]# ./bin/zookeeper-shell.sh 10.65.40.27:2181,10.65.40.28:2181,10.65.40.29:2181
    Connecting to 10.65.40.27:2181,10.65.40.28:2181,10.65.40.29:2181
    Welcome to ZooKeeper!
    JLine support is disabled
    
    WATCHER::
    
    WatchedEvent state:SyncConnected type:None path:null
    
    # IDs of the connected brokers
    ls /brokers/ids
    [1, 2, 3]
    
    # Broker currently acting as the active controller
    get /controller
    {"version":2,"brokerid":2,"timestamp":"1715663707891","kraftControllerEpoch":-1}
    
    # Look up the cluster ID
    get /cluster/id
    {"version":"1","id":"YDRiePSKTZaLOMm3cxcSjg"}
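The value stored in `/cluster/id` is JSON, and its `id` field is the cluster ID that `kafka-storage.sh format` takes with `-t`. As a minimal sketch of carrying that value over without copy-paste mistakes: `cluster_id_from_znode` and `format_command` are hypothetical names, and the script and config paths are assumptions to adjust for your installation.

```python
import json


def cluster_id_from_znode(payload: str) -> str:
    """Return the "id" field of the JSON stored in the /cluster/id znode."""
    return json.loads(payload)["id"]


def format_command(cluster_id: str, config_path: str) -> list[str]:
    """Build the argv for `kafka-storage.sh format` (paths are assumptions)."""
    return ["bin/kafka-storage.sh", "format", "-t", cluster_id, "-c", config_path]


znode = '{"version":"1","id":"YDRiePSKTZaLOMm3cxcSjg"}'
cluster_id = cluster_id_from_znode(znode)
print(" ".join(format_command(cluster_id, "config/kraft/controller.properties")))
```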

 

2. Create the KRaft controller

  • 2-1. KRaft controller configuration
  • ## KRaft controller 
    ## {Kafka_HOME}/config/kraft/controller.properties
    
    ## Enable migration
    zookeeper.metadata.migration.enable=true
    zookeeper.connect=zkbroker-1:2181/zkbroker
    
    ## KRaft controller settings
    process.roles=controller
    node.id=3
    controller.quorum.voters=3@localhost:9093
    listeners=CONTROLLER://:9093
    controller.listener.names=CONTROLLER
    ...
  • 2-2. Create (format) the log directory
  • ## Pass the cluster ID looked up in step 1
    $ ./kafka-storage.sh format --cluster-id "ZOOKEEPER_CLUSTER_ID" -c ../config/server.properties
    
    [root@kafka-4 kafka_2.13-3.5.0]# bin/kafka-storage.sh format -t lOlsolxoS1e4FanUIMi-OQ -c config/kraft/server.properties
    
    Formatting /tmp/kraft-combined-logs with metadata.version 3.5-IV2.
    
  • 2-3. Start the KRaft controller
    Start the controller with migration enabled. The controller node then waits for the ZK brokers to register with it.
  • ./bin/kafka-server-start.sh ./config/kraft/controller.properties
    
    ## Log
    ## KRaft controller server.log
    [2024-05-13 10:01:03,629] INFO Still waiting for ZK brokers [0, 1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:01:04,629] INFO Still waiting for ZK brokers [0, 1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:10,643] INFO Still waiting for ZK brokers [0, 1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:11,640] INFO Still waiting for ZK brokers [0, 1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    
  • 2-4. Register the ZK brokers for migration
  • ## Add the migration settings below to each ZK broker, then do a rolling restart
    ## server.properties
    
    ## Enable migration
    zookeeper.metadata.migration.enable=true
    ## List every node of the controller quorum
    controller.quorum.voters=3@localhost:9093
    controller.listener.names=CONTROLLER
    
    ## Add CONTROLLER to the map
    listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
    
    ## Existing entries
    broker.id=0
    ...
    
  • # Kafka Server Log
    ## KRaft controller server.log
    [2024-05-13 10:01:02,656] INFO [Controller 3] Registered new broker: RegisterBrokerRecord(brokerId=0, isMigratingZkBroker=true, incarnationId=cgS-OnxQQgSUG0vtyQIJyw, brokerEpoch=198, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='hostname', port=19092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=8, maxSupportedVersion=8)], rack=null, fenced=true, inControlledShutdown=false) (org.apache.kafka.controller.ClusterControlManager)
    [2024-05-13 10:01:02,725] INFO [Controller 3] The request from broker 0 to unfence has been granted because it has caught up with the offset of its register broker record 198. (org.apache.kafka.controller.BrokerHeartbeatManager)
    [2024-05-13 10:01:03,629] INFO Still waiting for ZK brokers [1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:01:04,629] INFO Still waiting for ZK brokers [1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:10,643] INFO Still waiting for ZK brokers [1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:11,640] INFO Still waiting for ZK brokers [1, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:12,390] INFO [Controller 3] Registered new broker: RegisterBrokerRecord(brokerId=1, isMigratingZkBroker=true, incarnationId=_6oGLJ2CSzekK0MmNqvYMg, brokerEpoch=339, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='hostname', port=29092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=8, maxSupportedVersion=8)], rack=null, fenced=true, inControlledShutdown=false) (org.apache.kafka.controller.ClusterControlManager)
    [2024-05-13 10:02:12,454] INFO [Controller 3] The request from broker 1 to unfence has been granted because it has caught up with the offset of its register broker record 339. (org.apache.kafka.controller.BrokerHeartbeatManager)
    [2024-05-13 10:02:12,640] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:13,640] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:02:14,641] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:04:54,698] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:04:55,704] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:04:55,979] INFO [Controller 3] Fencing broker 0 because its session has timed out. (org.apache.kafka.controller.ReplicationControlManager)
    [2024-05-13 10:04:56,699] INFO Still waiting for ZK brokers [0, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:05:07,705] INFO Still waiting for ZK brokers [0, 2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:05:08,105] INFO [Controller 3] Re-registered broker id 0: RegisterBrokerRecord(brokerId=0, isMigratingZkBroker=true, incarnationId=aSmgQkh9RG2mCVufqfbYnQ, brokerEpoch=694, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='hostname', port=19092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=8, maxSupportedVersion=8)], rack=null, fenced=true, inControlledShutdown=false) (org.apache.kafka.controller.ClusterControlManager)
    [2024-05-13 10:05:08,196] INFO [Controller 3] The request from broker 0 to unfence has been granted because it has caught up with the offset of its register broker record 694. (org.apache.kafka.controller.BrokerHeartbeatManager)
    [2024-05-13 10:05:08,701] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:12:47,800] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:12:48,816] INFO Still waiting for ZK brokers [2] to register with KRaft. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:12:49,038] INFO [Controller 3] Registered new broker: RegisterBrokerRecord(brokerId=2, isMigratingZkBroker=true, incarnationId=zD9baDWpRbSSffGeF1ua8g, brokerEpoch=1617, endPoints=[BrokerEndpoint(name='PLAINTEXT', host='hostname', port=39092, securityProtocol=0)], features=[BrokerFeature(name='metadata.version', minSupportedVersion=8, maxSupportedVersion=8)], rack=null, fenced=true, inControlledShutdown=false) (org.apache.kafka.controller.ClusterControlManager)
    [2024-05-13 10:12:49,151] INFO [Controller 3] The request from broker 2 to unfence has been granted because it has caught up with the offset of its register broker record 1617. (org.apache.kafka.controller.BrokerHeartbeatManager)
    [2024-05-13 10:12:50,789] INFO KRaft controller 3 overwriting /controller to become the active controller with ZK epoch 5. The previous controller was 1. (kafka.zk.KafkaZkClient)
    [2024-05-13 10:12:50,813] INFO Successfully registered KRaft controller 3 with ZK epoch 5 (kafka.zk.KafkaZkClient)
    [2024-05-13 10:12:51,784] INFO Starting ZK migration (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:12:51,800] INFO Migrating 2 records from ZK (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:12:51,801] INFO [Controller 3] Created topic test with topic ID LsZ0KDlCT7eZEN8Bnjyvfw. (org.apache.kafka.controller.ReplicationControlManager)
    [2024-05-13 10:12:51,802] INFO [Controller 3] Created partition test-0 with topic ID LsZ0KDlCT7eZEN8Bnjyvfw and PartitionRegistration(replicas=[0], isr=[0], removingReplicas=[], addingReplicas=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=4, partitionEpoch=4). (org.apache.kafka.controller.ReplicationControlManager)
    [2024-05-13 10:12:51,822] INFO Migrating 54 records from ZK (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2024-05-13 10:12:51,822] INFO [Controller 3] Created topic __consumer_offsets with topic ID P5zRX8MSSgWwgOdTiOffsA. (org.apache.kafka.controller.ReplicationControlManager)
    [2024-05-13 10:12:51,822] INFO [Controller 3] Created partition __consumer_offsets-22 with topic ID P5zRX8MSSgWwgOdTiOffsA and PartitionRegistration(replicas=[0], isr=[0], removingReplicas=[], addingReplicas=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=4, partitionEpoch=4). (org.apache.kafka.controller.ReplicationControlManager)
    ...
    [2024-05-13 10:12:51,825] INFO [Controller 3] Created partition __consumer_offsets-40 with topic ID P5zRX8MSSgWwgOdTiOffsA and PartitionRegistration(replicas=[0], isr=[0], removingReplicas=[], addingReplicas=[], leader=0, leaderRecoveryState=RECOVERED, leaderEpoch=4, partitionEpoch=4). (org.apache.kafka.controller.ReplicationControlManager)
    [2024-05-13 10:12:51,825] INFO [Controller 3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration compression.type to producer (org.apache.kafka.controller.ConfigurationControlManager)
    [2024-05-13 10:12:51,825] INFO [Controller 3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration cleanup.policy to compact (org.apache.kafka.controller.ConfigurationControlManager)
    [2024-05-13 10:12:51,825] INFO [Controller 3] ConfigResource(type=TOPIC, name='__consumer_offsets'): set configuration segment.bytes to 104857600 (org.apache.kafka.controller.ConfigurationControlManager)
    [2024-05-13 10:12:51,880] INFO Migrating 1 records from ZK (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
    [2023-02-13 10:12:51,910] INFO Completed migration of metadata from Zookeeper to KRaft. A total of 57 metadata records were generated. The current metadata offset is now 1682 with an epoch of 1. Saw 3 brokers in the migrated metadata [0, 1, 2]. (org.apache.kafka.metadata.migration.KRaftMigrationDriver)
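During the rolling restart it helps to know which ZK brokers the controller is still waiting for and whether the migration has finished. The sketch below reads that from the controller's server.log by matching the two `KRaftMigrationDriver` messages shown in this section. `migration_status` is a hypothetical helper, and the message wording is taken from the 3.5 logs above, so it may differ in other versions.

```python
import re

# Message formats copied from the KRaft controller log of Kafka 3.5 shown above.
WAITING = re.compile(r"Still waiting for ZK brokers \[([\d, ]*)\] to register with KRaft")
COMPLETED = "Completed migration of metadata from Zookeeper to KRaft"


def migration_status(log_lines):
    """Return ("completed", []), ("waiting", [pending broker ids]) or ("unknown", [])."""
    pending = None
    for line in log_lines:
        match = WAITING.search(line)
        if match:
            # Keep only the most recent "still waiting" list.
            pending = [int(x) for x in match.group(1).split(",") if x.strip()]
        elif COMPLETED in line:
            return ("completed", [])
    if pending is None:
        return ("unknown", [])
    return ("waiting", pending)


sample = [
    "[2024-05-13 10:01:03,629] INFO Still waiting for ZK brokers [1, 2] to register with KRaft. (...)",
    "[2024-05-13 10:04:56,699] INFO Still waiting for ZK brokers [0, 2] to register with KRaft. (...)",
]
print(migration_status(sample))
```

In practice the lines would come from something like `open("logs/server.log")`, where the path is an assumption about where the controller writes its log.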
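  • Dual-write mode

Once the rolling restart of the ZK brokers is finished, the metadata migration has succeeded, but the brokers still run as ZK brokers.

The KRaft controller added here takes over the role of the existing Kafka controller.

Because the cluster is now in dual-write mode, metadata is stored both in the KRaft controller's "__cluster_metadata" topic and in ZooKeeper.

To move to full KRaft mode, migration mode therefore has to be turned off and the ZK brokers have to be reconfigured as KRaft brokers.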
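While brokers are reconfigured one at a time, it can be useful to see which of them are still ZK brokers. The sketch below classifies a broker from its already parsed properties. `broker_mode` and `brokers_pending_conversion` are hypothetical helpers, and the labels `zk`, `zk-migrating` and `kraft` are illustrative names, not Kafka terminology.

```python
def broker_mode(props: dict[str, str]) -> str:
    """Classify a broker config as 'kraft', 'zk-migrating' or 'zk'."""
    roles = [r.strip() for r in props.get("process.roles", "").split(",")]
    if "broker" in roles:
        return "kraft"
    flag = props.get("zookeeper.metadata.migration.enable", "false")
    return "zk-migrating" if flag.strip().lower() == "true" else "zk"


def brokers_pending_conversion(brokers: dict[str, dict[str, str]]) -> list[str]:
    """Names of brokers whose config is not yet a KRaft broker config."""
    return sorted(name for name, props in brokers.items() if broker_mode(props) != "kraft")


# Host names below are made up for the example.
cluster = {
    "kafka-1": {"node.id": "0", "process.roles": "broker"},
    "kafka-2": {"broker.id": "1", "zookeeper.metadata.migration.enable": "true"},
    "kafka-3": {"broker.id": "2"},
}
print(brokers_pending_conversion(cluster))
```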

3. Leave migration mode

  • 3-1. ZK broker configuration
  • ## ZK broker 
    ## server.properties
    ## Rolling-restart the brokers after changing the settings
    
    ## zookeeper.metadata.migration.enable=true
    
    ## Entries to keep or change
    controller.quorum.voters=3@localhost:9093
    controller.listener.names=CONTROLLER
    node.id=1
    process.roles=broker
    
    ## Entries to remove
    ## broker.id=1
    ## zookeeper.connect=localhost:2181/zkbroker
    ## zookeeper.connection.timeout.ms=18000
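The edits in 3-1 follow a fixed pattern, so they can be expressed as a small transformation over parsed properties. This is a sketch under assumptions: `to_kraft_broker` is a hypothetical helper, it reuses the old `broker.id` value as `node.id` as in the example above, and it drops every key starting with `zookeeper.`, which is broader than the three keys listed.

```python
def to_kraft_broker(props: dict[str, str]) -> dict[str, str]:
    """Turn ZK-broker properties (migration mode) into KRaft-broker properties."""
    # Drop broker.id and all zookeeper.* keys (connect, timeouts, migration flag).
    out = {
        key: value
        for key, value in props.items()
        if key != "broker.id" and not key.startswith("zookeeper.")
    }
    # The node keeps its identity: the old broker.id becomes node.id.
    out["node.id"] = props["broker.id"]
    out["process.roles"] = "broker"
    return out


zk_broker = {
    "broker.id": "1",
    "zookeeper.connect": "localhost:2181/zkbroker",
    "zookeeper.connection.timeout.ms": "18000",
    "zookeeper.metadata.migration.enable": "true",
    "controller.quorum.voters": "3@localhost:9093",
    "controller.listener.names": "CONTROLLER",
}
for key, value in to_kraft_broker(zk_broker).items():
    print(f"{key}={value}")
```

Listener settings and everything else not related to ZooKeeper pass through unchanged, which matches the "keep" entries in the config above.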
    
    
  • 3-2. KRaft controller configuration
  • ## KRaft controller
    ## controller.properties
    ## Rolling-restart the controllers after changing the settings
    
    ## Entries to modify
    zookeeper.metadata.migration.enable=false
    ## zookeeper.connect=localhost:2181/zkbroker
    
    ## Existing settings
    process.roles=controller
    node.id=3
    controller.quorum.voters=3@localhost:9093
    listeners=CONTROLLER://:9093
    controller.listener.names=CONTROLLER
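`controller.quorum.voters` is a comma-separated list of `id@host:port` entries, and a controller's own `node.id` has to appear in it. A minimal sketch for checking that before restarting a controller; `parse_quorum_voters` and `node_is_voter` are hypothetical names, and the check only applies to controller configs, since brokers are not voters.

```python
def parse_quorum_voters(value: str) -> list[tuple[int, str, int]]:
    """Split 'id@host:port,id@host:port' into (id, host, port) tuples."""
    voters = []
    for entry in value.split(","):
        node_id, _, endpoint = entry.strip().partition("@")
        host, _, port = endpoint.rpartition(":")
        voters.append((int(node_id), host, int(port)))
    return voters


def node_is_voter(props: dict[str, str]) -> bool:
    """True when the controller's node.id is listed in controller.quorum.voters."""
    voter_ids = {voter[0] for voter in parse_quorum_voters(props["controller.quorum.voters"])}
    return int(props["node.id"]) in voter_ids


controller = {
    "process.roles": "controller",
    "node.id": "3",
    "controller.quorum.voters": "3@localhost:9093",
}
print(parse_quorum_voters(controller["controller.quorum.voters"]), node_is_voter(controller))
```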