1、流表join (1)Regular Joins(inner join和left join用法一样) 两个流一直持续,一个流很久以后出现一个数据,只要id可以关联就会关联,这样表中的所有数据都会保存状态,随着数据变大,不适合 (2)Interval Joins (时间间隔join) 不会保留大量的状态(双流join) —只会关联这一段时间的join数据 —a只会关联b时间到b-5之间数据(b-5<a<b) (3)流表和维表关联 —维表有时也会更新,但是维表是一个有界流一下就读取完成了
这时需要使用第三方工具mysqlcdc —使用mysqlcdc关联维表 – 可以实时发现维表更新 需要将flink-sql-connector-mysql-cdc-1.1.0 上传到flink 的lib目录下
mysql cdc
1)、先进行全量表读取 2)、再通过监控 mysql 的binlog 日志实时读取新的数据
附件数据(1)
1、Regular Joins
问题:两边表的数据都会以flink状态的形式保存起来,如果表持续增长,会导致flink状态放不下,出问题
-- Kafka source table for the student stream (one CSV record per message).
-- Fixes: uniform uppercase type keywords (was mixed String/STRING/int),
-- consistent indentation, and a terminating semicolon for the SQL client.
CREATE TABLE student_join (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'   -- only consume messages produced after the job starts
);
-- Kafka source table for the score stream (s_id joins to student_join.id).
-- Fixes: uniform uppercase type keywords, consistent indentation, semicolon.
CREATE TABLE score_join (
    s_id STRING,
    c_id STRING,
    sco INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'   -- only consume messages produced after the job starts
);
-- Regular inner join: emits a result row whenever both streams have a matching id.
-- Both sides are kept in Flink state forever, so state grows without bound.
SELECT a.id, a.name, b.c_id, b.sco
FROM student_join AS a
INNER JOIN score_join AS b
    ON a.id = b.s_id;

-- Regular left join: keeps every student row even when no score has arrived yet
-- (score columns are NULL until a match shows up, then a retraction/update is emitted).
SELECT a.id, a.name, b.c_id, b.sco
FROM student_join AS a
LEFT JOIN score_join AS b
    ON a.id = b.s_id;
kafka-console-producer.sh --broker-list master:9092 --topic student_join
1500100001,施笑槐,22,女,文科六班
1500100002,吕金鹏,24,男,文科六班
1500100003,单乐蕊,22,女,理科六班
kafka-console-producer.sh --broker-list master:9092 --topic score_join
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102
附件数据(2)
-- Kafka source for the student stream with an event-time column and a
-- 5-second bounded-out-of-orderness watermark (needed for interval joins).
-- Fixes: uniform uppercase type keywords, consistent indentation, semicolon.
CREATE TABLE student_Interval_join (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'student_join',
    'properties.bootstrap.servers' = 'node1:9092,node2:9092,master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);
-- Kafka source for the score stream with an event-time column and a
-- 5-second watermark, matching student_Interval_join so the interval join
-- can expire state on both sides.
-- Fixes: uniform uppercase type keywords, consistent indentation, semicolon.
CREATE TABLE score_Interval_join (
    s_id STRING,
    c_id STRING,
    sco INT,
    ts TIMESTAMP(3),
    WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'node1:9092,node2:9092,master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);
-- Interval join: a student row only matches score rows whose event time falls
-- in [b.ts - 5s, b.ts], so Flink can expire state instead of keeping it forever.
-- Fix: replaced the implicit comma join + WHERE with an explicit INNER JOIN;
-- the equality key and the time bound both move into ON, which Flink still
-- recognizes as an interval join (same semantics, clearer intent).
SELECT a.id, a.name, b.c_id, b.sco
FROM student_Interval_join AS a
INNER JOIN score_Interval_join AS b
    ON a.id = b.s_id
    AND a.ts BETWEEN b.ts - INTERVAL '5' SECOND AND b.ts;
kafka-console-producer.sh --broker-list master:9092 --topic student_join
1500100001,施笑槐,22,女,文科六班,"2020-09-17 15:12:20"
1500100002,吕金鹏,24,男,文科六班,"2020-09-17 15:12:20"
1500100002,吕金鹏,24,男,文科六班,"2020-09-17 15:12:36"
1500100003,单乐蕊,22,女,理科六班
kafka-console-producer.sh --broker-list master:9092 --topic score_join
1500100001,1000001,98,"2020-09-17 15:12:22"
1500100001,1000002,5,"2020-09-17 15:12:23"
1500100001,1000002,5,"2020-09-17 15:12:40"
1500100002,1000002,5,"2020-09-17 15:12:10"
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102
附件数据(3)
-- 维表数据在 mysql 中,无法发现维表更新
-- JDBC table over the MySQL dimension table `student`.
-- Note: the jdbc connector does a bounded scan — updates to the MySQL table
-- after the job starts are NOT picked up (use mysql-cdc for that).
-- Fixes: uniform uppercase type keywords, consistent indentation,
-- missing space in 'password' =, terminating semicolon.
CREATE TABLE student_mysql (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student?useUnicode=true&characterEncoding=utf-8&useSSL=false',
    'table-name' = 'student',
    'username' = 'root',
    'password' = '123456'
);
-- 流表数据在 kafka 中
-- Kafka source table for the score stream, joined against the MySQL dimension
-- tables below. Fixes: uniform uppercase type keywords, indentation, semicolon.
CREATE TABLE score_join1 (
    s_id STRING,
    c_id STRING,
    sco INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'score_join',
    'properties.bootstrap.servers' = 'master:9092',
    'properties.group.id' = 'asdasdasd',
    'format' = 'csv',
    'scan.startup.mode' = 'latest-offset'
);
-- Left join the score stream against the bounded MySQL dimension table:
-- every score row is kept; student columns are NULL when the id is unknown.
SELECT a.id, a.name, b.c_id, b.sco
FROM score_join1 AS b
LEFT JOIN student_mysql AS a
    ON a.id = b.s_id;
kafka-console-producer.sh --broker-list master:9092 --topic score_join
1500100001,1000001,98
1500100002,1000002,5
1500100001,1000003,137
1500100001,1000004,29
---使用mysqlcdc关联维表
-- 可以实时发现维表更新
需要将flink-sql-connector-mysql-cdc-1.1.0 上传到flink 的lib目录下
mysql cdc
1、先进行全量表读取
2、再通过监控 mysql 的binlog 日志实时读取新的数据
-- CDC table over the MySQL dimension table: a full snapshot first, then
-- continuous reads from the binlog, so dimension updates are reflected live
-- (requires flink-sql-connector-mysql-cdc in flink/lib).
-- Fixes: uniform uppercase type keywords, consistent indentation, semicolon.
CREATE TABLE student_mysql_cdc (
    id STRING,
    name STRING,
    age INT,
    gender STRING,
    clazz STRING
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'master',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'student',
    'table-name' = 'student'
);
-- Left join the score stream against the CDC-backed dimension table:
-- unlike the jdbc version, later updates to the MySQL table are reflected
-- in the join output.
SELECT a.id, a.name, b.c_id, b.sco
FROM score_join1 AS b
LEFT JOIN student_mysql_cdc AS a
    ON a.id = b.s_id;
kafka-console-producer.sh --broker-list master:9092 --topic score_join
1500100001,1000001,98
1500100001,1000002,5
1500100001,1000003,137
1500100001,1000004,29
1500100001,1000005,85
1500100001,1000006,52
1500100002,1000001,139
1500100002,1000002,102
1500100003,1000002,102
1,1000002,102
2,1000002,102