IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink sql实例问题记录 -> 正文阅读

[大数据]Flink sql实例问题记录

Flink sql

Flink sql

消费Kafka数据

先创建流环境和表环境

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(1);
 //设置检查点
        env.enableCheckpointing(5000L);

先通过连接器建表

字段顺序可以不一致,但是字段名和属性要与kafka内的字段属性一致,要能匹配上,这里默认的是按照处理时间来操作,用来触发窗口,若使用处理时间用函数

`ts` AS PROCTIME()
    //这种方式直接可以用来触发窗口,无需定义这个字段为水位线,如果是按照时间时间,或者数据中有时间戳,则需要定义水位线 格式如下  以下是乱序 有延迟 ts代表的就是指定的水位线的字段
    watermark for ts as ts - interval '5' second
tEnv.executeSql(
                    "CREATE TABLE kafka_table (" +
                            "`action_type` VARCHAR,\n" +
                            "`allocated_qty` DOUBLE,\n" +
                            "`batch_no` VARCHAR,\n" +
                            "`brand_code` VARCHAR,\n" +
                            "`brand_name` VARCHAR,\n" +
                            "`client_code` VARCHAR,\n" +
                            "`client_name` VARCHAR,\n" +
                            "`container_code` VARCHAR,\n" +
                            "`country_of_origin` VARCHAR,\n" +
                            "`created_by_user` VARCHAR,\n" +
                            "`created_by_user_id` BIGINT,\n" +
                            "`created_dtm_loc` TIMESTAMP,\n" +
                            "`created_office` VARCHAR,\n" +
                            "`created_time_zone` VARCHAR,\n" +
                            "`csNum` DOUBLE,\n" +
                            "`db` VARCHAR,\n" +
                            "`eventTimeDay` DATE,\n" +
                            "`exp_date` DATE,\n" +
                            "`from_location_code` VARCHAR,\n" +
                            "`frozen_qty` DOUBLE,\n" +
                            "`in_transit_qty` DOUBLE,\n" +
                            "`inv_adjustment_qty` DOUBLE,\n" +
                            "`inv_adjustment_type` VARCHAR,\n" +
                            "`inv_location_inventory_id` BIGINT,\n" +
                            "`inv_lot_id` BIGINT,\n" +
                            "`inv_transaction_id` BIGINT,\n" +
                            "`inventory_quality` VARCHAR,\n" +
                            "`inventory_type` VARCHAR,\n" +
                            "`item_code` VARCHAR,\n" +
                            "`item_name` VARCHAR,\n" +
                            "`location_code` VARCHAR,\n" +
                            "`locked` TINYINT,\n" +
                            "`lot_attr1` VARCHAR,\n" +
                            "`lot_attr10` VARCHAR,\n" +
                            "`lot_attr11` VARCHAR,\n" +
                            "`lot_attr12` VARCHAR,\n" +
                            "`lot_attr13` VARCHAR,\n" +
                            "`lot_attr14` VARCHAR,\n" +
                            "`lot_attr15` VARCHAR,\n" +
                            "`lot_attr16` DATE,\n" +
                            "`lot_attr2` VARCHAR,\n" +
                            "`lot_attr3` VARCHAR,\n" +
                            "`lot_attr4` VARCHAR,\n" +
                            "`lot_attr5` VARCHAR,\n" +
                            "`lot_attr6` VARCHAR,\n" +
                            "`lot_attr7` VARCHAR,\n" +
                            "`lot_attr8` VARCHAR,\n" +
                            "`lot_attr9` VARCHAR,\n" +
                            "`lpn_no` VARCHAR,\n" +
                            "`mfg_date` DATE,\n" +
                            "`on_hand_qty` DOUBLE,\n" +
                            "`op` VARCHAR,\n" +
                            "`owner_code` VARCHAR,\n" +
                            "`pack_code` VARCHAR,\n" +
                            "`palletNum` DOUBLE,\n" +
                            "`po_no` VARCHAR,\n" +
                            "`primaryKey` VARCHAR,\n" +
                            "`priority` SMALLINT,\n" +
                            "`process_state` SMALLINT,\n" +
                            "`qty_uom` VARCHAR,\n" +
                            "`received_date` DATE,\n" +
                            "`record_version` BIGINT,\n" +
                            "`ref_code_id_1` VARCHAR,\n" +
                            "`ref_code_id_10` VARCHAR,\n" +
                            "`ref_code_id_2` VARCHAR,\n" +
                            "`ref_code_id_3` VARCHAR,\n" +
                            "`ref_code_id_4` VARCHAR,\n" +
                            "`ref_code_id_5` VARCHAR,\n" +
                            "`ref_code_id_6` VARCHAR,\n" +
                            "`ref_code_id_7` VARCHAR,\n" +
                            "`ref_code_id_8` VARCHAR,\n" +
                            "`ref_code_id_9` VARCHAR,\n" +
                            "`ref_value_1` VARCHAR,\n" +
                            "`ref_value_10` VARCHAR,\n" +
                            "`ref_value_2` VARCHAR,\n" +
                            "`ref_value_3` VARCHAR,\n" +
                            "`ref_value_4` VARCHAR,\n" +
                            "`ref_value_5` VARCHAR,\n" +
                            "`ref_value_6` VARCHAR,\n" +
                            "`ref_value_7` VARCHAR,\n" +
                            "`ref_value_8` VARCHAR,\n" +
                            "`ref_value_9` VARCHAR,\n" +
                            "`reference_line_no` VARCHAR,\n" +
                            "`reference_no` VARCHAR,\n" +
                            "`reference_type` VARCHAR,\n" +
                            "`sales_no` VARCHAR,\n" +
                            "`sales_status` VARCHAR,\n" +
                            "`table` VARCHAR,\n" +
                            "`total_gross_weight` DOUBLE,\n" +
                            "`total_net_weight` DOUBLE,\n" +
                            "`total_weight` DOUBLE,\n" +
                            "`transaction_no` VARCHAR,\n" +
                            "`updated_by_user` VARCHAR,\n" +
                            "`updated_by_user_id` BIGINT,\n" +
                            "`updated_dtm_loc` TIMESTAMP,\n" +
                            "`updated_office` VARCHAR,\n" +
                            "`updated_time_zone` VARCHAR,\n" +
                            "`vendor_code` VARCHAR,\n" +
                            "`vendor_name` VARCHAR,\n" +
                            "`volume` DOUBLE,\n" +
                            "`warehouse_code` VARCHAR,\n" +
                            "`warehouse_id` BIGINT,\n" +
//                            "`process_time` AS PROCTIME(),\n" +
//                            "`ts` as LOCALTIMESTAMP,\n" +
                            "`ts` AS PROCTIME()\n" +
//                            " watermark for ts as ts - interval '5' second\n" +
                            ")WITH (\n" +
                            "'connector' = 'kafka',\n" +
                            "'topic' = 'dwd_inv_transaction',\n" +
                            "'properties.bootstrap.servers' = 'shucang001:9092,shucang002:9092,shucang003:9092',\n" +
                            //ads_test 的消费者组
//                            "'properties.group.id' = 'testGroup',\n" +
                            //ads_test01的消费者组
                            "'properties.group.id' = 'testGroup05',\n" +
                            //无提交offet的 从最开始消费
                            "'scan.startup.mode' = 'earliest-offset',\n" +
                            //从已提交的offset开始消费 无提交的从最新的开始消费
//                            "'scan.startup.mode' = 'latest-offset',\n" +
                            "'format' = 'json'\n" +
                            ")"
        );

