IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Canal同时监控两个mysql的binlog并同步至一个topic中 -> 正文阅读

[大数据]Canal同时监控两个mysql的binlog并同步至一个topic中

准备 :

  1. 安装canal (博主canal版本1.1.5)
  2. 安装kafka, 做接收binlog日志数据用 (博主kafka版本2.11)
  3. 安装两个msyql, 相当于两个mysql数据库 (博主mysql版本5.1.0)
    我mysql的两台节点分别是 : 192.168.1.137, 192.168.1.138
  4. 开启binlog (两台mysql都需要开启)
  5. 在mysql中创建canal用户(两台mysql都需要创建)

这些组件的安装方式网上有很多, 这里不做赘述,
第一台mysql开启binlog :

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # ,不要和 canal 的 slaveId 重复 ,集群模式也不可以重复

第二台mysql开启binlog :

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=2 # ,不要和 canal 的 slaveId 重复 ,集群模式也不可以重复

两台mysql中都需要创建canal用户 :

创建canal用户,密码为canal
CREATE USER canal IDENTIFIED BY 'canal'; 
赋予canal 权限
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
刷新授权
FLUSH PRIVILEGES;

修改canal 配置文件,修改 Canal 全局设置
vim $CANAL_CONF/canal.properties
$CANAL_CONF : 是canal目录下的conf目录

#################################################
#########               common argument         #############
################################################
# tcp, kafka, rocketMQ, rabbitMQ 
# 服务方式,默认为 tcp,如果需要将 canal 消息发送到 kafka 就选 kafka
canal.serverMode = kafka

#################################################
#########               destinations            #############
#################################################
 # 当前server上部署的instance列表,定义了canal.destinations后,需要在canal.conf.dir对应的目录下建立同名的文件,多个实例用 逗号分隔, 如下: canal.destinations = example1,example2
canal.destinations = example  
# 全局的spring配置方式的组件文件 ,主要针对需要进行多库合并时,可以将多个物理instance合并为一个逻辑instance,提供客户端访问。(更详细的说明请自行查看参数)
canal.instance.global.spring.xml = classpath:spring/group-instance.xml  

##################################################
#########                    Kafka                   #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092  #因为我kafka在本机上, 所以直接使用的127.0.0.1, 请根据实际情况自行配置

修改instance 配置文件,修改单个数据源的配置
vim $CANAL_CONF/example/instance.properties
这里的 example , 就是在 canal.properties文件中canal.destinations=参数指定的, 请根据实际情况自行配置

#################################################
# position info
#第一台mysql配置
canal.instance.master1.address=192.168.1.137:3306
canal.instance.master1.journal.name=mysql主库链接时起始的binlog文件
canal.instance.master1.position=mysql主库链接时起始的binlog偏移量
canal.instance.master1.timestamp=mysql主库链接时起始的binlog的时间戳
canal.instance.master1.gtid=是否启用mysql gtid的订阅模式
#第二台mysql配置
canal.instance.master2.address=192.168.1.138:3306
canal.instance.master2.journal.name=mysql主库链接时起始的binlog文件
canal.instance.master2.position=mysql主库链接时起始的binlog偏移量
canal.instance.master2.timestamp=mysql主库链接时起始的binlog的时间戳
canal.instance.master2.gtid=是否启用mysql gtid的订阅模式

# 第一台mysql   username/password
canal.instance1.dbUsername=root
canal.instance1.dbPassword=137password
# 第二台mysql   username/password
canal.instance2.dbUsername=root
canal.instance2.dbPassword=137password

# 白名单,选择监控哪些表
canal.instance1.filter.regex=.*\\..*  
canal.instance2.filter.regex=dm.test1,dm.zzz_test2
# 黑名单,选择不监控哪些表
canal.instance.filter.black.regex=mysql\\.slave_.* 

# topic的名字
canal.mq.topic=cctv1
# topic的分区
canal.mq.partition=0
#################################################

修改全局的spring配置方式的组件文件
vim $CANAL_CONF/spring/group-instance.xml
修改文件中value的值,应该和 instance.properties 的名称一样. 实际情况请自己指定, 前面列号是我canal1.1.5的列号,一切按自己的实际情况进行指定

<?xml version="1.0" encoding="UTF-8"?>
........
这里省略多余部分
80                <!-- 解析过滤处理 -->
81                <property name="eventFilter">
82                        <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
83                               <constructor-arg index="0" value="${canal.instance1.filter.regex:}" />
84                        </bean>
                </property>
........
这里省略多余部分
              
