持续输出 敬请关注 大数据架构 ?湖仓一体化 ?流批一体 离线+实时数仓? 各种大数据解决方案 ?各种大数据新技术实践 持续输出 ?敬请关注
【深度好文】Flink SQL流批?体化技术详解(一)_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/124347634?spm=1001.2014.3001.5501【深度好文】Flink SQL流批?体化技术详解(二)_大数据研习社的博客-CSDN博客持续输出 敬请关注大数据架构 湖仓一体化 流批一体 离线+实时数仓各种大数据解决方案 各种大数据新技术实践持续输出 敬请关注https://blog.csdn.net/dajiangtai007/article/details/124454378?spm=1001.2014.3001.5501
目录
持续输出 敬请关注大数据架构 ?湖仓一体化 ?流批一体 离线+实时数仓?各种大数据解决方案 ?各种大数据新技术实践持续输出 ?敬请关注
【紧接上篇】
2.5 执?查询
2.5.1 Table API查询
2.5.2 SQL查询
2.5.3 混?Table API和SQL
2.6?Table API输出
2.7?使?聚合操作
2.8?Table API输?输出
2.8.1 内置Connectors
2.8.2 表Format
【紧接上篇】
2.5 执?查询
? ? ? ? 执?查询可以使?Table API或者SQL。
2.5.1 Table API查询
? ? ? ? Table API 查询是通过?步?步的?法调?来完成的。
TableEnvironment tableEnv = ...;
//TableEnvironment执?查询
Table orders = tableEnv.from("Orders");
//在Table上执?查询(链式调?)
Table revenue = orders
.filter($("cCountry").isEqual("FRANCE"))
.groupBy($("cID"), $("cName"))
.select($("cID"), $("cName"), $("revenue").sum().as("revSum"));
2.5.2 SQL查询
? ? ? ? Flink基于ApacheCalcite实现了标准的SQL?持。在 Flink 中,?常规字符串来定义 SQL 查询语句。 SQL 查询的结果,是?个新的 Table。
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT cID, cName, SUM(revenue) AS revSum " +
"FROM Orders " +
"WHERE cCountry = 'FRANCE' " +
"GROUP BY cID, cName"
);
// emit or convert Table
// execute query
2.5.3 混?Table API和SQL
? ? ? ? Table API和SQL查询返回的结构都是 Table 对象,所以可以轻松的混?:
//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执?器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table: 1)读取外部表; 2)从Table API或者SQL查询结果创建表
Table projTable = tEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("user", DataTypes.STRING()),
DataTypes.FIELD("url", DataTypes.STRING()),
DataTypes.FIELD("cTime", DataTypes.STRING())
),
row("Mary", "./home","2022-02-02 12:00:00"),
row("Bob", "./cart","2022-02-02 12:00:00"),
row("Mary", "./prod?id=1","2022-02-02 12:00:05"),
row("Liz", "./home","2022-02-02 12:01:00"),
row("Bob", "./prod?id=3","2022-02-02 12:01:30"),
row("Mary", "./prod?id=7","2022-02-02 12:01:45")
).select($("user"), $("url"),$("cTime"));
//注册表到catalog(可选的)
tEnv.createTemporaryView("sourceTable", projTable);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("user", DataTypes.STRING())
.column("url", DataTypes.STRING())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、 Table API和SQL混?
Table resultTable = tEnv.from("sourceTable").select($("user"), $("url"));
tEnv.createTemporaryView("resultTableView", resultTable);
Table result = tEnv.sqlQuery("select * from resultTableView where user = 'Mary'");
//5、输出(包括执?,不需要单独在调?tEnv.execute("job"))
result.executeInsert("sinkTable");
2.6?Table API输出
? ? ? ? 表的输出通过将数据写? TableSink 来实现的。 TableSink 是?个通?接?,可以 ?持不同的?件格式、存储数据库和消息队列。 ? ? ? ? 具体的?法是通过 Table.executeInsert() ?法将?个 Table 写? 注册过的 TableSink 中:
//1、创建TableEnvironment
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.useBlinkPlanner()//Flink1.14开始就删除了其他的执?器了,只保留了BlinkPlanner
.inStreamingMode()//默认就是StreamingMode
//.inBatchMode()
.build();
TableEnvironment tEnv = TableEnvironment.create(settings);
//2、创建source table:这?演示?下Flink内置的datagen Connector
final TableDescriptor sourceDescriptor = TableDescriptor.forConnector("datagen")
.schema(Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build())
.option("rows-per-second", "5")
.option("fields.name.length","10")
.option("fields.age.kind","random")
.option("fields.age.min","1")
.option("fields.age.max","100")
.build();
//注册表到catalog(可选的)
tEnv.createTemporaryTable("sourceTable", sourceDescriptor);
//3、创建sink table
final Schema schema = Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.build();
tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("print")
.schema(schema)
.build());
//4、查询
Table resultTable = tEnv.from("sourceTable");
//5、输出
resultTable.executeInsert("sinkTable");
? ? ? ? 注意: Flink通过Table Connectors提供了?量内置的TableSink可以开箱即?,请参考《 2.4 Table API输?与输出》
2.7?使?聚合操作
? ? ? ? 以下是?个使?聚合查询的例?,这?DataStream和Table API,?家可以只?Table API:
//1、获取Stream执?环境
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//2、创建表执?环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(1);
//3、读取数据
DataStream<ClickLogs> clickLogs = env.fromElements(
"Mary,./home,2022-02-02 12:00:00",
"Bob,./cart,2022-02-02 12:00:00",
"Mary,./prod?id=1,2022-02-02 12:00:05",
"Liz,./home,2022-02-02 12:01:00",
"Bob,./prod?id=3,2022-02-02 12:01:30",
"Mary,./prod?id=7,2022-02-02 12:01:45"
).map(event -> {
String[] props = event.split(",");
return ClickLogs
.builder()
.user(props[0])
.url(props[1])
.cTime(props[2])
.build();
});
//4、流转换为动态表注意:上?的例?中只能使?撤回流。
Table table = tEnv.fromDataStream(clickLogs);
//5、执?Table API查询/SQL查询
/**
* select
* user,
* count(url) as cnt
* from clicks
* group by user
*/
Table resultTable = table
.groupBy($("user"))
.aggregate($("url").count().as("cnt"))
.select($("user"),$("cnt"));
//6、将Table转换为DataStream
//为false就是要撤回的
DataStream<Tuple2<Boolean,Row>> selectedClickLogs =
tEnv.toRetractStream(resultTable,Row.class);
// DataStream<Row> selectedClickLogs = tEnv.toChangelogStream(resultTable,
Schema.newBuilder()
// .column("user", "STRING")
// .column("cnt", "BIGINT")
// .build());
//7、处理结果:打印/输出
selectedClickLogs.print();
// selectedClickLogs.map(row->{
// return row.getKind().shortString();
// }).print();
//8、执?
env.execute("FlinkTableAggr")
注意:上?的例?中只能使?撤回流。
2.8?Table API输?输出
? ? ? ? Flink Table API和SQL的输?输出都是基于Table的,只要想输?或者输出就是创建Table:
?2.8.1 内置Connectors
? ? ? ? Flink 的 Table API & SQL通过Connectors连接外部系统,并执?批/流?式的读写操作。 Connectors提供了丰富的外部系统连接器,根据source和sink的类型,它们?持不同的格式,例如 CSV、 Avro、 Parquet 或 ORC。 ? ? ? ? ?前?持的内置Connectors如下表所示(截?到Flink 1.14.3):
?注意:如果作为sink?家还要注意?持的输出模式( Append/Retract/Upsert)
? ? ? ? 数据以各种格式存储在不同的存储中( CSV、 Avro、 Parquet 或 ORC等), Flink定义了Format来?持读取不同格式的数据,详?如下链接: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/ ? ? ? ? Flink当前?持的format如下:
?【下一篇】Flink SQL流批?体化技术详解(四)
持续输出 敬请关注 大数据架构 ?湖仓一体化 ?流批一体 离线+实时数仓? 各种大数据解决方案 ?各种大数据新技术实践 持续输出 ?敬请关注
|