Prerequisites:
- Install Canal (this post uses Canal 1.1.5)
- Install Kafka, used to receive the binlog data (this post uses Kafka 2.11)
- Install two MySQL instances, acting as two separate MySQL databases (this post uses MySQL 5.1.0)
  My two MySQL nodes are 192.168.1.137 and 192.168.1.138 - enable binlog (on both MySQL nodes)
- Create a canal user in MySQL (on both MySQL nodes)
Installation guides for these components are widely available online, so they are not repeated here. Enable binlog on the first MySQL node:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
Enable binlog on the second MySQL node:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=2
Create the canal user on both MySQL nodes:
-- create the canal user with password canal
CREATE USER canal IDENTIFIED BY 'canal';
-- grant canal the required privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- flush privileges
FLUSH PRIVILEGES;
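Before moving on, it may be worth verifying on each node that binlog is enabled and the grants took effect; a quick check from the mysql client:

```sql
-- should report log_bin = ON
SHOW VARIABLES LIKE 'log_bin';
-- current binlog file and offset (useful later when filling in instance.properties)
SHOW MASTER STATUS;
-- should list the REPLICATION privileges granted above
SHOW GRANTS FOR 'canal'@'%';
```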
Edit the Canal configuration file to change the global settings: vim $CANAL_CONF/canal.properties ($CANAL_CONF is the conf directory under the Canal install directory)
#################################################
######### common argument #############
################################################
# tcp, kafka, rocketMQ, rabbitMQ
# server mode, tcp by default; set kafka to send canal messages to Kafka
canal.serverMode = kafka
#################################################
######### destinations #############
#################################################
# list of instances deployed on this server; after setting canal.destinations, create a same-named directory for each instance under the directory pointed to by canal.conf.dir; separate multiple instances with commas, e.g. canal.destinations = example1,example2
canal.destinations = example
# global spring component file; mainly for multi-database merging, where several physical instances are merged into one logical instance exposed to clients (see the parameter docs for details)
canal.instance.global.spring.xml = classpath:spring/group-instance.xml
##################################################
######### Kafka #############
##################################################
# my Kafka runs on this machine, hence 127.0.0.1; adjust to your environment
kafka.bootstrap.servers = 127.0.0.1:9092
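If your broker has topic auto-creation disabled, you may also want to create the target topic ahead of time. A sketch (the topic name must match canal.mq.topic in instance.properties; older Kafka releases use --zookeeper instead of --bootstrap-server):

```shell
$KAFKA_HOME/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 \
  --topic cctv1 --partitions 1 --replication-factor 1
```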
Edit the instance configuration file to configure the individual data sources: vim $CANAL_CONF/example/instance.properties. Here example is the name set by canal.destinations= in canal.properties; adjust to your environment.
#################################################
# position info
# first MySQL node
canal.instance.master1.address=192.168.1.137:3306
canal.instance.master1.journal.name=binlog file to start from when connecting to the master
canal.instance.master1.position=binlog offset to start from when connecting to the master
canal.instance.master1.timestamp=binlog timestamp to start from when connecting to the master
canal.instance.master1.gtid=whether to use MySQL GTID subscription mode
# second MySQL node
canal.instance.master2.address=192.168.1.138:3306
canal.instance.master2.journal.name=binlog file to start from when connecting to the master
canal.instance.master2.position=binlog offset to start from when connecting to the master
canal.instance.master2.timestamp=binlog timestamp to start from when connecting to the master
canal.instance.master2.gtid=whether to use MySQL GTID subscription mode
# first MySQL node username/password
canal.instance1.dbUsername=root
canal.instance1.dbPassword=137password
# second MySQL node username/password
canal.instance2.dbUsername=root
canal.instance2.dbPassword=137password
# whitelist: which tables to capture
canal.instance1.filter.regex=.*\\..*
canal.instance2.filter.regex=dm.test1,dm.zzz_test2
# blacklist: which tables to ignore
canal.instance.filter.black.regex=mysql\\.slave_.*
# topic name
canal.mq.topic=cctv1
# topic partition
canal.mq.partition=0
#################################################
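For reference, a filled-in sketch of the position section for the first node (the values are illustrative; leaving journal.name, position, and timestamp empty tells Canal to start from the master's current position):

```properties
canal.instance.master1.address=192.168.1.137:3306
# leave these empty to start from the current position
canal.instance.master1.journal.name=
canal.instance.master1.position=
canal.instance.master1.timestamp=
canal.instance.master1.gtid=false
```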
Edit the global spring component file: vim $CANAL_CONF/spring/group-instance.xml. The placeholder names inside value attributes must match the property names used in instance.properties; adjust to your environment. The leading numbers below are the line numbers in my Canal 1.1.5 copy of the file and may differ in yours.
<?xml version="1.0" encoding="UTF-8"?>
........
(irrelevant lines omitted)
80 <!-- parse-time filter -->
81 <property name="eventFilter">
82 <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
83 <constructor-arg index="0" value="${canal.instance1.filter.regex:}" />
84 </bean>
85 </property>
........
(irrelevant lines omitted)
117 <!-- database connection info -->
118 <property name="masterInfo">
119 <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
120 <property name="address" value="${canal.instance.master1.address}" />
121 <property name="username" value="${canal.instance1.dbUsername:retl}" />
122 <property name="password" value="${canal.instance1.dbPassword:retl}" />
123 <property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
124 <property name="enableDruid" value="${canal.instance.enableDruid:false}" />
125 <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
126 </bean>
127 </property>
........
(irrelevant lines omitted)
139 <!-- parse start position -->
140 <property name="masterPosition">
141 <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
142 <property name="journalName" value="${canal.instance.master1.journal.name}" />
143 <property name="position" value="${canal.instance.master1.position}" />
144 <property name="timestamp" value="${canal.instance.master1.timestamp}" />
145 <property name="gtid" value="${canal.instance.master1.gtid}" />
146 </bean>
147 </property>
........
(irrelevant lines omitted)
191 <!-- parse-time filter -->
192 <property name="eventFilter">
193 <bean class="com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter" >
194 <constructor-arg index="0" value="${canal.instance2.filter.regex:}" />
195 </bean>
196 </property>
........
(irrelevant lines omitted)
224 <!-- database connection info -->
225 <property name="masterInfo">
226 <bean class="com.alibaba.otter.canal.parse.support.AuthenticationInfo" init-method="initPwd">
227 <property name="address" value="${canal.instance.master2.address}" />
228 <property name="username" value="${canal.instance2.dbUsername:retl}" />
229 <property name="password" value="${canal.instance2.dbPassword:retl}" />
230 <property name="pwdPublicKey" value="${canal.instance.pwdPublicKey:retl}" />
231 <property name="enableDruid" value="${canal.instance.enableDruid:false}" />
232 <property name="defaultDatabaseName" value="${canal.instance.defaultDatabaseName:}" />
233 </bean>
234 </property>
........
(irrelevant lines omitted)
246 <!-- parse start position -->
247 <property name="masterPosition">
248 <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
249 <property name="journalName" value="${canal.instance.master2.journal.name}" />
250 <property name="position" value="${canal.instance.master2.position}" />
251 <property name="timestamp" value="${canal.instance.master2.timestamp}" />
252 <property name="gtid" value="${canal.instance.master2.gtid}" />
253 </bean>
254 </property>
........
(irrelevant lines omitted)
283 <bean id="mqConfig" class="com.alibaba.otter.canal.instance.core.CanalMQConfig">
284 <property name="topic" value="${canal.mq.topic}" />
285 <property name="dynamicTopic" value="${canal.mq.dynamicTopic}" />
286 <property name="partition" value="${canal.mq.partition}" />
287 <property name="partitionsNum" value="${canal.mq.partitionsNum}" />
288 <property name="partitionHash" value="${canal.mq.partitionHash}" />
289 <property name="dynamicTopicPartitionNum" value="${canal.mq.dynamicTopicPartitionNum}" />
290 </bean>
With the configuration finished, start Canal: sh $CANAL_HOME/startup.sh ($CANAL_HOME is Canal's bin directory)
[root@master bin]# ./startup.sh
cd to /home/softs/canal/bin for workaround relative path
LOG CONFIGURATION : /home/softs/canal/bin/../conf/logback.xml
canal conf : /home/softs/canal/bin/../conf/canal.properties
CLASSPATH :/home/softs/canal/bin/../conf:/home/softs/canal/bin/../lib/zookeeper-.. (rest of classpath omitted)
cd to /home/softs/canal/bin for continue
[root@master bin]#
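If startup looks fine but nothing arrives in Kafka, the Canal logs are the first place to look. With a standard 1.1.5 tarball layout they should sit under the logs directory (paths assumed from the install path shown above):

```shell
tail -n 50 /home/softs/canal/logs/canal/canal.log      # server log
tail -n 50 /home/softs/canal/logs/example/example.log  # per-instance log
```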
Start a Kafka consumer to read the data: sh $KAFKA_HOME/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 ($KAFKA_HOME is Kafka's bin directory; adjust the command to your environment). cctv1 here is the topic set by canal.mq.topic=cctv1 in instance.properties:
[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1
Log in to MySQL on 192.168.1.137 and modify a row:
MariaDB [dm]> update dm.test1 set gender = '68' where id = '139';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
MariaDB [dm]>
Check what the topic consumer receives:
{"data":[{"id":"139","name":"102","age":"1","gender":"68","new1":null,"new3":null,"new4":null}],"database":"dm","es":1648617618000,"id":1,"isDdl":false,"mysqlType":{"id":"int(10) unsigned","name":"varchar(64)","age":"int(10)","gender":"int(12)","new1":"int(12)","new3":"int(12)","new4":"int(12)"},"old":[{"gender":"66"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4,"gender":4,"new1":4,"new3":4,"new4":4},"table":"test1","ts":1648617619429,"type":"UPDATE"}
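Downstream consumers usually only need a few fields from these messages (type, database, table, data). As a minimal sketch, such fields can even be pulled out with plain shell tools; the sample message below is abbreviated from the output above, and a real consumer should use a proper JSON parser:

```shell
msg='{"data":[{"id":"139","gender":"68"}],"database":"dm","table":"test1","type":"UPDATE"}'
# crude extraction: grab the quoted value that follows each key
event_type=$(printf '%s' "$msg" | grep -o '"type":"[A-Z]*"' | cut -d'"' -f4)
table=$(printf '%s' "$msg" | grep -o '"table":"[a-z0-9_]*"' | cut -d'"' -f4)
echo "$table $event_type"   # test1 UPDATE
```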
Log in to MySQL on 192.168.1.138 and modify a row:
MariaDB [dm]> update dm.test2 set hjhk2 = '12' where id = '48';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
MariaDB [dm]>
Check what the topic consumer receives:
{"data":[{"id":"48","name":"da","age":"5","gender":"6","new2":"8","hjhk2":"12"}],"database":"dm","es":1648617672000,"id":2,"isDdl":false,"mysqlType":{"id":"int(20) unsigned","name":"varchar(65)","age":"int(10)","gender":"int(12)","new2":"int(14)","hjhk2":"varchar(100)"},"old":[{"hjhk2":"11"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"age":4,"gender":4,"new2":4,"hjhk2":12},"table":"test2","ts":1648617672173,"type":"UPDATE"}
Both servers' binlogs are written into the same topic; the configuration is complete.