注意:
group-offsets:从特定消费者组的 ZK / Kafka 代理中的已提交偏移开始。
earliest-offset:从可能的最早偏移量开始。
latest-offset : 从最新的偏移量开始。
timestamp:从用户提供的每个分区的时间戳开始。
specific-offsets:从用户提供的每个分区的特定偏移量开始

执行group by

String adsSql = "select \n" +
                "eventTimeDay,\n" +
                "warehouse_code,\n" +
                "client_code,\n" +
                "client_name, \n" +
                "reference_type,\n" +
                "sum(ea_num)  ea_num,\n" +
                "sum(pallet_num) pallet_num, \n" +
                "sum(cs_num) cs_num,\n" +
                "ROUND(sum(total_weight),3) total_weight,\n" +
                "ROUND(sum(total_gross_weight),3) total_gross_weight,\n" +
                "ROUND(sum(total_net_weight),3) total_net_weight\n" +
                "from \n" +
                "(\n" +
                "SELECT\n" +
                "eventTimeDay,\n" +
                "warehouse_code,\n" +
                "client_code,\n" +
                "client_name,\n" +
                "lpn_no,\n" +
                "reference_type,\n" +
                "SUM(inv_adjustment_qty) as ea_num,\n" +
                "CEILING(SUM(palletNum)) as pallet_num ,\n" +
                "CEILING(SUM(csNum)) as cs_num,\n" +
                "ROUND(SUM(total_weight),3) as total_weight,\n" +
                "ROUND(SUM(total_gross_weight),3) as total_gross_weight,\n" +
                "ROUND(SUM(total_net_weight),3) as total_net_weight\n" +
                "FROM\n" +
                "kafka_table\n" +
                "GROUP BY\n" +
                "TUMBLE(ts, INTERVAL '25' SECOND),\n" +
                "eventTimeDay,\n" +
                "warehouse_code,\n" +
                "client_code,\n" +
                "client_name,\n" +
                "lpn_no,\n" +
                "reference_type\n" +
                ") b where ea_num>0\n" +
                "group by \n" +
                "eventTimeDay,\n" +
                "warehouse_code,\n" +
                "client_code,\n" +
                "client_name,\n" +
                "reference_type";
        Table adstable = tEnv.sqlQuery(adsSql);
        tEnv.toChangelogStream(adstable).print();

