1. 概述
Canal是阿里巴巴开发的一项开源组件,通过将自己伪装成MySQL Slave节点来接收Master节点的Binlog日志,然后就可以实现我们的需求,例如:同步到MySQL从库、同步到Elasticsearch、同步到Kafka 等等。
2. Canal的组成
Canal主要分成 Server(服务端)和 Client(客户端)
- Server 主要由 Instance 构成,1个Server可以有多个Instance,每个Instance由包括 EventParser、EventSink、EventStore、MetaManager 四个部分组成,如下图所示:
EventParser:连接MySQL,充当 Slave 和 Master 进行交互,并且实现协议解析
EventSink:EventParser 和 EventStore 的连接器,可以对数据进行一定的处理
EventStore:数据存储,内部通过 Event[] 数组记录,因此数据是存在内存
MetaManager:维护 conf/Instance名称/meta.dat 文件,记录了Client消费的进度(binlog文件名、position等等)
- Client 称为客户端,官方提供的Client Demo是一段Java程序,使用NIO连接 Server 获取Binlog进行处理。此外,阿里也开发了几个现成的Adapter(适配器),通过配置就能实现数据同步到Log、ES、Kafka等等,不需要编写任何的Client代码。
3. Server解析
3.1 Instance组件
3.1.1 MetaManager
元数据管理器,主要是对Client、Position信息进行记录。这里使用的是 FileMixedMetaManager 文件型管理器,它继承了 MemoryMetaManager 内存型管理器,它们的关系如下: 启动的时候,会优先调用 MemoryMetaManager.start() 方法,初始化Map对象,主要用于记录每个 Client 与 Position 的对应关系。
public class MemoryMetaManager extends AbstractCanalLifeCycle implements CanalMetaManager {
protected Map<String, List<ClientIdentity>> destinations;
protected Map<ClientIdentity, MemoryClientIdentityBatch> batches;
protected Map<ClientIdentity, Position> cursors;
public void start() {
super.start();
batches = MigrateMap.makeComputingMap(MemoryClientIdentityBatch::create);
cursors = new MapMaker().makeMap();
destinations = MigrateMap.makeComputingMap(destination -> Lists.newArrayList());
}
}
然后,才是调用 FileMixedMetaManager 自己的启动逻辑:
- 判断 …/conf 目录是否存在,不存在就创建
- 判断 …/conf 目录下是否存在当前 Instance 的目录,不存在就创建
- 创建频率为1秒的定时任务,定时把内存的Meta数据刷新到 meta.dat 文件
3.1.2 EventStore
这里使用的类型为 MemoryEventStoreWithBuffer,其内部维护名称为 entries 的 Event 数组对象,默认大小为 16384。
EventStore 有 3 种主要操作:Put、Get、Ack
- Put:添加数据,通过 putSequence 标记最后一次写操作的位置,默认 -1
- Get:读取数据,由Client发起,通过 getSequence 标记当前读取到的位置,默认 -1
- Ack:确认读取,由Client发起,在Get之后调用,表示Get的数据已经消费完毕,此操作会删除数据。通过 ackSequence 标记最后 Ack 的位置,默认 -1
举个例子说明他们之间的关系:
假设初始状态下,entries 数组长度等于10 执行 put 10 笔数据,此时 putSequence = 9 执行 get 8 笔数据,此时 getSequence = 7
执行 act 5 笔数据,此时 ackSequence = 4 特殊情况一:entries满了之后,继续 put 2 笔数据,会发生什么?
EventStore 计算下标的逻辑是调用 getIndex() 方法,当下标超过最大值,会从新开始计算。也就是说,继续 put 2 笔数据会放在下标 0 - 1
特殊情况二:以上面最后一个截图为例,继续 put 8 笔数据,会发生什么?
- Put 操作之前,EventStore 会先调用 checkFreeSlotAt 进行检查,因为put 8 笔数据会放在 0 - 7,已经超过 ack 的进度(4),此时会返回 false 给 EventSink,并且不会继续执行 put 操作。
- EventSink 收到 false 结果会进入阻塞状态,直到 put 成功为止。
3.1.3 EventSink
这里使用的类型为 EntryEventSink,内部的核心方法是 sinkData() ,作用是对Event进行过滤筛选,然后调用 eventStore.tryPut(),把事件传递给EventStore进行存储。
3.1.4 EventParser
EventParser 是独立一个线程在持续的运行,它的启动代码在 AbstractEventParser.start(),主要做以下工作:
-
计算最新的position位置 – 优先读取 meta.data 记录的 position 信息 – 如果没有 meta.data 文件,或者里面没有数据,说明当前是第一次启动,此时会执行 show master status 取出最新的 position -
开始dump – 把上一步取到的 position 发送给 master,表示从当前位置开始 dump – 循环拉取binlog并且进行解析 – 投递binlog到 EventSink,最后再由 EventSink Put 到 EventStore
3.1.5 Instance总结
3.2 CanalServerWithEmbedded 组件
前面我们一直说,Server内部可以包含多个Instance,这边的Server可以理解成CanalServerWithEmbedded 对象,其内部通过canalInstances属性来记录各个Instance。
注意:CanalServerWithEmbedded 名称太长了,后面简称为 embeddedServer
private Map<String, CanalInstance> canalInstances;
embeddedServer 内部有多个重要方法,比如:get()、getWithoutAck()、subscribe()、rollback()、unsubscribe()。这些方法乍一看,有点像Client调用的方法,为什么在 embeddedServer 也会有呢?
以get()方法为例,Client执行时会通过NIO将消息发送到nettyServer,而nettyServer进一步调用 embeddedServer 的对应get()方法。其他方法也同理。
接下来,就是介绍 nettyServer
3.3 CanalServerWithNetty 组件
CanalServerWithNetty 简称 nettyServer,顾名思义,这是基于 Netty 开发的服务端对象,其业务逻辑主要由几个Handler实现,如下所示:
bootstrap.setPipelineFactory(() -> {
ChannelPipeline pipelines = Channels.pipeline();
pipelines.addLast(FixedHeaderFrameDecoder.class.getName(), new FixedHeaderFrameDecoder());
pipelines.addLast(HandshakeInitializationHandler.class.getName(),
new HandshakeInitializationHandler(childGroups));
pipelines.addLast(ClientAuthenticationHandler.class.getName(),
new ClientAuthenticationHandler(embeddedServer));
SessionHandler sessionHandler = new SessionHandler(embeddedServer);
pipelines.addLast(SessionHandler.class.getName(), sessionHandler);
return pipelines;
});
nettyServer 与 embeddedServer 的关系,咱们在上一步也讲过了。nettyServer 接收到 Client 的业务请求,会调用 embeddedServer 的对应方法。
也就是说,nettyServer 负责对外监听,embeddedServer 负责执行业务
4. Client解析
根据官方提供的 Demo,Client 通常是一段 NIO 程序,它的业务一般分成这几个步骤:
- connect:连接 nettyServer
- subscribe:订阅需要监听 binlog 的表
- 开启死循环,执行下面这几步↓↓↓
- get:获取 binlog 增量数据
- ack:确认操作
- rollback:回滚操作
- 其他:针对取到的 binlog 执行特定的业务需求
Client 与 Server 的交互,可以用这张图表示:
5. 结束语
如果你原先对 Canal 只是停留在使用和配置的程度,那么读完这篇文章,应该会有更深层次的理解。这里呢,对源代码没怎么讲解,那样1篇文章是讲不完的,后续我会继续深究 Canal 实现的细节点,到时主要就是结合源码了。
|