Dev_duri

File(csv) to Postgresql( pySpark ) 본문

Spark

File(csv) to Postgresql( pySpark )

marcel 2023. 3. 28. 13:14

Spark를 사용한 위와 같은 파이프라인 demo를 구성하였습니다. 

 

특정한 파일 확장자(CSV, JSON 등)을 스파크(PySpark)를 통해 RDB에 저장합니다. 

 

예제_파일.csv

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
ip = "postgre.sql.local.host"
port = 5432
user = "username"
passwd = "password"
db = "dbname"
spark = SparkSession \
        .builder \
        .appName('SparkByExamples.com') \
        .config("spark.driver.extraClassPath", "/root/spark-3.2.2-bin-hadoop3/jars/postgresql-42.5.4.jar") \
        .getOrCreate()
df = spark.read.csv("file:///root/self.csv")
df2 = spark.read.option("header",True) \
     .csv("file:///root/self.csv")
df_with_schema.printSchema()
df2.write.option("header",True) \
 .csv("/tmp/spark_output/self.csv")
query1 = df2.write.format("jdbc")\
          .option("url","jdbc:postgresql://{0}:{1}/{2}".format(ip, port, db)) \
          .option("driver", "org.postgresql.Driver") \
          .option("dbtable", "schema.self") \
          .option("user", user) \
          .option("password", passwd) \
          .save()

> 위는 File to Postgresql 코드입니다. 

Spark가 로컬 폴더에 저장되어있는 csv 파일을 조회하여 postgresql에 적재합니다. 

postgresql에 적재할때에는 ip, port, dbname을 정확히 확인하고 postgresql 특성 상 db.schema.table 위치에 적재하기 떄문에 알맞은 스키마 네임 또한 확인해야 합니다. 

 

해당 CSV 파일 스키마 속성

 

해더 추가
postgresql에 적재된 데이터 확인( dbeaver )