目录
一、什么是 Canal
二、MySQL 的 Binlog
1.什么是Binlog?
2.binlog分类
三、Canal 的工作原理
1.MySQL 主从复制过程
2.canal 工作原理
3.使用场景
四、搭建
五、使用
六、总结
????????阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所 以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试 基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。
????????Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前。Canal主要支持了 MySQL的Binlog解析,解析完成后才利用Canal Client来处理获得 的相关数据 (数据库同步需要阿里的 Otter 中间件,基于 Canal)。
二、MySQL 的 Binlog
1.什么是Binlog?
????????MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDL和DML(除 了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间, MySQL 的二进 制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
????????其一 MySQL Replication 在 Master 端开启 Binlog, Master 把它的二进制日志传递给 Slaves 来达到 Master-Slave 数据一致的目的。
????????其二:自然就是数据恢复了,通过使用MySQL Binlog工具来使恢复数据。
????????二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除 了数据查询语句)语句事件。
2.binlog分类
????????MySQL Binlog的格式有三种,分别是STATEMENT;MIXED,ROW。在配置文件中可以选择配置 binlog_format= statement|mixed|row。三种格式的区别:
- statement:语句级,binlog会记录每次一执行写操作的语句。相对row模式节省空 间,但是可能产生不一致性,比如“update tt set create_date= now()”,如果用binlog日志 进行恢复,由于执行时间不同可能产生的数据就不同。
优点:节省空间。 缺点:有可能造成数据不一致。
? ? ? 2.row:行级,binlog会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录 执行后的效果。
缺点:占用较大空间。
? ? 3.mixed: statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题,默认还是statement,在某些情况下譬如:当函数中包含UUID()时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。 缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。
综合上面对比, Canal 想做监控分析,选择 row 格式比较合适。
- MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
- MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
- MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
2.canal 工作原理
- canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
1)阿里Otter中间件的一部分
Otter 是阿里用于进行异地数据库之间的同步框架, Canal 是其中一部分。
?2)更新缓存 缓存一致性问题解决方案。
四、搭建
canal 搭建
五、使用
1.TCP模式处理数据
package com.lujianing.test.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
//获取连接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"example", "", "");
while (true) {
//连接
canalConnector.connect();
//订阅数据库
canalConnector.subscribe("canal.*");
//获取数据
Message message = canalConnector.get(100);
//获取Entry集合
List<CanalEntry.Entry> entries = message.getEntries();
//判断集合是否为空,如果为空,则等待一会继续拉取数据
if (entries.size() <= 0) {
System.out.println("当次抓取没有数据,休息一会。。。。。。");
Thread.sleep(1000);
} else {
//遍历entries,单条解析
for (CanalEntry.Entry entry : entries) {
//1.获取表名
String tableName = entry.getHeader().getTableName();
//2.获取类型
CanalEntry.EntryType entryType = entry.getEntryType();
//3.获取序列化后的数据
ByteString storeValue = entry.getStoreValue();
//4.判断当前entryType类型是否为ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
//5.反序列化数据
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//6.获取当前事件的操作类型
CanalEntry.EventType eventType = rowChange.getEventType();
//7.获取数据集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
//8.遍历rowDataList,并打印数据集
for (CanalEntry.RowData rowData : rowDataList) {
JSONObject beforeData = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
//数据打印
System.out.println("Table:" + tableName +",EventType:" + eventType +",Before:" + beforeData +",After:" + afterData);
}
} else {
System.out.println("当前操作类型为:" + entryType);
}
}
}
}
}
}
2.通过消息中间件处理数据 如kafka。
修改 canal.properties:
# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置
canal.mq.servers = a1:9092,a1:9092,a3:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false
修改 instance.properties
# mq config
# 指定Topic名称 和 分区数量
canal.mq.topic=canal_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
#解决顺序消费问题
canal.mq.partitionHash=mydatabase.mytable
????????配置了kafka 的partitionHash ,并且我们一个Topic 就是一个表。这样的效果就是,一个表的数据只会推到一个固定的partition 中,然后再推给consumer 进行消费处理,同步到新的数据库。通过这种方式,解决了之前碰到的binlog 日志顺序处理的问题。
重启canal以加载配置信息.
启动Kafka消费者来查看是否运行:
bin/kafka-console-consumer.sh --bootstrap-server a1:9092 --topic canal_test
执行一条插入sql语句:
INSERT INTO t_user VALUES('zs','male'),('ls','female');
Kafka控制台出现数据,此处可以监控该canal_test topic主题。从而进行操作。
六、总结
????????canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时。
????????通常采用配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理。
支持集群使用zk。
借鉴:alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)
|