Canal简介
Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。
MySQL 的 Binlog
MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除 了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。 一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景: 其一:MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves来达到 Master-Slave 数据一致的目的。 其二:自然就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据。 二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。
Binlog 的分类
MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配 置 binlog_format= statement|mixed|row。三种格式的区别: 1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空 间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志 进行恢复,由于执行时间不同可能产生的数据就不同。 优点:节省空间。 缺点:有可能造成数据不一致。 2)row:行级, binlog 会记录每次操作后每行记录的变化。 优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录 执行后的效果。 缺点:占用较大空间。 3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理 优点:节省空间,同时兼顾了一定的一致性。 缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对 binlog 的监控的情况都不方便。 综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。
连接Tcp
- 配置canal.properties
[root@niit02 conf]# pwd
/training/canal/conf
[root@niit02 conf]# vi canal.properties
2. 配置
[root@niit02 example]# pwd
/training/canal/conf/example
[root@niit02 example]# vi instance.properties
[root@niit02 example]#
3. ideal新建maven项目 pom 依赖
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
client代码
package com.canal;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;
public class TCPclinet {
public static void main(String[] args) throws InvalidProtocolBufferException {
CanalConnector canalConnector = CanalConnectors.newClusterConnector(Collections.singletonList(new InetSocketAddress("192.168.55.131", 11111)), "example", "canal", "Canal123!");
while (true){
canalConnector.connect();
canalConnector.subscribe("gmall.*");
Message message = canalConnector.get(100);
List<CanalEntry.Entry> entries = message.getEntries();
if (entries.size()<=0){
System.out.println("没有数据,休息一会");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}else {
for (CanalEntry.Entry entry : entries) {
String tableName = entry.getHeader().getTableName();
CanalEntry.EntryType entryType = entry.getEntryType();
if (entryType.equals(CanalEntry.EntryType.ROWDATA)){
ByteString storeValue = entry.getStoreValue();
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
CanalEntry.EventType eventType = rowChange.getEventType();
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
for (CanalEntry.RowData rowData : rowDatasList) {
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
JSONObject beforeData = new JSONObject();
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(),column.getValue());
}
JSONObject afterdata = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterdata.put(column.getName(),column.getValue());
}
System.out.println("TableName:" + tableName
+
",EventType:" + eventType +
",After:" + beforeData +
",After:" + afterdata);
}
}
}
}
}
}
}
TCP测试
点击运行代码 在数据库中插入信息
canal连接kafka
1.修改 canal.properties 2. 修改 instance.properties
启动canal
[root@niit02 canal]# bin/startup.sh
启动kafka
[root@niit02 kafka_2.11-2.2.1]# bin/kafka-server-start.sh ./config/server.properties
启动消费端
[root@niit02 kafka_2.11-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server niit02:9092 --topic canal_test --from-beginning
往数据中插入信息,查看服务端
|