IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 【深度好文】Flink SQL流批?体化技术详解(三) -> 正文阅读

[大数据]【深度好文】Flink SQL流批?体化技术详解(三)

持续输出 敬请关注
大数据架构 ?湖仓一体化 ?流批一体 离线+实时数仓?
各种大数据解决方案 ?各种大数据新技术实践
持续输出 ?敬请关注

【深度好文】Flink SQL流批?体化技术详解(一)_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/124347634?spm=1001.2014.3001.5501【深度好文】Flink SQL流批?体化技术详解(二)_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/124454378?spm=1001.2014.3001.5501

目录

持续输出 敬请关注大数据架构 ?湖仓一体化 ?流批一体 离线+实时数仓?各种大数据解决方案 ?各种大数据新技术实践持续输出 ?敬请关注

【紧接上篇】

2.5 执?查询

2.5.1 Table API查询

2.5.2 SQL查询

2.5.3 混?Table API和SQL

2.6?Table API输出

2.7?使?聚合操作

2.8?Table API输?输出

2.8.1 内置Connectors

2.8.2 表Format

【紧接上篇】

2.5 执?查询

? ? ? ? 执?查询可以使?Table API或者SQL。

2.5.1 Table API查询

? ? ? ? Table API 查询是通过?步?步的?法调?来完成的。

TableEnvironment tableEnv = ...;
//TableEnvironment执?查询
Table orders = tableEnv.from("Orders");
//在Table上执?查询(链式调?)
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));

2.5.2 SQL查询

? ? ? ? Flink基于ApacheCalcite实现了标准的SQL?持。在 Flink 中,?常规字符串来定义 SQL 查询语句。 SQL 查询的结果,是?个新的 Table。

// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query

2.5.3 混?Table API和SQL

? ? ? ? Table API和SQL查询返回的结构都是 Table 对象,所以可以轻松的混?:

//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执?器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table: 1)读取外部表; 2)从Table API或者SQL查询结果创建表
Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("url", DataTypes.STRING()),
DataTypes.FIELD("cTime", DataTypes.STRING())
),
row("Mary", "./home","2022-02-02 12:00:00"),
row("Bob", "./cart","2022-02-02 12:00:00"),
row("Mary", "./prod?id=1","2022-02-02 12:00:05"),
row("Liz", "./home","2022-02-02 12:01:00"),
row("Bob", "./prod?id=3","2022-02-02 12:01:30"),
row("Mary", "./prod?id=7","2022-02-02 12:01:45")
).select($("user"), $("url"),$("cTime"));
//注册表到catalog(可选的)
tEnv.createTemporaryView("sourceTable", projTable);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、 Table API和SQL混?
Table resultTable = tEnv.from("sourceTable").select($("user"), $("url"));
tEnv.createTemporaryView("resultTableView", resultTable);
Table result = tEnv.sqlQuery("select * from resultTableView where user = 'Mary'");
//5、输出(包括执?,不需要单独在调?tEnv.execute("job"))
result.executeInsert("sinkTable");

2.6?Table API输出

? ? ? ? 表的输出通过将数据写? TableSink 来实现的。 TableSink 是?个通?接?,可以 ?持不同的?件格式、存储数据库和消息队列。
? ? ? ? 具体的?法是通过 Table.executeInsert() ?法将?个 Table 写? 注册过的 TableSink 中:

//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执?器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table:这?演示?下Flink内置的datagen Connector
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build())
.option("rows-per-second", "5")
.option("fields.name.length","10")
.option("fields.age.kind","random")
.option("fields.age.min","1")
.option("fields.age.max","100")
.build();
//注册表到catalog(可选的)
tEnv.createTemporaryTable("sourceTable", sourceDescriptor);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、查询
Table resultTable = tEnv.from("sourceTable");
//5、输出
resultTable.executeInsert("sinkTable");

? ? ? ? 注意: Flink通过Table Connectors提供了?量内置的TableSink可以开箱即?,请参考《 2.4 Table API输?与输出》

2.7?使?聚合操作

? ? ? ? 以下是?个使?聚合查询的例?,这?DataStream和Table API,?家可以只?Table API:

//1、获取Stream执?环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执?环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据
DataStream<ClickLogs> clickLogs = env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"
).map(event -> {
String[] props = event.split(",");
return ClickLogs
.builder()
.user(props[0])
.url(props[1])
.cTime(props[2])
.build();
});
//4、流转换为动态表注意:上?的例?中只能使?撤回流。
Table table = tEnv.fromDataStream(clickLogs);
//5、执?Table API查询/SQL查询
/**
* select
* user,
* count(url) as cnt
* from clicks
* group by user
*/
Table resultTable = table
.groupBy($("user"))
.aggregate($("url").count().as("cnt"))
.select($("user"),$("cnt"));
//6、将Table转换为DataStream
//为false就是要撤回的
DataStream<Tuple2<Boolean,Row>> selectedClickLogs =
tEnv.toRetractStream(resultTable,Row.class);
// DataStream<Row> selectedClickLogs = tEnv.toChangelogStream(resultTable,
Schema.newBuilder()
// .column("user", "STRING")
// .column("cnt", "BIGINT")
// .build());
//7、处理结果:打印/输出
selectedClickLogs.print();
// selectedClickLogs.map(row->{
// return row.getKind().shortString();
// }).print();
//8、执?
env.execute("FlinkTableAggr")

注意:上?的例?中只能使?撤回流。

2.8?Table API输?输出

? ? ? ? Flink Table API和SQL的输?输出都是基于Table的,只要想输?或者输出就是创建Table:

?2.8.1 内置Connectors

? ? ? ? Flink 的 Table API & SQL通过Connectors连接外部系统,并执?批/流?式的读写操作。 Connectors提供了丰富的外部系统连接器,根据source和sink的类型,它们?持不同的格式,例如 CSV、 Avro、 Parquet 或 ORC。
? ? ? ? ?前?持的内置Connectors如下表所示(截?到Flink 1.14.3):

?注意:如果作为sink?家还要注意?持的输出模式( Append/Retract/Upsert)

2.8.2 表Format

? ? ? ? 数据以各种格式存储在不同的存储中( CSV、 Avro、 Parquet 或 ORC等), Flink定义了Format来?持读取不同格式的数据,详?如下链接:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/
? ? ? ? Flink当前?持的format如下:

?【下一篇】Flink SQL流批?体化技术详解(四)

持续输出 敬请关注
大数据架构 ?湖仓一体化 ?流批一体 离线+实时数仓?
各种大数据解决方案 ?各种大数据新技术实践
持续输出 ?敬请关注

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-08 08:10:56  更:2022-05-08 08:12:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:58:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码