| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink中Table API和SQL(四) -> 正文阅读 |
|
[大数据]Flink中Table API和SQL(四) |
目录 11.6 联结(Join)查询按照数据库理论,关系型表的设计往往至少需要满足第三范式(3NF),表中的列都直接依赖于主键,这样就可以避免数据冗余和更新异常。例如商品的订单信息,我们会保存在一个 “订单表”中,而这个表中只有商品 ID,详情则需要到“商品表”按照 ID 去查询;这样的好 处是当商品信息发生变化时,只要更新商品表即可,而不需要在订单表中对所有这个商品的所 有订单进行修改。不过这样一来,我们就无法从一个单独的表中提取所有想要的数据了。 在标准 SQL 中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表 的联结(Join)。在 Flink SQL 中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表。 在流处理中,动态表的 Join 对应着两条数据流的 Join 操作。与上一节的聚合查询类似,Flink SQL 中的联结查询大体上也可以分为两类:SQL 原生的联结查询方式,和流处理中特有的联结查询。 11.6.1 常规联结查询常规联结(Regular Join)是 SQL 中原生定义的 Join 方式,是最通用的一类联结操作。它 的具体语法与标准 SQL 的联结完全相同,通过关键字 JOIN 来联结两个表,后面用关键字 ON来指明联结条件。按照习惯,我们一般以“左侧”和“右侧”来区分联结操作的两个表。 在两个动态表的联结中,任何一侧表的插入(INSERT)或更改(UPDATE)操作都会让 联结的结果表发生改变。例如,如果左侧有新数据到来,那么它会与右侧表中所有之前的数据 进行联结合并,右侧表之后到来的新数据也会与这条数据连接合并。所以,常规联结查询一般 是更新(Update)查询。 与标准 SQL 一致,Flink SQL 的常规联结也可以分为内联结(INNER JOI N)和外联结 (OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。目前仅支持“等值条件” 作为联结条件,也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。 1. 等值内联结(INNER Equi-JOIN) 内联结用 INNER JOIN 来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。 例如之前提到的“订单表”(定义为 Order)和“商品表”(定义为 Product)的联结查询, 就可以用以下 SQL 实现:
这里是一个内联结,联结条件是订单数据的 product_id 和商品数据的 id 相等。由于订单 表中出现的商品id一定会在商品表中出现,因此这样得到的联结结果表,就包含了订单表Order中所有订单数据对应的详细信息。 2. 等值外联结(OUTER Equi-JOIN) 与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL 支持左外(LEFT JOIN)、右外(RIGHT JOIN) 和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。例如,订单表中未必包含了商品表中的所有 ID,为了将哪些没有任何订单的商品信息也查询出来,我们就可以使用右外联结(RIGHT JOIN)。当然,外联结查询目前也仅支持等值联结条件。具体用法如下:
11.6.2 间隔联结查询在 8.3 节中,我们曾经学习过 DataStream API 中的双流 Join,包括窗口联结(window join) 和间隔联结(interval join)。两条流的 Join 就对应着 SQL 中两个表的 Join,这是流处理中特有的联结方式。目前 Flink SQL 还不支持窗口联结,而间隔联结则已经实现。 间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外,还多了一个时间间隔的限制。具体语法有以下要点:
具体定义 方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:
判断两者相等,这是最强的时间约束,要求两表中数据的时间必须完全一致才能匹配;一 般情况下,我们还是会放宽一些,给出一个间隔。间隔的定义可以用<,<=,>=,>这一类的关系不等式,也可以用 BETWEEN ... AND ...这样的表达式。 例如,我们现在除了订单表 Order 外,还有一个“发货表”Shipment,要求在收到订单后四个小时内发货。那么我们就可以用一个间隔联结查询,把所有订单与它对应的发货信息连接合并在一起返回。
在流处理中,间隔联结查询只支持具有时间属性的“仅追加”(Append-only)表。 那对于有更新操作的表,又怎么办呢?除了间隔联结之外,Flink SQL 还支持时间联结 (Temporal Join),这主要是针对“版本表”(versioned table)而言的。所谓版本表,就是记录 了数据随着时间推移版本变化的表,可以理解成一个“更新日志”(change log),它就是具有 时间属性、还会进行更新操作的表。当我们联结某个版本表时,并不是把当前的数据连接合并 起来就行了,而是希望能够根据数据发生的时间,找到当时的“版本”;这种根据更新时间提 取当时的值进行联结的操作,就叫作“时间联结”(Temporal Join)。 11.7 函数在 SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这 就是“函数”(functions)。 Flink 的 Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同:Table API 中 的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串 str 转换成全大写的形式,Table API 的写法是调用 str 这个 String对象的 upperCase()方法:
而 SQL 中的写法就是直接引用 UPPER()函数,将 str 作为参数传入:
由于 Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比 较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较 少使用。下面我们主要介绍 Flink SQL 中函数的使用。 Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用 就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。? 11.7.1 系统函数系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好 的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL 提供了 大量的系统函数,几乎支持所有的标准 SQL 中的操作,这为我们使用 SQL 编写流处理程序提 供了极大的方便。 Flink SQL 中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。 1. 标量函数(Scalar Functions) 所谓的“标量”,是指只有数值大小、没有方向的量;所以标量函数指的就是只对输入数据做转换操作、返回一个值的函数。这里的输入数据对应在表中,一般就是一行数据中 1 个或 多个字段,因此这种操作有点像流处理转换算子中的 map。另外,对于一些没有输入参数、直 接可以得到唯一结果的函数,也属于标量函数。 标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准 SQL 中也有 定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官 网的完整函数列表。 ? 比较函数(Comparison Functions) 比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。 这个比较表达式可以是用 、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。 例如:
? 逻辑函数(Logical Functions) 逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型 的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型 的值。例如:
? 算术函数(Arithmetic Functions) 进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:
? 字符串函数(String Functions) 进行字符串处理的函数。例如:
? 时间函数(Temporal Functions) 进行与时间相关操作的函数。例如:
2. 聚合函数(Aggregate Functions)? ? ?聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。 标准 SQL 中常见的聚合函数 Flink SQL 都是支持的,目前也在不断扩展,为流处理应用 提供更强大的功能。例如:
其中,RANK()和 ROW_NUMBER()一般用在 OVER 窗口中,在之前 11.5.4 小节实现 Top N 的过程中起到了非常重要的作用。 11.7.2 自定义函数(UDF)系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。事实上,系统内置函数仍然在不断扩充,如果我们认为自己实现的自定义函数足够通用、应用非常广泛,也可以在项目跟踪工 具 JIRA 上向 Flink 开发团队提出“议题”(issue),请求将新的函数添加到系统函数中。 Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。 当前 UDF主要有以下几类:
1. 整体调用流程 要想在代码中使用自定义的函数,我们需要首先自定义对应 UDF 抽象类的实现,并在表 环境中注册这个函数,然后就可以在 Table API 和 SQL 中调用了。 (1)注册函数 注册函数时需要调用表环境的 createTemporarySystemFunction()方法,传入注册的函数名 以及 UDF 类的 Class 对象:
我们自定义的 UDF 类叫作 MyFunction,它应该是上面四种 UDF 抽象类中某一个的具体 实现;在环境中将它注册为名叫 MyFunction 的函数。 这里 createTemporarySystemFunction()---全局方法的意思是创建了一个“临时系统函数”,所以MyFunction 函数名是全局的 , 可以当作系统函数来使用; 我们也可以用createTemporaryFunction()---局部方法,注册的函数就依赖于当前的数据库(database)和目录(catalog) 了,所以这就不是系统函数,而是“目录函数”(catalog function),它的完整名称应该包括所属的 database 和 catalog。 一般情况下,我们直接用 createTemporarySystemFunction()方法将 UDF 注册为系统函数就可以了。 (2)使用 Table API 调用函数 在 Table API 中,需要使用 call()方法来调用自定义函数:
这里 call()方法有两个参数,一个是注册好的函数名 MyFunction,另一个则是函数调用时本身的参数。这里我们定义 MyFunction 在调用时,需要传入的参数是 myField 字段。 此外,在 Table API 中也可以不注册函数,直接用“内联”(inline)的方式调用 UDF:
区别只是在于 call()方法第一个参数不再是注册好的函数名,而直接就是函数类的 Class对象了。 (3)在 SQL 中调用函数 当我们将函数注册为系统函数之后,在 SQL 中的调用就与内置系统函数完全一样了:
2. 标量函数(Scalar Functions) 自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数 是“一对一”的转换。 想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类 ScalarFunction,并实现叫作 eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public), 而且名字必须是 eval。求值方法 eval 可以重载多次,任何数据类型都可作为求值方法的参数 和返回值类型。 这里需要特别说明的是,ScalarFunction 抽象类中并没有定义 eval()方法,所以我们不能直 接在代码中重写(override);但 Table API 的框架底层又要求了求值方法必须名字为 eval()。这 是 Table API 和 SQL 目前还显得不够完善的地方,未来的版本应该会有所改进。 ScalarFunction 以及其它所有的 UDF 接口,都在 org.apache.flink.table.functions 中。 实例:简单的标量函数的代码实现(求HashCode):
3. 表函数(Table Functions) 跟标量函数一样,表函数的输入参数也可以是 0 个、1 个或多个标量值;不同的是,它可 以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口 TVF,本质上就是表函数。 类似地,要实现自定义的表函数,需要自定义类来继承抽象类 TableFunction,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction 类本身是有一 个泛型参数T 的,这就是表函数返回数据的类型;而 eval()方法没有返回类型,内部也没有 return语句,是通过调用 collect()方法来发送想要输出的行数据的。 我们使用表函数,可以对一行数据得到一个表,这和 Hive 中的 UDTF 非常相似。那对于 原先输入的整张表来说,又该得到什么呢?一个简单的想法是,就让输入表中的每一行,与它 转换得到的表进行联结(join),然后再拼成一个完整的大表,这就相当于对原来的表进行了 扩展。在 Hive 的 SQL 语法中,提供了“侧向视图”(lateral view,也叫横向视图)的功能,可 以将表中的一行数据拆分成多行;Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来 实现的。 在 SQL 中调用表函数,需要使用 LATERAL TABLE()来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的 Join 操作可以是直接做交叉联结(cross join), 在 FROM 后用逗号分隔两个表就可以;也可以是以 ON TRUE 为条件的左联结(LEFT JOIN)。 下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数 SplitFunction,可以将 一个字符串转换成(字符串,长度)的二元组。
4. 聚合函数(Aggregate Functions) 用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据 (也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。 聚合函数的概念我们之前已经接触过多次,如 SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定 义聚合函数来实现功能了。 自定义聚合函数需要继承抽象类 AggregateFunction。AggregateFunction 有两个泛型参数
Flink SQL 中的聚合函数的工作原理如下:
所以,每个 AggregateFunction 都必须实现以下几个方法: ? createAccumulator()
? accumulate()
? getValue()
除了上面的方法,还有几个方法是可选的。这些方法有些可以让查询更加高效,有些是在 某些特定场景下必须要实现的。比如,如果是对会话窗口进行聚合,merge()方法就是必须要实现的,它会定义累加器的合并操作,而且这个方法对一些场景的优化也很有用;而如果聚合 函数用在 OVER 窗口聚合中,就必须实现 retract()方法,保证数据可以进行撤回操作;resetAccumulator()方法则是重置累加器,这在一些批处理场景中会比较有用。 AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且 名字必须跟上面写的完全一样。 createAccumulator 、 getValue 、 getResultType 以 及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以 override;而 其他则都是底层架构约定的方法。 下面举一个具体的示例。在常用的系统内置聚合函数里,可以用 AVG()来计算平均值;如 果我们现在希望计算的是某个字段的“加权平均值”,又该怎么做呢?系统函数里没有现成的 实现,所以只能自定义一个聚合函数 WeightedAvg 来计算了。 比如我们要从学生的分数表 ScoreTable 中计算每个学生的加权平均分。为了计算加权平均值,应该从输入的每行数据中提取两个值作为参数:要计算的分数值 score,以及它的权重weight。而在聚合过程中,累加器(accumulator)需要存储当前的加权总和 sum,以及目前数据的个数 count。这可以用一个二元组来表示,也可以单独定义一个类 WeightedAvgAccum, 里面包含 sum 和 count 两个属性,用它的对象实例来作为聚合的累加器。 具体代码如下:
5. 表聚合函数(Table Aggregate Functions) 用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。 自定义表聚合函数需要继承抽象类 TableAggregateFunction。TableAggregateFunction 的结 构和原理与 AggregateFunction 非常类似,同样有两个泛型参数,用一个 ACC 类型的 累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction 中也必须对应实现: ? createAccumulator()
? accumulate()
? emitValue()---需要手动实现 所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着 AggregateFunction中的 getValue()方法;区别在于 emitValue 没有输出类型,而输入参数有两个:第一个是 ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为 Collect。 所以很明显,表聚合函数输出数据不是直接 return,而是调用 out.collect()方法,调用多次就可以输出多行数据了;这一点与表函数非常相似。另外,emitValue()在抽象类中也没有定义,无法 override, 必须手动实现。 表聚合函数得到的是一张表;在流处理中做持续查询,应该每次都会把这个表重新计算输出。如果输入一条数据后,只是对结果表里一行或几行进行了更新(Update),这时我们重新 计算整个表、全部输出显然就不够高效了。为了提高处理效率,TableAggregateFunction 还提 供了一个 emitUpdateWithRetract()方法,它可以在结果表发生变化时,以“撤回”(retract)老数 据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue()和 emitUpdateWithRetract()两个方法,在进行更新操作时会优先调用 emitUpdateWithRetract()。 表聚合函数相对比较复杂,它的一个典型应用场景就是 Top N 查询。比如我们希望选出 一组数据排序后的前两名,这就是最简单的 TOP-2 查询。没有线程的系统函数,那么我们就 可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在 accumulate()方法中进行比较更新,最终在 emitValue()中调用两次out.collect()将前两名数据输出。
|
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 | -2024/11/23 5:57:00- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |