IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> FlinkCDC2.0利用FlinkSQL采集MySQL -> 正文阅读

[大数据]FlinkCDC2.0利用FlinkSQL采集MySQL

1.依赖管理

将如下依赖包放到FLINK_HOME/lib下。

flink-sql-connector-mysql-cdc-2.2.0.jar

flink-connector-jdbc_2.11-1.14.3.jar

flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

2.Flink全局配置

修改flink-conf.yaml文件:

execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

execution.checkpointing.max-concurrent-checkpoints: 1

execution.checkpointing.mode: EXACTLY_ONCE

execution.checkpointing.timeout: 10min

state.backend: filesystem

state.checkpoints.dir: hdfs://mycluster/flinkcdc-checkpoints

3.sql-client提交作业模式

1.Standalone模式

启动sql-client:bin/sql-client.sh embedded

注意,如果使用standalone模式运行,需要先启动一个Flink standalone集群,方法如下:

bin/start-cluster.sh

2.yarn-session模式(本案例使用方式)

先启动Flink yarn-session集群:bin/yarn-session.sh -s 1 -jm 1024 -tm 1024

然后再启动sql-client:bin/sql-client.sh embedded -s yarn-session

4.checkpoint配置

官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#checkpointing

#sql-client设置checkpoint参数

SET 'execution.checkpointing.interval' = '10s';

SET 'parallelism.default' = '3';

5.创建source table

CREATE TABLE `cars`(

?`id` BIGINT,

?`owerId` BIGINT,

?`carCode` STRING,

?`carColor` STRING,

?`type` BIGINT,

?`remark` STRING,

?PRIMARY KEY(id) NOT ENFORCED

) WITH (

?'connector' = 'mysql-cdc',

?'hostname' = 'hadoop1',

?'port' = '3306',

?'username' = 'hive',

?'password' = 'hive',

?'database-name' = 'sca',

?'table-name' = 'cars',

?'connect.timeout' = '60s'

);

6.创建sink table

CREATE TABLE `cars_copy`(

?`id` BIGINT,

?`owerId` BIGINT,

?`carCode` STRING,

?`carColor` STRING,

?`type` BIGINT,

?`remark` STRING,

?PRIMARY KEY(id) NOT ENFORCED

) WITH (

?'connector' = 'jdbc',

?'url' = 'jdbc:mysql://hadoop1:3306/sca?useUnicode=true&characterEncoding=utf8',

?'username' = 'hive',

?'password' = 'hive',

?'table-name' = 'cars_copy',

?'sink.parallelism' = '2'

);

7.source to sink table

将采集过来的数据写入MySQL

insert into cars_copy SELECT * FROM cars;

查询结果表数据记录数

select count(*) from cars_copy

新增测试数据集(再次查看结果表)

insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096006','10244815','港T·7RONE','红色','1',NULL);

insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096007','10244816','港T·7RONE','黄色','1',NULL);

备注:如果通过手动cacel job,下次重新启动job仍然会重头采集表中的数据。

8.cacel job时保存Save point

bin/flink stop --savepointPath hdfs://mycluster/flinkcdc-savepoints -Dyarn.application.id=application_1658045078748_0001 79ce915e39fc1d18a194b6a464d7c3fd

备注:结尾一个参数为yarn中的job id,第二个参数为flink的job id。

9.cacel job之后重新恢复job

#设置job从上一次savepoint位置开始处理

SET 'execution.checkpointing.interval' = '10s';

SET 'parallelism.default' = '3';

SET 'execution.savepoint.path' = 'hdfs://mycluster/flinkcdc-savepoints/savepoint-79ce91-92206bcaaad2';

备注:该参数的值为savepoint路径。

#执行flink sql job

insert into cars_copy SELECT * FROM cars;

#新增测试数据集

insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096008','10244815','港T·7RONE','红色','1',NULL);

insert into `cars` (`id`, `owerId`, `carCode`, `carColor`, `type`, `remark`) values('10096009','10244816','港T·7RONE','黄色','1',NULL);

#再次查询结果表数据记录数

select count(*) from cars_copy

正常情况,这个时候采集的就是新增数据,历史数据不会再采集。

备注:Flink ?SQL方式采集MySQL数据,使用方便,但只支持单表。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-07-20 18:56:32  更:2022-07-20 18:56:37 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:50:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码