IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Flink1.14.3 Table读写MySQL做数据聚合(1) -> 正文阅读

[大数据]Flink1.14.3 Table读写MySQL做数据聚合(1)

摘要

  1. 使用JDBC SQL Connector,Source只支持批处理,Sink支持批处理和流处理。
  2. Sink支持数据追加和更新,如果Flink Table API做聚合操作,使用Sink必须指定指定主键。
  3. 本案例独家使用Flink Table API(非SQL)方式读写MySQL,官网只讲解了SQL的使用方式。

1 需求

需求:Flink Table API从MySQL读取数据,然后做聚合操作,最后将聚合结果写入MySQL。

2 添加Maven依赖

FlinkTable集成MySQL需引?如下依赖:

<dependency>

??<groupId>org.apache.flink</groupId>

??<artifactId>flink-connector-jdbc_2.11</artifactId>

??<version>${flink.version}</version>

</dependency>

<dependency>

??<groupId>mysql</groupId>

??<artifactId>mysql-connector-java</artifactId>

??<version>5.1.38</version>

</dependency>

3 准备MySQL数据源

在MySQL的test数据库中,创建clicklog表并导入初始数据集。

DROP TABLE IF EXISTS `clicklog`;

CREATE TABLE `clicklog` (

??`user` varchar(20) NOT NULL,

??`url` varchar(100) NOT NULL,

??`cTime` varchar(30) NOT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

insert ?into `clicklog`(`user`,`url`,`cTime`) values ('Mary','./home','2022-02-02 12:00:00'),('Bob','./cart','2022-02-02 12:00:00'),('Mary','./prod?id=1','2022-02-02 12:00:05');

4 代码实现

Flink Table API读写MySQL的完整代码如下所示。

package com.bigdata.chap02;

import org.apache.flink.table.api.*;

import static org.apache.flink.table.api.Expressions.$;

public class FlinkTableAPIMySQL2MySQL {

public static void main(String[] args) {

//1、创建TableEnvironment

EnvironmentSettings settings = EnvironmentSettings

.newInstance()

.build();

TableEnvironment tEnv = TableEnvironment.create(settings);

//2、创建Mysql source table

Schema sourceschema = Schema.newBuilder()

//.primaryKey("user")

.column("user", DataTypes.STRING())

.column("url", DataTypes.STRING())

.column("cTime", DataTypes.STRING())

.build();

tEnv.createTemporaryTable("sourceTable", TableDescriptor.forConnector("jdbc")

.schema(sourceschema)

.option("url","jdbc:mysql://hadoop1:3306/test")

.option("driver","com.mysql.jdbc.Driver")

.option("table-name","clicklog")

.option("username","hive")

.option("password","hive")

.build());

tEnv.from("sourceTable").printSchema();

//3、创建MySQL sink table

Schema sinkschema = Schema.newBuilder()

//通过notNull()指定主键为非空

.column("username",DataTypes.STRING().notNull())

.column("count", DataTypes.BIGINT())

//指定主键

.primaryKey("username")

.build();

tEnv.createTemporaryTable("sinkTable", TableDescriptor.forConnector("jdbc")

.schema(sinkschema)

.option("url","jdbc:mysql://hadoop1:3306/test")

.option("driver","com.mysql.jdbc.Driver")

.option("table-name","clickcount")

.option("username","hive")

.option("password","hive")

.build());

//5、输出

Table reusltTable = tEnv.from("sourceTable")

.groupBy($("user"))

.aggregate($("url").count().as("count"))

.select($("user").as("username"), $("count"))

;

reusltTable.printSchema();

reusltTable.executeInsert("sinkTable");

}

}

备注:Flink Table API做聚合操作插入MySQL,必须指定主键(.primaryKey("username")),同时必须指定主键为非空(.column("username",DataTypes.STRING().notNull()))

5 MySQL业务建表

在MySQL的test数据库中,创建clickcount表用于Flink Table的聚合数据。

DROP TABLE IF EXISTS `clickcount`;

CREATE TABLE `clickcount` (

??`username` varchar(20) NOT NULL DEFAULT '',

??`count` int(11) DEFAULT NULL,

??PRIMARY KEY (`username`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

注意:如果clickcount表需要做更新操作,需要指定主键(primary key),如username。

6 测试运行

打开MySQL连接工具,查询clickcount表中的数据,如果聚合数据能插入clickcount表,说明Flink Table API能成功将聚合数据写入MySQL数据库。

7 注意事项

注意:

  1. 使用JDBC SQL connector过程中,作为source只支持批处理,作为sink既可以用于批处理又可以用于流处理。
  2. Sink支持数据的追加和更新,如果Flink Table API做聚合操作,使用sink更新聚合数据,必须指定指定主键。
  3. 本案例独家使用Flink Table API(非SQL)方式读写MySQL,官网只讲解了SQL的使用方式。
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-30 08:47:05  更:2022-04-30 08:48:00 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 9:59:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码