Dev_duri

Flink 본문

Kafka

Flink

marcel 2023. 1. 31. 17:36

apache Storm, Spark Streaming과 같은 스트리밍 & 배치 프로세싱 플랫폼입니다.
Flink는 Streaming model이 batch가 아닌 native 방식으로 스트림 처리에 대해 low latency 특성을 가지고 있습니다 또한 Exactly-once를 보장하고 높은 처리량을 보이기 때문에 최근 스트림 처리 분야에서 곽광받고 있습니다.
현재 개발서버에 구축이 완료 되었고 아래와 같이 테스트 하였습니다.
 
Flink 실행 화면
 
Flink web 화면

 

Flink Kafka Consumer 예제 소스

package org.apache.flink.streaming.examples.kafka;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromKafka {

	public static void main(String[] args) throws Exception {
		// parse input arguments
		final ParameterTool parameterTool = ParameterTool.fromArgs(args);

		if(parameterTool.getNumberOfParameters() < 4) {
			System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
					"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
			return;
		}

		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.getConfig().disableSysoutLogging();
		env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
		env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
		env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

		DataStream<String> messageStream = env
				.addSource(new FlinkKafkaConsumer08<>(
						parameterTool.getRequired("topic"),
						new SimpleStringSchema(),
						parameterTool.getProperties()));

		// write kafka stream to standard out.
		messageStream.print();

		env.execute("Read from Kafka example");
	}
}

 

해당 소스를 .jar 로 export 시킨후
해당 jar를 Add

jar 실행시 Argument 등록

Job overview

Job Config 확인 jar 실행시 등록했던 Argument가 잘 적용됨

해당 Topic에 들어있던 Data가 정상적으로 Consume 됨

'Kafka' 카테고리의 다른 글

Impala/Kudu Kafka Connector  (0) 2023.02.16
Apache Beam  (0) 2023.02.16
Spark (Kafka to Hadoop)-pyspark  (0) 2023.02.16
Kafka 기본 명령어  (0) 2022.12.10
Kafka_Tutorial_Word  (0) 2022.12.04