Strimzi Kafka NodePort 연결
NodePort란
NodePort는 Kubernetes 서비스 중 하나의 유형입니다.
이런 서비스가 생성되면 Kubernetes는 클러스터의 모든 노드에 포트를 할당하고,
이 포트로 들어오는 모든 트래픽이 해당 서비스로, 그리고 궁극적으로 그 서비스 뒤에 있는 파드(Pod)로 라우팅되도록 보장합니다.
트래픽 라우팅은 Kubernetes의 kube-proxy 컴포넌트가 수행합니다.
파드가 어떤 노드에서 실행되느냐는 중요하지 않습니다.
NodePort는 모든 노드에서 열려 있기 때문에, 트래픽은 항상 파드에 도달할 수 있습니다.
따라서, 클라이언트는 Kubernetes 클러스터의 어떤 노드든 상관없이, 해당 노드의 NodePort에 연결하기만 하면 되고, 나머지는 Kubernetes가 처리합니다.
기본적으로 NodePort는 30000~32767 범위의 포트에서 선택됩니다. (임의로 선택할 수 있습니다. )
노드 주소를 얻기 위해, init 컨테이너는 Kubernetes API와 통신하여 노드 리소스(node resource) 정보를 가져와야 합니다.
이 노드 리소스의 status 필드에는 일반적으로 다음 중 하나 이상의 주소가 포함되어 있습니다:
- External DNS
- External IP
- Internal DNS
- Internal IP
- Hostname
때로는 이들 중 일부만 존재할 수 있습니다.
이 경우, init 컨테이너는 위의 우선순위 순서대로 주소를 검색하며, 가장 먼저 발견된 주소를 사용하게 됩니다.
노드 주소가 설정되면, Kafka 클라이언트는 부트스트랩 NodePort 서비스를 사용하여 Kafka 클러스터에 초기 연결을 시도할 수 있습니다.
그 후, 클라이언트는 이 연결을 통해 클러스터의 브로커 메타데이터를 받아오며, 이 메타데이터에는 각 브로커에 대응되는 NodePort 주소 정보가 포함되어 있어, 클라이언트는 이후 해당 브로커와 직접 연결해 메시지를 송수신하게 됩니다.
Kafka Cluster NodePort 적용
NodePort를 이용한 Kafka 외부 노출
- Strimzi Operator 사용 시 필요한 CR은 들어가 있기 때문에 Kafka.yaml 양식에 아래 내용만 추가해주면 적용이 가능합니다.
- Kafka는 클러스터의 모든 브로커를 라운드로빈(Round-robin) 방식으로 접근하는 구조가 아니며, 각 클라이언트는 반드시 개별 브로커에 직접 접근해야 합니다.
- Kubernetes 클러스터 내부에서는 파드의 DNS 이름을 advertised 주소로 설정하면 가능했지만, Kubernetes 외부에서는 파드의 호스트명이나 IP를 인식할 수 없기 때문에 사용할 수 없습니다.
spec:
,,,
kafka:
authorization:
,,,
config:
,,,
listeners:
,,,
- authentication:
type: scram-sha-512 # Sasl 보안 설정
configuration:
bootstrap:
nodePort: 32100
brokers:
- broker: 0
nodePort: 32000
- broker: 1
nodePort: 32001
- broker: 2
nodePort: 32002
name: nodeports
port: 9094
tls: false # SSL 설정(True / False)
type: nodeport
|
구성
파드 호스트명이나 IP를 사용하는 대신, Strimzi는 Kafka 브로커마다 별도의 서비스를 생성합니다.
즉, Kafka 클러스터에 브로커가 N개 있다면, 총 N + 1 개의 NodePort 서비스가 생성됩니다:
- 1개는 부트스트랩용 서비스
- 클라이언트가 처음 연결할 때 사용되며 Kafka 클러스터의 메타데이터를 수신함
- 나머지 N개는 개별 브로커용 서비스
- 클라이언트가 특정 브로커에 직접 연결할 수 있도록 함
이 모든 서비스는 NodePort 타입으로 생성되며,
각 서비스는 서로 다른 NodePort 번호를 할당받아 브로커 간 트래픽을 구분할 수 있게 됩니다.
노드 주소는 브로커의 advertised address로 설정되고,
Kafka 클라이언트는 이를 통해 다음과 같은 방식으로 연결을 진행합니다.
- 먼저 bootstrap NodePort 서비스를 통해 Kafka 클러스터에 초기 연결을 시도합니다.
- 이 연결을 통해 Kafka 클라이언트는 클러스터의 브로커 메타데이터를 받아옵니다.
- 이후 클라이언트는 각 브로커별로 설정된 개별 NodePort 주소로 직접 연결하여 메시지를 주고받게 됩니다.
통신을 위한 Sasl User 생성
- User 생성
[root@bastion01 scram-sha-512-auth]# oc apply -f user.yaml -n strimzi-cluster
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: admin
namespace: strimzi-cluster
labels:
strimzi.io/cluster: test-cluster
spec:
authentication:
type: scram-sha-512
authorization:
type: simple
acls:
# Example ACL rules for consuming from my-topic using consumer group my-group
- resource:
type: topic
name: my-topic
patternType: literal
operations:
- DescribeConfigs
- Describe
- Read
host: "*"
- resource:
type: group
name: my-group
patternType: literal
operations:
- Read
host: "*"
# Example ACL rules for producing to topic my-topic
- resource:
type: topic
name: my-topic
patternType: literal
operations:
- Create
- Describe
- Write
host: "*"
- 생성된 User Secret 확인
- 이 Secret의 Password, Sasl.jaas.config을 통해 외부 Client에서 Node Port를 통해 통신할 수 있다.
외부 Kafka Client와 통신
public class Producer_all {
public static void main(String[] args) {
// 토픽 이름
String topic = "my-topic1";
// 브로커 정보 (DNS 이름과 포트)
String bootstrapServers = "openshift-workernode-ip1: nodeport , openshift-workernode-ip12:nodeport";
// Producer 설정
Properties props = new Properties();
// DNS 관련 추가 설정
props.put("security.protocol", "SASL_PLAINTEXT"); # 통신 프로토콜
props.put("sasl.mechanism", "SCRAM-SHA-512"); # Sasl
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required " +
"username=\"admin\" password=\"Pssword";"); # 직접 jaas config 입력
// Producer 생성
Producer<String, String> producer = new KafkaProducer<>(props);
try {
// 메시지 전송
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "value-" + i;
// 메시지 전송 및 전송 메타데이터 반환
RecordMetadata metadata = producer.send(new ProducerRecord<>(topic, key, value)).get();
// 전송 결과 출력
System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
key, value, metadata.partition(), metadata.offset());
}
} catch (ExecutionException | InterruptedException e) {
System.err.println("Error while sending message: " + e.getMessage());
} finally {
producer.close(); // Producer 닫기
}
}
- 송신 결과