Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
Tags
- MariaDB
- PrestoDB
- MongoDB
- elasticearch
- Python
- kibana
- fluentd
- ui for kafka
- pyspark
- kafka ui
- logstash
- Kafka
- SSL
- kafka connect
- naverdevelopers
- elasticsearch
- PostgreSQL
Archives
- Today
- Total
Dev_duri
File(csv) to Postgresql( pySpark ) 본문
Spark를 사용한 위와 같은 파이프라인 demo를 구성하였습니다.
특정한 파일 확장자(CSV, JSON 등)을 스파크(PySpark)를 통해 RDB에 저장합니다.
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
ip = "postgre.sql.local.host"
port = 5432
user = "username"
passwd = "password"
db = "dbname"
spark = SparkSession \
.builder \
.appName('SparkByExamples.com') \
.config("spark.driver.extraClassPath", "/root/spark-3.2.2-bin-hadoop3/jars/postgresql-42.5.4.jar") \
.getOrCreate()
df = spark.read.csv("file:///root/self.csv")
df2 = spark.read.option("header",True) \
.csv("file:///root/self.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
.csv("/tmp/spark_output/self.csv")
query1 = df2.write.format("jdbc")\
.option("url","jdbc:postgresql://{0}:{1}/{2}".format(ip, port, db)) \
.option("driver", "org.postgresql.Driver") \
.option("dbtable", "schema.self") \
.option("user", user) \
.option("password", passwd) \
.save()
> 위는 File to Postgresql 코드입니다.
Spark가 로컬 폴더에 저장되어있는 csv 파일을 조회하여 postgresql에 적재합니다.
postgresql에 적재할때에는 ip, port, dbname을 정확히 확인하고 postgresql 특성 상 db.schema.table 위치에 적재하기 떄문에 알맞은 스키마 네임 또한 확인해야 합니다.