| |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
-> 大数据 -> Flink1.14.3 Table读写MySQL做数据聚合(1) -> 正文阅读 |
|
[大数据]Flink1.14.3 Table读写MySQL做数据聚合(1) |
1 需求需求:Flink Table API从MySQL读取数据,然后做聚合操作,最后将聚合结果写入MySQL。 2 添加Maven依赖FlinkTable集成MySQL需引?如下依赖: <dependency> ??<groupId>org.apache.flink</groupId> ??<artifactId>flink-connector-jdbc_2.11</artifactId> ??<version>${flink.version}</version> </dependency> <dependency> ??<groupId>mysql</groupId> ??<artifactId>mysql-connector-java</artifactId> ??<version>5.1.38</version> </dependency> 3 准备MySQL数据源在MySQL的test数据库中,创建clicklog表并导入初始数据集。 DROP TABLE IF EXISTS `clicklog`; CREATE TABLE `clicklog` ( ??`user` varchar(20) NOT NULL, ??`url` varchar(100) NOT NULL, ??`cTime` varchar(30) NOT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8; insert ?into `clicklog`(`user`,`url`,`cTime`) values ('Mary','./home','2022-02-02 12:00:00'),('Bob','./cart','2022-02-02 12:00:00'),('Mary','./prod?id=1','2022-02-02 12:00:05'); 4 代码实现Flink Table API读写MySQL的完整代码如下所示。 package com.bigdata.chap02; import org.apache.flink.table.api.*; import static org.apache.flink.table.api.Expressions.$; public class FlinkTableAPIMySQL2MySQL { public static void main(String[] args) { //1、创建TableEnvironment EnvironmentSettings settings = EnvironmentSettings .newInstance() .build(); TableEnvironment tEnv = TableEnvironment.create(settings); //2、创建Mysql source table Schema sourceschema = Schema.newBuilder() //.primaryKey("user") .column("user", DataTypes.STRING()) .column("url", DataTypes.STRING()) .column("cTime", DataTypes.STRING()) .build(); tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("jdbc") .schema(sourceschema) .option("url","jdbc:mysql://hadoop1:3306/test") .option("driver","com.mysql.jdbc.Driver") .option("table-name","clicklog") .option("username","hive") .option("password","hive") .build()); tEnv.from("sourceTable").printSchema(); //3、创建MySQL sink table Schema sinkschema = Schema.newBuilder() //通过notNull()指定主键为非空 .column("username",DataTypes.STRING().notNull()) .column("count", DataTypes.BIGINT()) //指定主键 .primaryKey("username") .build(); tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("jdbc") .schema(sinkschema) .option("url","jdbc:mysql://hadoop1:3306/test") .option("driver","com.mysql.jdbc.Driver") .option("table-name","clickcount") .option("username","hive") .option("password","hive") .build()); //5、输出 Table reusltTable = tEnv.from("sourceTable") .groupBy($("user")) .aggregate($("url").count().as("count")) .select($("user").as("username"), $("count")) ; reusltTable.printSchema(); reusltTable.executeInsert("sinkTable"); } }
5 MySQL业务建表在MySQL的test数据库中,创建clickcount表用于Flink Table的聚合数据。 DROP TABLE IF EXISTS `clickcount`; CREATE TABLE `clickcount` ( ??`username` varchar(20) NOT NULL DEFAULT '', ??`count` int(11) DEFAULT NULL, ??PRIMARY KEY (`username`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
6 测试运行打开MySQL连接工具,查询clickcount表中的数据,如果聚合数据能插入clickcount表,说明Flink Table API能成功将聚合数据写入MySQL数据库。 7 注意事项注意:
|
|
|
上一篇文章 下一篇文章 查看所有文章 |
|
开发:
C++知识库
Java知识库
JavaScript
Python
PHP知识库
人工智能
区块链
大数据
移动开发
嵌入式
开发工具
数据结构与算法
开发测试
游戏开发
网络协议
系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程 数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁 |
360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 | -2024/11/24 1:02:40- |
|
网站联系: qq:121756557 email:121756557@qq.com IT数码 |