117                <!-- 解析数据库信息 -->
118                <property name="masterInfo">
119                        <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
120                                <property name="address" value="${canal.instance.master1.address}" />
121                                <property name="username" value="${canal.instance1.dbUsername:retl}" />
122                                <property name="password" value="${canal.instance1.dbPassword:retl}" />
123                                <property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
124                                <property name="enableDruid" value="${canal.instance.enableDruid:false}" />
125                                <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
126                        </bean>
127                </property>               
........
这里省略多余部分
139                <!-- 解析起始位点 -->
140                <property name="masterPosition">
141                        <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
142                                <property name="journalName" value="${canal.instance.master1.journal.name}" />
143                                <property name="position" value="${canal.instance.master1.position}" />
144                                <property name="timestamp" value="${canal.instance.master1.timestamp}" />
145                                <property name="gtid" value="${canal.instance.master1.gtid}" />
146                        </bean>
147                </property>
........
这里省略多余部分                
191                <!-- 解析过滤处理 -->
192               <property name="eventFilter">
193                        <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
194                                <constructor-arg index="0" value="${canal.instance2.filter.regex:}" />
195                        </bean>
196                </property>
........
这里省略多余部分    
224                <!-- 解析数据库信息 -->
225                <property name="masterInfo">
226                        <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
227                                <property name="address" value="${canal.instance.master2.address}" />
228                                <property name="username" value="${canal.instance2.dbUsername:retl}" />
229                                <property name="password" value="${canal.instance2.dbPassword:retl}" />
230                                <property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
231                                <property name="enableDruid" value="${canal.instance.enableDruid:false}" />
232                                <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
233                        </bean>
234                </property>
........
这里省略多余部分                
246                <!-- 解析起始位点 -->
247                <property name="masterPosition">
248                        <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
249                                <property name="journalName" value="${canal.instance.master2.journal.name}" />
250                               <property name="position" value="${canal.instance.master2.position}" />
251                                <property name="timestamp" value="${canal.instance.master2.timestamp}" />
252                                <property name="gtid" value="${canal.instance.master2.gtid}" />
253                        </bean>
254                </property>
........
这里省略多余部分 
283    <bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
284        <property name="topic" value="${canal.mq.topic}" />
285                <property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
286        <property name="partition" value="${canal.mq.partition}" />
287        <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
288        <property name="partitionHash" value="${canal.mq.partitionHash}" />
289                <property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
290    </bean>

配置文件配置完成, 启动canal程序
sh $CANAL_HOME/startup.sh
$CANAL_HOME : Canal的bin目录

[root@master bin]# ./startup.sh 
cd to /home/softs/canal/bin for workaround relative path
LOG CONFIGURATION : /home/softs/canal/bin/../conf/logback.xml
canal conf : /home/softs/canal/bin/../conf/canal.properties
CLASSPATH :/home/softs/canal/bin/../conf:/home/softs/canal/bin/../lib/zookeeper-..省略中间过程
cd to /home/softs/canal/bin for continue
[root@master bin]# 

启动kafka准备消费数据 :
sh $KAFKA_HOME/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1
$KAFKA_HOME : kafka的bin目录, 启动命令跟实际情况自行指定
这里的cctv1是 instance.properties 配置文件中canal.mq.topic=cctv1 参数配置的 :

[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1

登入192.168.1.137的mysql修改数据 :

MariaDB [dm]> update dm.test1 set gender = '68' where  id = '139'; 
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MariaDB [dm]> 

查看topic的消费情况 :

{"data":[{"id":"139","name":"102","age":"1","gender":"68","new1":null,"new3":null,"new4":null}],"database":"dm","es":1648617618000,"id":1,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(64)","age":"int(10)","gender":"int(12)","new1":"int(12)","new3":"int(12)","new4":"int(12)"},"old":[{"gender":"66"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4,"gender":4,"new1":4,"new3":4,"new4":4},"table":"test1","ts":1648617619429,"type":"UPDATE"}

登入192.168.1.137的mysql修改数据 :

MariaDB [dm]> update dm.test2 set hjhk2 = '12' where id = '48'; 
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MariaDB [dm]> 

查看topic的消费情况 :

{"data":[{"id":"48","name":"da","age":"5","gender":"6","new2":"8","hjhk2":"12"}],"database":"dm","es":1648617672000,"id":2,"isDdl":false,"mysqlType":{"id":"int(20) unsigned","name":"varchar(65)","age":"int(10)","gender":"int(12)","new2":"int(14)","hjhk2":"varchar(100)"},"old":[{"hjhk2":"11"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4,"gender":4,"new2":4,"hjhk2":12},"table":"test2","ts":1648617672173,"type":"UPDATE"}

两个binlog同时写入到了同一个topic中, 到此配置完成

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 00:08:39  更:2022-04-01 00:10:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 15:01:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码