Kafka

Strimzi Kafka NodePort 연결

marcel 2025. 4. 21. 08:34

NodePort란

NodePort는 Kubernetes 서비스 중 하나의 유형입니다.

이런 서비스가 생성되면 Kubernetes는 클러스터의 모든 노드에 포트를 할당하고,

이 포트로 들어오는 모든 트래픽이 해당 서비스로, 그리고 궁극적으로 그 서비스 뒤에 있는 파드(Pod)로 라우팅되도록 보장합니다.

트래픽 라우팅은 Kubernetes의 kube-proxy 컴포넌트가 수행합니다.

 

파드가 어떤 노드에서 실행되느냐는 중요하지 않습니다.

NodePort는 모든 노드에서 열려 있기 때문에, 트래픽은 항상 파드에 도달할 수 있습니다.

따라서, 클라이언트는 Kubernetes 클러스터의 어떤 노드든 상관없이, 해당 노드의 NodePort에 연결하기만 하면 되고, 나머지는 Kubernetes가 처리합니다.

기본적으로 NodePort는 30000~32767 범위의 포트에서 선택됩니다. (임의로 선택할 수 있습니다. )

 

노드 주소를 얻기 위해, init 컨테이너는 Kubernetes API와 통신하여 노드 리소스(node resource) 정보를 가져와야 합니다.
이 노드 리소스의 status 필드에는 일반적으로 다음 중 하나 이상의 주소가 포함되어 있습니다:

  • External DNS
  • External IP
  • Internal DNS
  • Internal IP
  • Hostname

때로는 이들 중 일부만 존재할 수 있습니다.
이 경우, init 컨테이너는 위의 우선순위 순서대로 주소를 검색하며, 가장 먼저 발견된 주소를 사용하게 됩니다.

노드 주소가 설정되면, Kafka 클라이언트는 부트스트랩 NodePort 서비스를 사용하여 Kafka 클러스터에 초기 연결을 시도할 수 있습니다.

그 후, 클라이언트는 이 연결을 통해 클러스터의 브로커 메타데이터를 받아오며, 이 메타데이터에는 각 브로커에 대응되는 NodePort 주소 정보가 포함되어 있어, 클라이언트는 이후 해당 브로커와 직접 연결해 메시지를 송수신하게 됩니다.

 

Kafka Cluster NodePort 적용

NodePort를 이용한 Kafka 외부 노출

  • Strimzi Operator 사용 시 필요한 CR은 들어가 있기 때문에 Kafka.yaml 양식에 아래 내용만 추가해주면 적용이 가능합니다. 
  • Kafka는 클러스터의 모든 브로커를 라운드로빈(Round-robin) 방식으로 접근하는 구조가 아니며, 각 클라이언트는 반드시 개별 브로커에 직접 접근해야 합니다.
  • Kubernetes 클러스터 내부에서는 파드의 DNS 이름을 advertised 주소로 설정하면 가능했지만, Kubernetes 외부에서는 파드의 호스트명이나 IP를 인식할 수 없기 때문에 사용할 수 없습니다.
spec:
  ,,,
  kafka:
    authorization:
      ,,,
    config:
      ,,,
    listeners:
      ,,,
      - authentication:
          type: scram-sha-512 # Sasl 보안 설정
        configuration:
          bootstrap:
            nodePort: 32100
          brokers:
            - broker: 0
              nodePort: 32000
            - broker: 1
              nodePort: 32001
            - broker: 2
              nodePort: 32002
        name: nodeports
        port: 9094
        tls: false # SSL 설정(True / False)
        type: nodeport

구성 

파드 호스트명이나 IP를 사용하는 대신, Strimzi는 Kafka 브로커마다 별도의 서비스를 생성합니다.

즉, Kafka 클러스터에 브로커가 N개 있다면, 총 N + 1 개의 NodePort 서비스가 생성됩니다:

  • 1개는 부트스트랩용 서비스
  • 클라이언트가 처음 연결할 때 사용되며 Kafka 클러스터의 메타데이터를 수신함
  • 나머지 N개는 개별 브로커용 서비스
  • 클라이언트가 특정 브로커에 직접 연결할 수 있도록 함

이 모든 서비스는 NodePort 타입으로 생성되며,

각 서비스는 서로 다른 NodePort 번호를 할당받아 브로커 간 트래픽을 구분할 수 있게 됩니다.

노드 주소는 브로커의 advertised address로 설정되고,

Kafka 클라이언트는 이를 통해 다음과 같은 방식으로 연결을 진행합니다.

  1. 먼저 bootstrap NodePort 서비스를 통해 Kafka 클러스터에 초기 연결을 시도합니다.
  2. 이 연결을 통해 Kafka 클라이언트는 클러스터의 브로커 메타데이터를 받아옵니다.
  3. 이후 클라이언트는 각 브로커별로 설정된 개별 NodePort 주소로 직접 연결하여 메시지를 주고받게 됩니다.

통신을 위한 Sasl User 생성

  • User 생성
[root@bastion01 scram-sha-512-auth]# oc apply -f user.yaml -n strimzi-cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: admin
  namespace: strimzi-cluster
  labels:
    strimzi.io/cluster: test-cluster
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      # Example ACL rules for consuming from my-topic using consumer group my-group
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - DescribeConfigs
          - Describe
          - Read
        host: "*"
      - resource:
          type: group
          name: my-group
          patternType: literal
        operations:
          - Read
        host: "*"
      # Example ACL rules for producing to topic my-topic
      - resource:
          type: topic
          name: my-topic
          patternType: literal
        operations:
          - Create
          - Describe
          - Write
        host: "*"
  • 생성된 User Secret 확인
  • 이 Secret의 Password, Sasl.jaas.config을 통해 외부 Client에서 Node Port를 통해 통신할 수 있다. 

외부 Kafka Client와 통신 

public class Producer_all {
    public static void main(String[] args) {
        // 토픽 이름
        String topic = "my-topic1";

        // 브로커 정보 (DNS 이름과 포트)
        String bootstrapServers = "openshift-workernode-ip1: nodeport , openshift-workernode-ip12:nodeport";

        // Producer 설정
        Properties props = new Properties();

        // DNS 관련 추가 설정
        props.put("security.protocol", "SASL_PLAINTEXT"); # 통신 프로토콜
        props.put("sasl.mechanism", "SCRAM-SHA-512"); # Sasl 
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.scram.ScramLoginModule required " +
            "username=\"admin\" password=\"Pssword";"); # 직접 jaas config 입력

        // Producer 생성
        Producer<String, String> producer = new KafkaProducer<>(props);

        try {
            // 메시지 전송
            for (int i = 0; i < 10; i++) {
                String key = "key-" + i;
                String value = "value-" + i;

                // 메시지 전송 및 전송 메타데이터 반환
                RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get();

                // 전송 결과 출력
                System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                        key, value, metadata.partition(), metadata.offset());
            }
        } catch (ExecutionException | InterruptedException e) {
            System.err.println("Error while sending message: " + e.getMessage());
        } finally {
            producer.close(); // Producer 닫기
        }
    }
  • 송신 결과