11、Table API与SQL
- Flink 对批处理和流处理,提供了统一的上层 API
- Table API 是一套内嵌在 Java 和 Scala 语言中的查询API,它允许以非常直观的方式组合来自一些关系运算符的查询
- Flink 的 SQL 支持基于实现了 SQL 标准的 Apache Calcite
11.1 需要引入的 pom 依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>1.9.3</version>
</dependency>
11.2、两种planner区别(old/blink)
- 在 Flink 1.9 中会存在两个 Planner:Flink Planner 和 Blink Planner。
- 批流统一:Blink将批处理作业,视为流式处理的特殊情况。所以,blink不支持表和DataSet之间的转换,批处理作业将不转换为DataSet应用程序,而是跟流处理一样,转换为DataStream程序来处理。
- 因为批流统一,Blink planner也不支持BatchTableSource,而使用有界的StreamTableSource代替。
- Blink planner只支持全新的目录,不支持已弃用的ExternalCatalog。
- 旧planner和Blink planner的FilterableTableSource实现不兼容。旧的planner会把PlannerExpressions下推到filterableTableSource中,而blink planner则会把Expressions下推。
- 基于字符串的键值配置选项仅适用于Blink planner。
- PlannerConfig在两个planner中的实现不同。
- Blink planner会将多个sink优化在一个DAG中(仅在TableEnvironment上受支持,而在StreamTableEnvironment上不受支持)。而旧planner的优化总是将每一个sink放在一个新的DAG中,其中所有DAG彼此独立。
- 旧的planner不支持目录统计,而Blink planner支持。
- 两个Planner:flink-table-planner 和 flink-table-planner-blink。
- 两个Bridge:flink-table-api-scala-bridge 和 flink-table-api-java-bridge,从图中可以看出,Flink Planner 和 Blink Planner 都会依赖于具体的 JAVA API,也会依赖于具体的Bridge,通过 Bridge 可以将 API 操作相应的转化为 Scala 的 DataStream、DataSet,或者转化为 JAVA 的 DataStream 或者 Data Set。
11.3、基本程序结构
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val inputStream = env.readTextFile("..\\sensor.txt")
val dataStream = inputStream .map( data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
val settings: EnvironmentSettings = EnvironmentSettings.newInstance()
.useOldPlanner()
.inStreamingMode()
.build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env, settings)
val fTable: Table = tableEnv.fromDataStream(dataStream)
val selectedTable: Table = fTable.select('id, 'temperature) .filter("id = 'sensor_1'")
val selectedStream: DataStream[(String, Double)] = selectedTable .toAppendStream[(String, Double)]
selectedStream.print()
val rTable: Table = tableEnv.registerDataStream("t_SensorReading",dataStream)
var senTable:Table = rTable.sql("select * from t_SensorReading")
val senStream: DataStream[(String, Double)] = senTable.toRetractStream(Row.class)
senStream.print()
env.execute("table test")
11.4. TableEnvironment
- 解析的过程涉及到了三层:Table API/SQL,Blink Planner,Runtime
- 用于统一流和批处理
- Table API是Scala和Java语言集成查询API,可以非常直观的方式组合来自关系算子的查询(e.g. 选择,过滤和连接).
- Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
11.5、表(Table)
- TableEnvironment可以注册目录Catalog,并可以基于Catalog注册表
- 表(Table)是由一个"标示符"(identifier)来指定的,由3部分组成:Catalog名、数据库(database)名和对象名
- 表可以是常规的,也可以是虚拟的(视图,View)
- 常规表(Table)一般可以用来描述外部数据,比如文件、数据库表或消息队列的数据,也可以直接从DataStream转换而来
- 视图(View)可以从现有的表中创建,通常是table API或者SQL查询的一个结果集
11.5.1 创建 TableEnvironment
TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
- 在内部的 catalog 中注册
Table - 注册外部的 catalog
- 加载可插拔模块
- 执行 SQL 查询
- 注册自定义函数 (scalar、table 或 aggregation)
- 将
DataStream 或 DataSet 转换成 Table - 持有对
ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 中创建。
11.5.2 动态表
如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table
tableEnv.fromDataStream(dataStream)
或者根据字段顺序单独命名
tableEnv.fromDataStream(dataStream,’id,’timestamp .......)
最后的动态表可以转换为流进行输出
table.toAppendStream[(String,String)]
11.5.2 注册表
如果流中的数据类型是 case class 可以直接根据 case class 的结构生成 table
tableEnv.registerDataStrem("t_table",dataStream);
最后转换为留输出
tableEnv.toRetractStream(table,Row.class);
11.5.4 创建视图
tableEnv.createTemporaryView("t_table",dataStream);
tableEnv.toAppendStream(table, Row.class).print();
11.5.4 创建零时表
tableEnv.createTemporaryTable("inputTable",dataStream);
tableEnv.toAppendStream(inputTable, Row.class).print();
11.6. TableAPI语法
11.6.1. Scan, Projection, and Filter
11.6.2. Column Operations
- addColumns
- AddOrReplaceColumns
- DropColumns
- Renamecolumns
11.6.3. Aggregations
- groupBy Aggregation
- groupBy window aggregation
- over window aggregation Streaming
- distinct aggregation
- distinct
11.6.4. Joins
- inner join
- outer join
- time-windowed join
- joinLateral
- leftouterjoinLateral
11.6.5. Set Operations
- union
- unionAll
- intersect
- intersectAll
- minus
- minusAll
- in
11.6.6. OrderBy, Offset & Fetch
11.6.7. Insert
11.6.8. Window
11.7. FlinkSQL 语法
- Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite解析 SQL ,Calcite 支持标准的 ANSI SQL。
11.7.1. FlinkSQL常用算子
-
SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。 -
WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。 -
DISTINCT 用于从数据集/流中去重根据 SELECT 的结果进行去重。 -
GROUP BY 是对数据进行分组操作。 -
UNION 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。 -
JOIN 用于把来自两个表的数据联合起来形成结果表,Flink 支持的 JOIN 类型包括:
-
JOIN - INNER JOIN -
LEFT JOIN - LEFT OUTER JOIN -
RIGHT JOIN - RIGHT OUTER JOIN -
FULL JOIN - FULL OUTER JOIN
11.7.2. Group Window
- 根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Window:
- Tumble,滚动窗口,窗口数据有固定的大小,窗口数据无叠加;
- Hop,滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;
- Session,会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。
11.7.3. Flink的内置函数
|