Kafka

Kafka SSL 구성

marcel 2023. 5. 8. 16:35
더보기

개요

카프카는 최초 설정 시 보안 설정을 하지 않으면 카프카와 통신이 가능한 모든 클라이언트와 연결이 가능합니다. 

이는 보통 사내의 안전한 네트워크에서 운용하는 경우이기 때문에 보안을 적용하지 않는 것으로 사용자의 입장에서는 편리하지만 외부에서 접속하는 사용자에 대해서는 정보 유출의 위협이 존재합니다. 

 

이런 취약한 부분을 보완하기 위해 암호화(SSL), 인증(SASL), 권한(ACL)의 세 가지 보안 요소가 필요합니다. 

여기서 암호화는 카프카와 카프카 클라이언트 사이의 통신에 암호화를 적용하는 것입니다. 일반적으로 SSL을 사용합니다. 

 

SSL(Secure Socket Layer)은 서버와 서버 사이, 서버와 클라이언트 사이에서 통신 보안을 적용하기 위한 표준 암호 규약입니다. 마치 웹에서 HTTP 프로토콜의 암호화 통신인 HTTPS 프로토콜과 같다고 할 수 있습니다. 

SSL은 효율적인 고성능을 위해 하나의 키를 이용하여 통신하는 '대칭 키' 방식과 2개의 키를 이용하는 '비대칭 키' 방식을 혼용하는 방식으로 사용하여 다소 복잡하지만 높은 성능을 갖기 위해 최적화 되어 있습니다. 

SSL 구성도

테스트 환경에서 위와 같이 구성하였습니다. 


서버 SSL 구성

먼저 Kafka 폴더에 ssl 폴더를 추가로 만들어주고 환경 변수를 이용하여 password를 등록합니다. 

$ sudo mkdir -p /usr/local/kafka/ssl
$ cd /usr/local/kafka/ssl
$ export SSLPASS=password

 

브로커의 키스토어를 생성합니다. 테스트이기에 임의로 생성한 것으로 Hostname 또한 각자 설정에 따라 등록합니다. 

그 후 아래 리스트 조회 명령어를 통해 등록된 키스토어를 조회할 수 있습니다. 

$ sudo keytool -keystore kafka.server.keystore.jks -alias localhost -keyalg RSA -validity 365 -genkey -storepass $SSLPASS -keypass $SSLPASS -dname "CN=kafka-1.host.name" -storetype pkcs12

$ keytool -list -v -keystore kafka.server.keystore.jks

 

운영, 개발 환경에서는 공인 인증서를 사용하여 위조를 방지하고 안전한 통신을 보장합니다. 

공인 인증서는 인증기관(CA)에 일부 비용을 지불하고 인증서를 발급받게 됩니다. 

다만 개발 환경이 아닌 테스트 환경이기 때문에 자체 생성, 서명한 사설 인증서를 통해 진행하도록 하겠습니다. 

다음과 같이 openssl 명령어를 통해 CA 인증서를 생성합니다. 

$ sudo openssl req -new -x509 -keyout ca-key -out ca-cert -days 356 -subj "/CN=host.inzng" -nodes

 

여기까지 된 상황이라면 ls 명령어를 통해 아래 세 가지 파일을 확인할 수 있습니다. 

ca-cert ca-key kafka.server.keystore.jks

 

서버가 공인 인증서를 발급받은 경우 클라이언트는 서버 측으로 접속하여 서버의 인증서가 신뢰할 수 있는 것인지 확인합니다. 이 절차가 확인되면 보안 통신을 시작하게 됩니다. 

다만, 테스트 서버에서는 자체 서명된 CA 인증서를 클라이언트가 신뢰할 수 있도록 트러스트스토어에 추가하는 절차가 필요합니다. 

또한 아래 리스트 명령어를 통해 트러스트스토어의 내용을 조회할 수 있습니다. 

$ sudo keytool -keystore kafka.server.truststore.jks -alias CARoot -importcert -file ca-cert -storepass $SSLPASS -keypass $SSLPASS

$ keytool -list -v -keystore kafka.server.truststore.jks

 

다음 단계로, 키스토어에 저장된 모든 인증서는 자체 서명된 CA의 서명을 받아야 합니다. 

이 절차를 통해 클라이언트가 인증서 요청을 보냈을 때, 해당 인증서를 신뢰할 수 있기 때문입니다. 

다음 명령어를 통해 키스토어에서 인증서를 추출합니다. 

$ sudo keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file -storepass $SSLPASS -keypass $SSLPASS

 

ls 명령어를 통해 cert-file 파일이 생성된 것을 확인했다면 생성된 cert-file에 다음과 같이 자체 서명된 CA 서명을 적용합니다. 

