Dev_duri

PrestoDB/Kafka 본문

Kafka

PrestoDB/Kafka

희쨔응 2023. 2. 16. 17:05

PrestoDB KafkaConnector는 kafka Topic의 Data를 테이블 형식으로 정형화 하여 SQL쿼리로 Topic의 Data를 조회 할수 있게 하고 JDBC를 제공 하여 Produser/Consumer API개발에 편의성을 제공합니다.

 

Kafdrop 으로 Kafka Topic Data 조회

 

PrestoDB catalog에 kafka 프로퍼티 추가

/presto-server-0.273.3/etc/catalog/kafka.properties

#커넥터 종류
connector.name=kafka
#kafka 브로커 서버 정보
kafka.nodes=ip:9092,ip:9092,ip:9092
#정형화할 Topic name
kafka.table-names=tpch.customer,tpch.orders,tpch.lineitem,tpch.part,tpch.partsupp,tpch.supplier,tpch.nation,tpch.region
kafka.hide-internal-columns=false

 

PrestoDB 실행 후  Topic Table조회

 

Table에 담긴 json형식의 데이터 매핑

/presto-server-0.273.3/etc/kafka/tpch.customer.json

{
    "tableName": "customer",
    "schemaName": "tpch",
    "topicName": "tpch.customer",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "LONG",
                "type": "BIGINT",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "row_number",
                "mapping": "rowNumber",
                "type": "BIGINT"
            },
            {
                "name": "customer_key",
                "mapping": "customerKey",
                "type": "BIGINT"
            },
            {
                "name": "name",
                "mapping": "name",
                "type": "VARCHAR"
            },
            {
                "name": "address",
                "mapping": "address",
                "type": "VARCHAR"
            },
            {
                "name": "nation_key",
                "mapping": "nationKey",
                "type": "BIGINT"
            },
            {
                "name": "phone",
                "mapping": "phone",
                "type": "VARCHAR"
            },
            {
                "name": "account_balance",
                "mapping": "accountBalance",
                "type": "DOUBLE"
            },
            {
                "name": "market_segment",
                "mapping": "marketSegment",
                "type": "VARCHAR"
            },
            {
                "name": "comment",
                "mapping": "comment",
                "type": "VARCHAR"
            }
        ]
    }
}

 

PrestoDB 재 실행 후  해당 Topic Table 조회

 

PrestoDB Dbeaver 원격 접속 후 해당 Table 조회

 

'Kafka' 카테고리의 다른 글

Kafka SSL 구성  (0) 2023.05.08
Kafka sink Connector ( MongoDB )  (0) 2023.05.08
Impala/Kudu Kafka Connector  (0) 2023.02.16
Apache Beam  (0) 2023.02.16
Spark (Kafka to Hadoop)-pyspark  (0) 2023.02.16