1、流处理中的表
在 Flink 中使用表和 SQL基本上跟其它场景是一样的;不过对于表和流的转换,却稍显复杂。当我们将一个 Table 转换成 DataStream 时,有“**仅插入流”(Insert-Only Streams)和“更新日志流”(Changelog Streams)**两种不同的方式,具体使用哪种方式取决于表中是否存在更新(update)操作。
这种麻烦其实是不可避免的。Table API 和 SQL 本质上都是基于关系型表的操作方式;而关系型表(Table)本身是有界的,更适合批处理的场景。所以在 MySQL、Hive这样的固定数据集中进行查询,使用 SQL 就会显得得心应手。
而对于 Flink 这样的流处理框架来说,要处理的是源源不断到来的无界数据流,我们无法等到数据都到齐再做查询,每来一条数据就应该更新一次结果;这时如果一定要使用表和 SQL 进行处理,就会显得有些别扭了,需要引入一些特殊的概念。我们可以将关系型表/SQL 与流处理做一个对比
关系型数据库的查询和流查询的对比
关系代数 / SQL | 流处理 | 关系(或表)是有界(多)元组集合。 | 流是一个无限元组序列。 | 对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 | 流式查询在启动时不能访问所有数据,必须“等待”数据流入。 | 批处理查询在产生固定大小的结果后终止。 | 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。 |
2、动态表和持续查询
流处理面对的数据是连续不断的,这导致了流处理中的“表”跟我们熟悉的关系型数据库中的表完全不同;而基于表执行的查询操作,也就有了新的含义。
如果我们希望把流数据转换成表的形式,那么这表中的数据就会不断增长;
如果进一步基于表执行 SQL 查询,那么得到的结果就不是一成不变的,而是会随着新数据的到来持续更新。
1.1 动态表(Dynamic Tables)
当流中有新数据到来,初始的表中会插入一行;而基于这个表定义的 SQL 查询,就应该在之前的基础上更新结果。这样得到的表就会不断地动态变化,被称为“动态表”(Dynamic Tables)。
动态表是Flink在Table API和SQL中的核心概念,它为流数据处理提供了表和SQL支持。
我们所熟悉的表一般用来做批处理,面向的是固定的数据集,可以认为是“静态表”;而动态表则完全不同,它里面的数据会随时间变化。
其实动态表的概念,我们在传统的关系型数据库中已经有所接触。数据库中的表,其实是一系列?INSERT、UPDATE?和?DELETE?语句执行的结果;在关系型数据库中,我们一般把它称为更新日志流(changelog stream)。
如果我们保存了表在某一时刻的快照(snapshot),那么接下来只要读取更新日志流,就可以得到表之后的变化过程和最终结果了。在很多高级关系型数据库(比如 Oracle、DB2)中都有“物化视图”(Materialized Views)的概念,可以用来缓存 SQL 查询的结果;它的更新其实就是不停地处理更新日志流的过程。Flink 中的动态表,就借鉴了物化视图的思想。
2.2 持续查询(Continuous Query)
动态表可以像静态的批处理表一样进行查询操作。由于数据在不断变化,因此基于它定义的 SQL 查询也不可能执行一次就得到最终结果。这样一来,我们对动态表的查询也就永远不会停止,一直在随着新数据的到来而继续执行。这样的查询就被称作**“持续查询”(Continuous Query)**。
对动态表定义的查询操作,都是持续查询;而持续查询的结果也会是一个动态表。由于每次数据到来都会触发查询操作,因此可以认为一次查询面对的数据集,就是当前输入动态表中收到的所有数据。这相当于是对输入动态表做了一个“快照”(snapshot),当作有限数据集进行批处理;流式数据的到来会触发连续不断的快照查询,像动画一样连贯起来,就构成了“持续查询”。
持续查询
持续查询的步骤如下:
(1)流(stream)被转换为动态表(dynamic table);
(2)对动态表进行持续查询(continuous query),生成新的动态表;
(3)生成的动态表被转换成流。
这样,只要 API 将流和动态表的转换封装起来,我们就可以直接在数据流上执行 SQL 查询,用处理表的方式来做流处理了。
3、将流转换成动态表
为了能够使用 SQL 来做流处理,我们必须先把流(stream)转换成动态表。
如果把流看作一张表,那么流中每个数据的到来,都应该看作是对表的一次插入(Insert)操作,会在表的末尾添加一行数据。因为流是连续不断的,而且之前的输出结果无法改变、只能在后面追加;所以我们其实是通过一个只有**插入操作(insert-only)的更新日志(changelog)**流,来构建一个表。
public class DynamicTable_01 { public static void main(String[] args) throws Exception { // 获取流环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源 SingleOutputStreamOperator<Event> eventStream = env .fromElements( new Event("Alice", "./home", 1000L), new Event("Bob", "./cart", 1000L), new Event("Alice", "./prod?id=1", 5 * 1000L), new Event("Cary", "./home", 60 * 1000L), new Event("Bob", "./prod?id=3", 90 * 1000L), new Event("Alice", "./prod?id=7", 105 * 1000L) ); // 获取表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表 tableEnv.createTemporaryView("EventTable", eventStream, $("user"), $("url"), $("ts")); // 统计每个用户的点击次数 Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user"); // 将表转换成数据流,在控制台打印输出 tableEnv.toChangelogStream(urlCountTable).print("count"); // 执行程序 env.execute(); }}
count> +I[Alice, 1] # 新增count> +I[Bob, 1] # 新增count> -U[Alice, 1] # 删除上一条 alicecount> +U[Alice, 2] # 新增一条 alice,并且增加count值count> +I[Cary, 1] # 新增count> -U[Bob, 1] # 删除上一条 Bobcount> +U[Bob, 2] # 新增一条 Bob,并且增加count值count> -U[Alice, 2] count> +U[Alice, 3]
我们现在的输入数据,就是用户在网站上的点击访问行为,数据类型被包装为 POJO 类型Event。我们将它转换成一个动态表,注册为 EventTable。
表中的字段定义如下:
[user: VARCHAR, // 用户名url: VARCHAR, // 用户访问的 URLts: BIGINT // 时间戳]
当用户点击事件到来时,就对应着动态表中的一次插入(Insert)操作,每条数据就是表中的一行;随着插入更多的点击事件,得到的动态表将不断增长。
点击事件流转换成动态表
![图片](https://mmbiz.qpic.cn/mmbiz_png/3xUEcImibkwLG4BcHuZjBOm8l1hlQcBaNOsKNWc7xEzYia1l5NGqUBKBBauDXd4edkFEeBrBxBqY4tEmzGYtIAkw/640?wx_fmt=png)
### 4、 用 SQL 持续查询
#### 4.1 更新(Update)查询
我们在代码中定义了一个 SQL 查询。
Table urlCountTable = tableEnv.sqlQuery(“SELECT?user,?COUNT(url)?as?cnt?FROM EventTable?GROUP?BY?user”);
这个查询很简单,主要是分组聚合统计每个用户的点击次数。我们把原始的动态表注册EventTable,经过查询转换后得到 urlCountTable;这个结果动态表中包含两个字段,具体定义如下:
[user: VARCHAR, // 用户名cnt: BIGINT // 用户访问 url 的次数]
当原始动态表不停地插入新的数据时,**查询得到的 urlCountTable 会持续地进行更改**。由于 count 数量可能会叠加增长,因此这里的更改操作可以是简单的**插入(Insert)**,也可以是对之前数据的更新**(Update)**。
换句话说,用来定义结果表的更新日志(changelog)流中,包含了 **INSERT** 和 **UPDATE** 两种操作。**这种持续查询被称为更新查询(Update Query)**,更新查询得到的结果表如果想要转换成 DataStream,必须调用 **t****oChangelogStream()**方法。
查询结果表的插入与更新
![图片](https://mmbiz.qpic.cn/mmbiz_png/3xUEcImibkwLG4BcHuZjBOm8l1hlQcBaNer0OWhoymRftQRIlgba2D6gsKSplT7uDPGCVbDE7fOV9ic98foryppA/640?wx_fmt=png)
具体步骤解释如下:
(1)当查询启动时,原始动态表 EventTable 为空;
(2)当第一行 Alice 的点击数据插入 EventTable 表时,查询开始计算结果表,urlCountTable中插入一行数据\[Alice,1\]。
(3)当第二行 Bob 点击数据插入 EventTable 表时,查询将更新结果表并插入新行\[Bob,1\]。
(4)第三行数据到来,同样是 Alice 的点击事件,这时不会插入新行,而是生成一个针对已有行的更新操作。这样,结果表中第一行\[Alice,1\]就更新为\[Alice,2\]。
(5)当第四行 Cary 的点击数据插入到 EventTable 表时,查询将第三行\[Cary,1\]插入到结果表中。
#### 4.2 追加(Append)查询
上面的例子中,查询过程用到了分组聚合,结果表中就会产生更新操作。如果我们执行一个简单的条件查询,结果表中就会像原始表 EventTable 一样,只有插入(Insert)操作了。
Table aliceVisitTable = tableEnv.sqlQuery(“SELECT url, user FROM EventTable WHERE user = ‘Cary’”);
这样的持续查询,就被称为追加查询(Append Query),它定义的结果表的更新日志(changelog)流中**只有 INSERT** 操作。追加查询得到的结果表,转换成 DataStream 调用方法没有限制,可以直接用 **toDataStream()**,也可以像更新查询一样调用 **toChangelogStream()****。**
这样看来,我们似乎可以总结一个规律:只要用到了聚合,在之前的结果上有叠加,就会产生更新操作,就是一个更新查询。但事实上,更新查询的判断标准是结果表中的数据是否会有 UPDATE 操作,如果聚合的结果不再改变,那么同样也不是更新查询。
**什么时候聚合的结果会保持不变呢?一个典型的例子就是窗口聚合。**
我们考虑开一个滚动窗口,统计每一小时内所有用户的点击次数,并在结果表中增加一个endT 字段,表示当前统计窗口的结束时间。这时结果表的字段定义如下:
[user: VARCHAR, // 用户名endT: TIMESTAMP, // 窗口结束时间cnt: BIGINT // 用户访问 url 的次数]
与之前的分组聚合一样,当原始动态表不停地插入新的数据时,查询得到的结果 result 会持续地进行更改。比如时间戳在 12:00:00 到 12:59:59 之间的有四条数据,其中 Alice 三次点击、Bob 一次点击;所以当水位线达到 13:00:00 时窗口关闭,输出到结果表中的就是新增两条数据\[Alice, 13:00:00, 3\]和\[Bob, 13:00:00, 1\]。同理,当下一小时的窗口关闭时,也会将统计结果追加到 result 表后面,而不会更新之前的数据。
窗口聚合结果表的变化
![图片](https://mmbiz.qpic.cn/mmbiz_png/3xUEcImibkwLG4BcHuZjBOm8l1hlQcBaNbHZ4BAcoGp1fvVZ0xeMw0ibvwcbtTbkIrHG9eSKrKxexICECic6Klj3g/640?wx_fmt=png)
所以我们发现,由于窗口的统计结果是一次性写入结果表的,所以结果表的更新日志流中只会包含插入 INSERT 操作,而没有更新 UPDATE 操作。所以这里的持续查询,依然是一个**追加(Append)查询**。结果表 result 如果转换成 DataStream,可以直接调用 toDataStream()方法。
需要注意的是,由于涉及时间窗口,我们还需要为事件时间提取时间戳和生成水位线。
public class DynamicTable_02 { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 读取数据源,并分配时间戳、生成水位线 SingleOutputStreamOperator eventStream = env .fromElements( new Event(“Alice”, “./home”, 1000L), new Event(“Bob”, “./cart”, 1000L), new Event(“Alice”, “./prod?id=1”, 25 * 60 * 1000L), new Event(“Alice”, “./prod?id=4”, 55 * 60 * 1000L), new Event(“Bob”, “./prod?id=5”, 3600 * 1000L + 60 * 1000L), new Event(“Cary”, “./home”, 3600 * 1000L + 30 * 60 * 1000L), new Event(“Cary”, “./prod?id=7”, 3600 * 1000L + 59 * 60 * 1000L) ) .assignTimestampsAndWatermarks( WatermarkStrategy.forMonotonousTimestamps() .withTimestampAssigner( new SerializableTimestampAssigner() { @Override public long extractTimestamp(Event element, long recordTimestamp) { return element.ts; } }) ); // 创建表环境 StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 将数据流转换成表,并指定时间属性 Table eventTable = tableEnv.fromDataStream( eventStream, $(“user”), $(“url”), $(“ts”).rowtime() ); // 为方便在 SQL 中引用,在环境中注册表 EventTable tableEnv.createTemporaryView(“EventTable”, eventTable); // 设置 1 小时滚动窗口,执行 SQL 统计查询 Table result = tableEnv .sqlQuery( "SELECT " + "user, " + "window_end AS endT, " + // 窗口结束时间 "COUNT(url) AS cnt " + // 统计 url 访问次数 "FROM TABLE( " + "TUMBLE( TABLE EventTable,DESCRIPTOR(ts),INTERVAL ‘1’ HOUR)) " + // 1 小时滚动窗口 "GROUP BY user, window_start, window_end " ); tableEnv.toDataStream(result).print(); env.execute(); }}
运行结果如下:
+I[Alice, 1970-01-01T01:00, 3]+I[Bob, 1970-01-01T01:00, 1]+I[Bob, 1970-01-01T02:00, 1]+I[Cary, 1970-01-01T02:00, 2]
可以看到,所有输出结果都以+I 为前缀,表示都是以 INSERT 操作追加到结果表中的;这是一个追加查询,所以我们直接使用 toDataStream()转换成流是没有问题的。这里输出的window\_end 是一个 TIMESTAMP 类型;由于我们直接以一个长整型数作为事件发生的时间戳,所以可以看到对应的都是 1970 年 1 月 1 日的时间。
#### 4.3 查询限制
在实际应用中,有些持续查询会因为计算代价太高而受到限制。所谓的“代价太高”,可能是由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂。
* **状态大小**
用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能非常大。
例如我们之前举的更新查询的例子,需要记录每个用户访问 url 的次数。如果随着时间的推移用户数越来越大,那么要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查询失败。
SELECT user, COUNT(url)FROM clicksGROUP BY user;
* **更新计算**
对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可能需要全部重新计算,并且对很多已经输出的行进行更新。一个典型的例子就是 RANK()函数,它会基于一组数据计算当前值的排名。
例如下面的 SQL 查询,会根据用户最后一次点击的时间为每个用户计算一个排名。当我们收到一个新的数据,用户的最后一次点击时间(lastAction)就会更新,进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时,被他超过的那些用户的排名也会改变;这样的更新操作无疑代价巨大,而且还会随着用户的增多越来越严重。
SELECT user, RANK() OVER (ORDER BY lastAction)FROM (SELECT user, MAX(ts) AS lastAction FROM EventTable GROUP BY user);
这样的查询操作,就不太适合作为连续查询在流处理中执行。这里 RANK()的使用要配合一个 OVER 子句,这是所谓的“开窗聚合”。
### 5、将动态表转换为流
与关系型数据库中的表一样,动态表也可以通过插入(Insert)、更新(Update)和删除(Delete)操作,进行持续的更改。将动态表转换为流或将其写入外部系统时,就需要对这些更改操作进行编码,通过发送编码消息的方式告诉外部系统要执行的操作。在 Flink 中,Table API 和 SQL支持**三种编码**方式:
#### 5.1 仅追加(Append-only)流
仅通过插入(Insert)更改来修改的动态表,可以直接转换为“仅追加”流。这个流中发出的数据,其实就是动态表中新增的每一行。
#### 5.2 撤回(Retract)流
撤回流是包含两类消息的流,添加(add)消息和撤回(retract)消息。
具体的编码规则是:
* INSERT 插入操作编码为 add 消息;
* DELETE 删除操作编码为 retract消息;
* UPDATE 更新操作则编码为被更改行的 retract 消息,和更新后行(新行)的 add 消息。
这样,我们可以通过编码后的消息指明所有的增删改操作,一个动态表就可以转换为撤回流了。
可以看到,更新操作对于撤回流来说,对应着两个消息:之前数据的撤回(删除)和新数据的插入。
动态表转换为撤回流
![图片](https://img-blog.csdnimg.cn/img_convert/87ca0aafbefebcd9089ede9577c3aea4.png)
用+代表 add 消息(对应插入 INSERT 操作)
用-代表 retract 消息(对应删除DELETE 操作)
当 Alice 的第一个点击事件到来时,结果表新增一条数据\[Alice, 1\];而当 Alice的第二个点击事件到来时,结果表会将\[Alice, 1\]更新为\[Alice, 2\],对应的编码就是删除\[Alice, 1\]、插入\[Alice, 2\]。这样当一个外部系统收到这样的两条消息时,就知道是要对 Alice 的点击统计次数进行更新了。
#### 5.3 更新插入(Upsert)流
更新插入流中只包含两种类型的消息:**更新插入(upsert)消息和删除(delete)消息。**
所谓的“upsert”其实是“update”和“insert”的合成词,所以对于更新插入流来说,
INSERT 插入操作和UPDATE更新操作,统一被编码为upsert消息;而DELETE删除操作则被编码为delete消息。
既然更新插入流中不区分插入(insert)和更新(update),那我们自然会想到一个问题:如果希望更新一行数据时,怎么保证最后做的操作不是插入呢?
这就需要动态表中**必须有唯一的键(key)**。通过这个 key 进行查询,如果存在对应的数据就做更新(update),如果不存在就直接插入(insert)。这是一个动态表可以转换为更新插入流的必要条件。当然,收到这条流中数据的外部系统,也需要知道这唯一的键(key),这样才能正确地处理消息。
动态表转换为更新插入流
![图片](https://img-blog.csdnimg.cn/img_convert/df0f64759501b0f8977c0f047695e229.png)
可以看到,更新插入流跟撤回流的主要区别在于,更新(update)操作由于有 key 的存在,只需要用单条消息编码就可以,因此效率更高。
需要注意的是,在代码里将动态表转换为 DataStream 时,只支持仅追加(append-only)和撤回(retract)流,我们调用 toChangelogStream()得到的其实就是撤回流;这也很好理解,DataStream 中并没有 key 的定义,所以只能通过两条消息一减一增来表示更新操作。而**连接到外部系统时,则可以支持不同的编码方法,这取决于外部系统本身的特性。**
![图片](https://img-blog.csdnimg.cn/img_convert/1616483199e310418de6d8e3ed35d25d.gif)
|