$ sudo openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD

 

해당 커맨드를 제대로 입력하면 서명이 완료되었다는 내용이 출력됩니다. 

마지막으로 키스토어에 자체 서명된 CA 인증서인 ca-cert와 cert-signed를 추가합니다. 

$ sudo keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert -storepass $SSLPASS -keypass $SSLPASS

$ sudo keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed -storepass $SSLPASS -keypass $SSLPASS

인증서를 신뢰합니까 라는 문장이 출력되면 y를 입력해줍니다. 

 

이후 키스토어의 내용을 조회해보면 최초 조회 때에 비해 내용이 추가된 것을 확인할 수 있습니다. 

저장소에 총 2개의 인증서가 저장되어 있으며 자체 서명된 CA 인증서 내용이 포함되어 있습니다. 

여기까지의 작업들을 클러스터 내 다른 브로커에도 동일하게 적용해야 합니다. 

 

최초 키스토어를 생성하는 작업 까지는 호스트 네임에 주의하며 동일하게 진행합니다. 

이후 Kafka-1에서 생성한 CA인증서와 키 파일을 다른 브로커에 SCP 명령어로 복사합니다. 

복사된 파일들은 mv 명령어로 ssl 폴더로 옮겨줍니다. 

$ scp ca-cert kafka-2.host.name:~
$ scp ca-key kafka-2.host.name:~
$ scp kafka.server.truststore.jks kafka-2.host.name:~

$ sudo mv kafka.server.truststore.jks /usr/local/kafka/ssl/

 

그 다음으로 1번 브로커와 같이 키스토어에 CA 인증서 추가를 진행합니다.

$ sudo keytool -keystore kafka.server.keystore.jks -alias CARoot -importcert -file ca-cert -storepass $SSLPASS -keypass $SSLPASS

$ sudo keytool -keystore kafka.server.keystore.jks -alias localhost -importcert -file cert-signed -storepass $SSLPASS -keypass $SSLPASS

이제 키스토어의 내용을 조회했을 때, 문제가 없으면 SSL 적용 준비가 끝났습니다. 


브로커 SSL 설정 추가 

#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093
zookeeper.connect=zoo.kee.per.001:2181,zoo.kee.per.002:2181,zoo.kee.per.003:2181

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().

#advertised.listeners=PLAINTEXT://kafka.host.ip.01:9092,SSL://kafka.host.ip.01:9093
advertised.listeners=PLAINTEXT://kafka-1.host.name:9092,SSL://kafka-1.host.name:9093

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
ssl.truststore.location=/usr/local/kafka/ssl/kafka.server.truststore.jks/
ssl.truststore.password=password
ssl.keystore.location=/usr/local/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=password
ssl.key.password=password
security.inter.broker.protocol=SSL

server.properties에 위와 같이 내용을 입력합니다. 

맨 아래 security.inter.broker.protocol 설정은 내부 브로커 통신 간에 SSL을 사용할 경우 입력합니다. 

 

완료되면 브로커를 1대씩 재시작하고 openssl 명령어를 이용하여 최종 확인합니다. 

$ openssl s_client -connect kafka-1.foo.bar:9093 -tls1 </dev/null 2>/dev/null | grep -E 'Verify return code'

위 명령어의 출력 내용이 ok 가 출력되면 ssl 통신 준비가 완료된 것입니다. 

 

$ /root/kafka_2.13-3.1.0.redhat/bin/kafka-topics.sh --bootstrap-server kafka-1.host.name:9092 -create --topic ssl-test01 --partitions 1

이제 토픽을 생성한 뒤 SSL 통신이 적용된 콘솔 프로듀서를 사용하기 위해 별도의 클라이언트 설정 파일을 만들어줍니다. 

$ vi -b /root/ssl.config 

security.protocol=SSL
ssl.truststore.location=/usr/local/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=peterpass

프로듀서, 컨슈머

이제 위 문단에서 등록한 ssl.config를 로드하여 프로듀서와 컨슈머를 생성할 수 있습니다. 

$ /root/kafka_2.13-3.1.0.redhat/bin/kafka-console-producer.sh --broker-list kafka-1.host.name:9093 --topic ssl-test01 --producer.config /root/ssl.config

$/root/kafka_2.13-3.1.0.redhat/bin/kafka-console-consumer.sh --bootstrap-server kafka-1.host.name:9093 --topic ssl-test01 --consumer.config /root/ssl.config

 

해당 내용은 고승범 님의 "실전 카프카 개발부터 운영까지" 서적을 참고하였습니다.