需要在group by 后设置窗口的类型和触发间隔

TUMBLE(ts, INTERVAL '25' SECOND)
这里代表的是滚动窗口,触发间隔是25秒一次

sink到mysql数据库

这个表需要先创建好

String mysql = "create table ads_table(\n" +
                "  `id` STRING,\n" +
                "  `event_time_day` DATE,\n" +
                "  `warehouse_code` STRING,\n" +
                "  `client_code` STRING ,\n" +
                "  `client_name` STRING ,\n" +
                "  `reference_type` STRING,\n" +
                "  `ea_num` DOUBLE,\n" +
                "  `pallet_num` DOUBLE,\n" +
                "  `cs_num` DOUBLE,\n" +
                "  `total_weight` DOUBLE,\n" +
                "  `total_gross_weight` DOUBLE,\n" +
                "  `total_net_weight` DOUBLE,\n" +
                "   PRIMARY KEY (id) NOT ENFORCED" +
                "  ) WITH (\n" +
                "\t'connector' = 'jdbc',\n" +
                "  'driver'='com.mysql.cj.jdbc.Driver',\n" +
                "  'username' = 'root',  \n" +
                "  'password' = '123456',  \n" +
                "  'table-name' = 'ads_test01',\n" +
//                "  'table-name' = 'ads_test03',\n" +
//                "  'url' = 'jdbc:mysql://localhost:3306/test?useSSL=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai'\n" +
                "  'url' = 'jdbc:mysql://40.73.64.201:50010/test?useSSL=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=Asia/Shanghai'\n" +
                "  )";

注意此处的driver用的是高版本的,因为上面cdc用的是2.2.1,这个要求就是需要使用高版本的驱动,但是不影响来用它连接mysql5的数据库

接下来就是创建表,并往里面插入数据,这个时候会遇到primarykey的问题,我这里参考的就是通过groupby的字段来拼接一个唯一字段,往数据库里面插入数据,因为这里用了groupby,写入的时候会触发upset操作,需要通过一个主键来找到对应的行进行更新操作

 tEnv.executeSql(mysql);
        tEnv.executeSql("insert into ads_table SELECT CONCAT_WS('-',CAST(eventTimeDay AS STRING),client_code,reference_type) as id,eventTimeDay,warehouse_code,client_code,client_name,reference_type,ea_num,pallet_num,cs_num,total_weight,total_gross_weight,total_net_weight FROM "+ adstable);

问题

如果分组字段过多,这样操作可能会出现拼接的id长度过长被截断,导致id不准确,最后数据出现问题,现在想是否有一种方式能基于拼接的字符串生产一个唯一的类似数字id 将这个id写入到数据库中就能避免长度过长的情况出现,还有一个方式就是通过api去实现,手写jdbc 但是代码量就上升了,复用性也不算很高
解决方案一:
设置mysql数据库联合主键,还是只拼接2个字段,剩下字段通过设置联合主键的方式,来确定需要修改的数据位置。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-04 22:59:49  更:2022-07-04 23:00:54 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:54:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码