前言
Flink和Pulsar是当前大数据常用的组件,它们的优势和特点在此不再赘述,可参考Flink官网和Pulsar官网。 我使用的Flink版本为1.12,Pulsar版本为2.9.0。 此文章的背景为Canal采集MySQL中的binlog数据,写入Pulsar,由Flink解析Pulsar中的Json数据,写入到存储中。
一、确定写入pulsar中的数据结构
在pulsar的目录下执行以下命令: 查看最早的一条数据:
# Consume from the earliest position of the topic.
# NOTE(review): the initial position presumably only takes effect when the subscription is first created - confirm on your cluster.
bin/pulsar-client consume --subscription-name "first-subscription" --subscription-position Earliest persistent://public/default/yourtopicname
查看最新的一条数据:
# Use a dedicated subscription for the latest-position read: reusing "first-subscription"
# would resume from the cursor of that existing subscription instead of the latest position.
# NOTE(review): with the Latest position the command presumably waits for the next published message - confirm.
bin/pulsar-client consume --subscription-position Latest persistent://public/default/yourtopicname -s "latest-subscription"
二、分析Pulsar中的数据结构
对第一步中查看到的数据内容进行解析
可在https://www.json.cn/中进行结构化输出 输出结果: data中为实际的数据,mysqlType为各字段在MySQL中的数据类型。
{
"data":[
{
"wp_web_page_sk":"60",
"wp_web_page_id":"AAAAAAAAKDAAAAAA",
"wp_rec_start_date":"2001-09-03",
"wp_rec_end_date":"0000-00-00",
"wp_creation_date_sk":"2450813",
"wp_access_date_sk":"2452566",
"wp_autogen_flag":"Y",
"wp_customer_sk":"80555",
"wp_url":"http://www.foo.com",
"wp_type":"welcome",
"wp_char_count":"6577",
"wp_link_count":"24",
"wp_image_count":"2",
"wp_max_ad_count":"3"
}
],
"database":"tpcds_01",
"es":1656468147000,
"id":152155,
"isDdl":false,
"mysqlType":{
"wp_web_page_sk":"int(11)",
"wp_web_page_id":"char(16)",
"wp_rec_start_date":"date",
"wp_rec_end_date":"date",
"wp_creation_date_sk":"int(11)",
"wp_access_date_sk":"int(11)",
"wp_autogen_flag":"char(1)",
"wp_customer_sk":"int(11)",
"wp_url":"varchar(100)",
"wp_type":"char(50)",
"wp_char_count":"int(11)",
"wp_link_count":"int(11)",
"wp_image_count":"int(11)",
"wp_max_ad_count":"int(11)"
},
"old":null,
"pkNames":[
"wp_web_page_sk"
],
"sql":"",
"sqlType":{
"wp_web_page_sk":4,
"wp_web_page_id":1,
"wp_rec_start_date":91,
"wp_rec_end_date":91,
"wp_creation_date_sk":4,
"wp_access_date_sk":4,
"wp_autogen_flag":1,
"wp_customer_sk":4,
"wp_url":12,
"wp_type":1,
"wp_char_count":4,
"wp_link_count":4,
"wp_image_count":4,
"wp_max_ad_count":4
},
"table":"web_page",
"ts":1656468638479,
"type":"INSERT"
}
三、写FlinkSql解析Json数据
1、创建source端
要注意'data'的格式和写法,此过程的本质为行转列的过程。 scan.startup.sub-start-offset消费节点配置可参考 Git上的源码,因为这块内容目前在Flink官网上还无法找到 。
-- Source table: Canal change events read from a Pulsar topic and decoded as JSON.
-- Only the JSON keys declared below are mapped to columns.
CREATE TABLE ods_tpcds_01_web_page_rt_source
(
    -- Array of row images carried by one event, each element typed as one web_page record.
    `data` ARRAY<
        ROW<
            wp_web_page_sk      BIGINT,
            wp_web_page_id      STRING,
            wp_rec_start_date   STRING,
            wp_rec_end_date     STRING,
            wp_creation_date_sk BIGINT,
            wp_access_date_sk   BIGINT,
            wp_autogen_flag     STRING,
            wp_customer_sk      BIGINT,
            wp_url              STRING,
            wp_type             STRING,
            wp_char_count       BIGINT,
            wp_link_count       BIGINT,
            wp_image_count      BIGINT,
            wp_max_ad_count     BIGINT
        >
    >,
    -- Event envelope fields. The names collide with SQL keywords, hence the backticks.
    `database` STRING,
    `isDdl`    STRING,
    `table`    STRING,
    `type`     STRING,
    `es`       BIGINT,
    `ts`       BIGINT
) WITH (
    'connector' = 'pulsar',
    'format' = 'json',
    -- Pulsar endpoints and topic
    'service-url' = 'pulsar://xx.xx.xx.xx:6650',
    'admin-url' = 'http://xx.xx.xx.xx:8080',
    'topic' = 'persistent://public/default/tpcds_01_web_page',
    -- Consume through a named subscription, starting from the earliest offset
    'scan.startup.mode' = 'external-subscription',
    'scan.startup.sub-name' = 'ods_tpcds_01_web_page_rt_v1',
    'scan.startup.sub-start-offset' = 'earliest'
);
2、创建sink端
sink.rolling-policy.rollover-interval、sink.rolling-policy.file-size等参数可参考官网解释,自行修改 官网地址
-- Sink table: flattened web_page rows plus event envelope fields,
-- written as Parquet files and partitioned by the pt string column.
-- NOTE(review): per the Flink filesystem connector documentation, part files of bulk
-- formats such as parquet are finalized on checkpoints - confirm checkpointing is enabled.
CREATE TABLE ods_tpcds_01_web_page_rt_sink
(
    -- Columns taken from each element of the source data array
    wp_web_page_sk      BIGINT,
    wp_web_page_id      STRING,
    wp_rec_start_date   STRING,
    wp_rec_end_date     STRING,
    wp_creation_date_sk BIGINT,
    wp_access_date_sk   BIGINT,
    wp_autogen_flag     STRING,
    wp_customer_sk      BIGINT,
    wp_url              STRING,
    wp_type             STRING,
    wp_char_count       BIGINT,
    wp_link_count       BIGINT,
    wp_image_count      BIGINT,
    wp_max_ad_count     BIGINT,
    -- Event envelope fields
    `database` STRING,
    `isDdl`    STRING,
    `type`     STRING,
    `table`    STRING,
    `es`       BIGINT,
    `ts`       BIGINT,
    -- Partition column
    `pt` STRING
) PARTITIONED BY (pt) WITH (
    'connector' = 'filesystem',
    -- The path must be a bare URI. Replace this example with your own target directory,
    -- on HDFS or on an S3-compatible store such as MinIO,
    -- e.g. hdfs://test/ods_poc_tpcds_01_web_page_rt_v1
    'path' = 's3a://bucket1/test/ods_poc_tpcds_01_web_page_rt_v1',
    -- Rolling policy for part files: adjust to your workload
    'sink.rolling-policy.rollover-interval' = '1min',
    'sink.rolling-policy.file-size' = '128M',
    'format' = 'parquet'
);
3、写入数据
-- Expand every element of the `data` array into its own sink row, carry the event
-- envelope fields along, and derive the partition column pt from ts.
INSERT INTO ods_tpcds_01_web_page_rt_sink
SELECT
    wp.wp_web_page_sk,
    wp.wp_web_page_id,
    wp.wp_rec_start_date,
    wp.wp_rec_end_date,
    wp.wp_creation_date_sk,
    wp.wp_access_date_sk,
    wp.wp_autogen_flag,
    wp.wp_customer_sk,
    wp.wp_url,
    wp.wp_type,
    wp.wp_char_count,
    wp.wp_link_count,
    wp.wp_image_count,
    wp.wp_max_ad_count,
    evt.`database`,
    evt.`isDdl`,
    evt.`type`,
    evt.`table`,
    evt.`es`,
    evt.`ts`,
    -- ts is divided by 1000 and shifted by 8 hours before being formatted as yyyy-MM-dd.
    -- NOTE(review): presumably ts is epoch milliseconds and the shift targets UTC+8.
    -- FROM_UNIXTIME may already apply the session time zone, so confirm the offset is not applied twice.
    FROM_UNIXTIME((evt.`ts` / 1000) + 60 * 60 * 8, 'yyyy-MM-dd') AS pt
FROM ods_tpcds_01_web_page_rt_source AS evt
CROSS JOIN UNNEST(evt.`data`) AS wp (
    wp_web_page_sk,
    wp_web_page_id,
    wp_rec_start_date,
    wp_rec_end_date,
    wp_creation_date_sk,
    wp_access_date_sk,
    wp_autogen_flag,
    wp_customer_sk,
    wp_url,
    wp_type,
    wp_char_count,
    wp_link_count,
    wp_image_count,
    wp_max_ad_count
);
总结
FlinkSql解析Json的步骤基本相似,主要是对Pulsar中Json结构的分析和创建对应的Flink表。如有描述不当,烦请指正。
|