IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> FlinkSql读取Pulsar中的Json结构数据并保存 -> 正文阅读

[大数据]FlinkSql读取Pulsar中的Json结构数据并保存


前言

Flink和Pulsar是当前大数据常用的组件,他们的优势和特点在此不在赘述。可参考Flink官网Pulsar官网
我使用的Flink版本为1.12,Pulsar版本为2.9.0。
此文章的背景为Canal采集MySQL中的binlog数据,写入Pulsar,由Flink解析Pulsar中的Json数据,写入到存储中。


一、确定写入pulsar中的数据结构

在pulsar的目录下执行以下命令:
查看最早的一条数据:

bin/pulsar-client consume --subscription-position Earliest persistent://public/default/yourtopicname -s "first-subscription"

查看最新的一条数据:

bin/pulsar-client consume persistent://public/default/yourtopicname -s "first-subscription"

二、分析Pulsar中的数据结构

将一步骤中数据的内容进行解析

可在https://www.json.cn/中进行结构化输出
输出结果:
data中为时间的数据,mysqlType为数据的类型。

{
    "data":[
        {
            "wp_web_page_sk":"60",
            "wp_web_page_id":"AAAAAAAAKDAAAAAA",
            "wp_rec_start_date":"2001-09-03",
            "wp_rec_end_date":"0000-00-00",
            "wp_creation_date_sk":"2450813",
            "wp_access_date_sk":"2452566",
            "wp_autogen_flag":"Y",
            "wp_customer_sk":"80555",
            "wp_url":"http://www.foo.com",
            "wp_type":"welcome",
            "wp_char_count":"6577",
            "wp_link_count":"24",
            "wp_image_count":"2",
            "wp_max_ad_count":"3"
        }
    ],
    "database":"tpcds_01",
    "es":1656468147000,
    "id":152155,
    "isDdl":false,
    "mysqlType":{
        "wp_web_page_sk":"int(11)",
        "wp_web_page_id":"char(16)",
        "wp_rec_start_date":"date",
        "wp_rec_end_date":"date",
        "wp_creation_date_sk":"int(11)",
        "wp_access_date_sk":"int(11)",
        "wp_autogen_flag":"char(1)",
        "wp_customer_sk":"int(11)",
        "wp_url":"varchar(100)",
        "wp_type":"char(50)",
        "wp_char_count":"int(11)",
        "wp_link_count":"int(11)",
        "wp_image_count":"int(11)",
        "wp_max_ad_count":"int(11)"
    },
    "old":null,
    "pkNames":[
        "wp_web_page_sk"
    ],
    "sql":"",
    "sqlType":{
        "wp_web_page_sk":4,
        "wp_web_page_id":1,
        "wp_rec_start_date":91,
        "wp_rec_end_date":91,
        "wp_creation_date_sk":4,
        "wp_access_date_sk":4,
        "wp_autogen_flag":1,
        "wp_customer_sk":4,
        "wp_url":12,
        "wp_type":1,
        "wp_char_count":4,
        "wp_link_count":4,
        "wp_image_count":4,
        "wp_max_ad_count":4
    },
    "table":"web_page",
    "ts":1656468638479,
    "type":"INSERT"
}

三、写FlinkSql解析Json数据

1、创建source端

要注意'data'的格式和写法,此过程的本质为行转列的过程。
scan.startup.sub-start-offset消费节点配置可参考Git上的源码,因为这块内容目前在Flink官网上还无法找到

CREATE TABLE ods_tpcds_01_web_page_rt_source
(
    `data` ARRAY < ROW < wp_web_page_sk BIGINT,
	wp_web_page_id STRING,
	wp_rec_start_date STRING,
	wp_rec_end_date STRING,
	wp_creation_date_sk BIGINT,
	wp_access_date_sk BIGINT,
	wp_autogen_flag STRING,
	wp_customer_sk BIGINT,
	wp_url STRING,
	wp_type STRING,
	wp_char_count BIGINT,
	wp_link_count BIGINT,
	wp_image_count BIGINT,
	wp_max_ad_count BIGINT >>,
	`database` STRING,
	`isDdl` STRING,
	`table` STRING,
	`type` STRING,
	`es` BIGINT,
	`ts` BIGINT
) WITH (
      'connector' = 'pulsar',
      'topic' = 'persistent://public/default/tpcds_01_web_page',
      'service-url' = 'pulsar://xx.xx.xx.xx:6650',
      'admin-url' = 'http://xx.xx.xx.xx:8080',
      'scan.startup.mode' = 'external-subscription',
      'scan.startup.sub-name' = 'ods_tpcds_01_web_page_rt_v1',
      'scan.startup.sub-start-offset' = 'earliest',
      'format' = 'json'
      );

2、创建sink端

sink.rolling-policy.rollover-interval、sink.rolling-policy.file-size等参数可参考官网解释,自行修改
官网地址

CREATE TABLE ods_tpcds_01_web_page_rt_sink
(
    wp_web_page_sk BIGINT,
	wp_web_page_id STRING,
	wp_rec_start_date STRING,
	wp_rec_end_date STRING,
	wp_creation_date_sk BIGINT,
	wp_access_date_sk BIGINT,
	wp_autogen_flag STRING,
	wp_customer_sk BIGINT,
	wp_url STRING,
	wp_type STRING,
	wp_char_count BIGINT,
	wp_link_count BIGINT,
	wp_image_count BIGINT,
	wp_max_ad_count BIGINT,
	`database` STRING,
	`isDdl` STRING,
	`type` STRING,
	`table` STRING,
	`es` BIGINT,
	`ts` BIGINT,
	`pt` STRING
) PARTITIONED BY (pt) WITH (
      'connector' = 'filesystem',
      'path' = '可以写入hdfs或者minion等存储系统中,例如:s3a://bucket1/test/ods_poc_tpcds_01_web_page_rt_v1或者hdfs://test/ods_poc_tpcds_01_web_page_rt_v1',
      'sink.rolling-policy.rollover-interval' = '1min',
      'sink.rolling-policy.file-size' = '128M',
      'format' = 'parquet'
);

3、写入数据

INSERT INTO ods_tpcds_01_web_page_rt_sink
SELECT 
	wp_web_page_sk,
	wp_web_page_id,
	wp_rec_start_date,
	wp_rec_end_date,
	wp_creation_date_sk,
	wp_access_date_sk,
	wp_autogen_flag,
	wp_customer_sk,
	wp_url,
	wp_type,
	wp_char_count,
	wp_link_count,
	wp_image_count,
	wp_max_ad_count,
	`database`,
	`isDdl`,
	`type`,
	`table`,
	`es`,
	`ts`,
     FROM_UNIXTIME((`ts` / 1000) + 60 * 60 * 8, 'yyyy-MM-dd') as pt
FROM ods_tpcds_01_web_page_rt_source
         CROSS JOIN UNNEST(`data`) AS t(
		wp_web_page_sk,
		wp_web_page_id,
		wp_rec_start_date,
		wp_rec_end_date,
		wp_creation_date_sk,
		wp_access_date_sk,
		wp_autogen_flag,
		wp_customer_sk,
		wp_url,
		wp_type,
		wp_char_count,
		wp_link_count,
		wp_image_count,
	wp_max_ad_count 
);

总结

FlinkSql解析Json的步骤基本相似,主要是对Pulsar中Json结构的分析和创建对应的Flink表。如有描述不当,烦请指正。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-05 23:34:26  更:2022-07-05 23:35:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:32:07-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码