IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 11.3.4、sql客户端__流表join -> 正文阅读

[大数据]11.3.4、sql客户端__流表join

1、流表join
(1)Regular Joins(inner join和left left一样)
两个流一致持续,一个流很久以后出现一个数据,只要id可以关联就会关联,这样表中的所有数据都会保存状态,随着数据变大,不适合
(2)Interval Joins (时间间隔join) 不会保留大量的状态(双流join)
—只会关联这一段时间的join数据
—a只会关联b时间到b-5之间数据(b-5<a<b)
(3)流表和维表关联
—维表有时也会更新,但是维表是一个有界流一下就读取完成了

这时需要使用第三方工具mysqlcdc
—使用mysqlcdc关联维表
– 可以实时发现维表更新 需要将flink-sql-connector-mysql-cdc-1.1.0 上传到flink 的lib目录下

mysql cdc

1)、先进行全量表读取
2)、再通过监控 mysql 的binlog 日志实时读取新的数据

附件数据(1)

1、Regular Joins 

问题:两边表的数据都会以flink状态的形式保存起来,如果表持续增长,会导致flink状态放不下,出问题

CREATE TABLE student_join (
 id String,
 name String,
 age int,
 gender STRING,
 clazz STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'student_join',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE score_join (
 s_id String,
 c_id String,
 sco int
) WITH (
 'connector' = 'kafka',
 'topic' = 'score_join',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
)


select a.id,a.name,b.c_id,b.sco from student_join a inner join score_join b on a.id=b.s_id
select a.id,a.name,b.c_id,b.sco from student_join a left join score_join b on a.id=b.s_id


kafka-console-producer.sh --broker-list master:9092  --topic student_join

1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班

kafka-console-producer.sh --broker-list master:9092 --topic score_join

1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102

附件数据(2)

CREATE TABLE student_Interval_join (
 id String,
 name String,
 age int,
 gender STRING,
 clazz STRING,
 ts TIMESTAMP(3),
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'student_join',
 'properties.bootstrap.servers' = 'node1:9092,node2:9092,master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
)


CREATE TABLE score_Interval_join (
 s_id String,
 c_id String,
 sco int,
 ts TIMESTAMP(3),
 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
 'connector' = 'kafka',
 'topic' = 'score_join',
 'properties.bootstrap.servers' = 'node1:9092,node2:9092,master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
)


select a.id,a.name,b.c_id,b.sco from 
student_Interval_join a , score_Interval_join b 
WHERE a.id=b.s_id and a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts


kafka-console-producer.sh --broker-list master:9092 --topic student_join

1500100001,施笑槐,22,女,文科六班,"2020-09-17 15:12:20"
1500100002,吕金鹏,24,男,文科六班,"2020-09-17 15:12:20"
1500100002,吕金鹏,24,男,文科六班,"2020-09-17 15:12:36"
1500100003,单乐蕊,22,女,理科六班

kafka-console-producer.sh --broker-list master:9092 --topic score_join

1500100001,1000001,98,"2020-09-17 15:12:22"
1500100001,1000002,5,"2020-09-17 15:12:23"
1500100001,1000002,5,"2020-09-17 15:12:40"
1500100002,1000002,5,"2020-09-17 15:12:10"
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102

附件数据(3)

-- 维表在数据 mysql中       无法发现维表更新
CREATE TABLE student_mysql (
 id String,
 name String,
 age int,
 gender STRING,
 clazz STRING
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf-8&useSSL=false',
   'table-name' = 'student',
   'username' = 'root',
   'password'= '123456'
)


-- 流表数据再kafka中
CREATE TABLE score_join1 (
 s_id String,
 c_id String,
 sco int
) WITH (
 'connector' = 'kafka',
 'topic' = 'score_join',
 'properties.bootstrap.servers' = 'master:9092',
 'properties.group.id' = 'asdasdasd',
 'format' = 'csv',
 'scan.startup.mode' = 'latest-offset'
)

select a.id,a.name,b.c_id,b.sco from score_join1 b left join student_mysql a 
on a.id=b.s_id

kafka-console-producer.sh --broker-list master:9092 --topic score_join

1500100001,1000001,98
1500100002,1000002,5
1500100001,1000003,137
1500100001,1000004,29


---使用mysqlcdc关联维表
-- 可以实时发现维表更新
需要将flink-sql-connector-mysql-cdc-1.1.0 上传到flink 的lib目录下

mysql cdc  
1、先进行全量表读取
2、再通过监控  mysql 的binlog 日志实时读取新的数据

CREATE TABLE student_mysql_cdc (
 id String,
 name String,
 age int,
 gender STRING,
 clazz STRING
) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = 'master',
 'port' = '3306',
 'username' = 'root',
 'password' = '123456',
 'database-name' = 'student',
 'table-name' = 'student'
)




select a.id,a.name,b.c_id,b.sco from score_join1 b left join student_mysql_cdc a 
on a.id=b.s_id

kafka-console-producer.sh --broker-list master:9092 --topic score_join

1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102
1500100003,1000002,102
1,1000002,102
2,1000002,102
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-03-15 22:37:16  更:2022-03-15 22:38:11 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 17:45:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码