Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | ||||||
| 2 | 3 | 4 | 5 | 6 | 7 | 8 |
| 9 | 10 | 11 | 12 | 13 | 14 | 15 |
| 16 | 17 | 18 | 19 | 20 | 21 | 22 |
| 23 | 24 | 25 | 26 | 27 | 28 | 29 |
| 30 |
Tags
- PrestoDB
- elasticsearch
- ui for kafka
- pyspark
- elasticearch
- MariaDB
- kafka ui
- kibana
- fluentd
- kafka connect
- Python
- PostgreSQL
- MongoDB
- naverdevelopers
- logstash
- SSL
- Kafka
Archives
- Today
- Total
Dev_duri
Apache Beam 본문

Apache Beam은 ETL, batch, streaming 파이프라인을 처리하기 위한 unified programming model 입니다. Beam의 가장 큰 특징은 다양한 랭귀지와 다양한 runner를 지원한다는 것 이고 Beam SDK를 통해 다양한 runner에서 데이터를 처리할 수 있게 합니다.
Beam SDK 를 사용하여 테스트 서버에 구축되어 있는 Kafka 에 간단한 데모를 Python으로 구현 하였습니다.
Producer 소스
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
with beam.Pipeline(options=PipelineOptions()) as p:
notifications = (p
| "Creating data" >> beam.Create([('dev_1', '{"device": "0001", status": "healthy"}')])
| "Pushing messages to Kafka" >> kafkaio.KafkaProduce(
topic='토픽네임',
servers="브로커IP:9092"
)
)
notifications | 'Writing to stdout' >> beam.Map(print)
구동 결과

Kafdrop으로 해당 Topic에 PUB한 Data를 확인
Consumer 소스
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from beam_nuggets.io import kafkaio
consumer_config = {"topic": "토픽네임",
"bootstrap_servers": "브로커IP:9092",
"group_id": "heejae_beam_test"}
with beam.Pipeline(options=PipelineOptions()) as p:
notifications = p | "Reading messages from Kafka" >> kafkaio.KafkaConsume(
consumer_config=consumer_config,
value_decoder=bytes.decode, # optional
)
notifications | 'Writing to stdout' >> beam.Map(print)
구동 결과

해당 Topic에 적재된 Data를 Print한다
'Kafka' 카테고리의 다른 글
| PrestoDB/Kafka (0) | 2023.02.16 |
|---|---|
| Impala/Kudu Kafka Connector (0) | 2023.02.16 |
| Spark (Kafka to Hadoop)-pyspark (0) | 2023.02.16 |
| Flink (0) | 2023.01.31 |
| Kafka 기본 명령어 (0) | 2022.12.10 |