Canal是什么?
canal是阿里最早为了解决异地双机房部署,跨机房部署存在数据同步的需求,在业务系统进行同步工作量比较大。进而根据了mysql主从同步的特点,canal服务监听mysql服务的binlog日志,获取增量变更进行同步。
基于日志增量订阅和消费的业务包括
- 数据库镜像
- 数据库实时备份索引构建和实时维护(拆分异构索引、倒排索引等)
- 业务 cache 刷新
- 带业务逻辑的增量数据处理
ps: 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
而在CDC领域,canal也是一种数据同步的工具,属于基于日志技术的解析工具。
MySQL主从复制原理
● MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看) ● MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log) ● MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据
Canal工具原理
● canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议 ● MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) ● canal 解析 binary log 对象(原始为 byte 流)
前提条件:
JDK环境 MySQL kafka集群
安装
1. 下载安装包 并解压缩
canal releases
tar zxvf canal.deployer-$version.tar.gz -C /xxxx/canal
2. 修改conf/canal.properties文件
这里使用的canal -> kafka 所以需要修改配置canal.serverMode=kafka ,增加kafka信息 canal.destinations为配置instance,默认是example
canal.ip =
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers =
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
canal.serverMode = kafka
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
canal.instance.transaction.size = 1024
canal.instance.fallbackIntervalInSeconds = 60
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
canal.instance.get.ddl.isolation = false
canal.instance.parser.parallel = true
canal.instance.parser.parallelBufferSize = 256
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
canal.instance.tsdb.snapshot.interval = 24
canal.instance.tsdb.snapshot.expire = 360
canal.destinations = example
canal.conf.dir = ../conf
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
kafka.bootstrap.servers = xxxxxxx:9092,xxxxxx:9092,xxxxxxx:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =
3. 配置监听的数据库
上面配置了example实例,所以修改conf/example/instance.propertires的配置,主要修改监听的数据库,以及topic的策略
canal.instance.gtidon=false
canal.instance.master.address=xxxxxxxxxxx:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
canal.instance.tsdb.enable=true
canal.instance.dbUsername=test
canal.instance.dbPassword=test
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false
canal.instance.filter.regex=cdc_input\..*
canal.instance.filter.black.regex=mysql\\.slave_.*
canal.mq.topic=canal_test
canal.mq.partitionsNum=3
canal支持MQ数据的几种路由方式:单topic单分区,单topic多分区、多topic单分区、多topic多分区 重点来了: 其中我们主要关注创建topic的方式,是固定是还是按照一个表一个topic,还是一个库一个topic,以及topic的分区信息。 如果要保证消息的顺序,那么至少是一个库的所有表只往一个topic的单分区写。 或者一个表只往一个topic的单分区写。不然就会出现问题。
试想一下,如果delete语句先被接受, insert语句后接受。 目标端接收到kafka消息,想要还原数据,结果去delete的时候报错了 发现这条数据不存在,并且报错。报错还比较好解决,你可以忽略掉。但是后面insert语句来了。 你还得插入一条数据。 但是源端其实已经没有这条数据了。 这就造成两边数据不一致。
4.启动canal
sh bin/startup.sh
5.查看日志
tail -200f logs/canal/canal.log
tail -200f logs/example/example.log
6.监听数据库表插入数据
这个步骤很简单,不管是客户端还是写个程序,只需要往监听的表里插入一条数据即可
7.查看kafka数据
去到kafka安装的bin目录
./kafka-console-consumer.sh --bootstrap-server xxxx:9092,xxxx:9092,xxxx:9092 --topic test --from-beginning
8.关闭Canal
sh bin/stop.sh
附录
参数名 | 参数说明 | 默认值 |
---|
canal.mq.servers | kafka为bootstrap.servers rocketMQ中为nameserver列表 | 127.0.0.1:6667 | canal.mq.retries | 发送失败重试次数 | 0 | canal.mq.batchSize | kafka为ProducerConfig.BATCH_SIZE_CONFIG rocketMQ无意义 | 16384 | canal.mq.maxRequestSize | kafka为ProducerConfig.MAX_REQUEST_SIZE_CONFIG rocketMQ无意义 | 1048576 | canal.mq.lingerMs | kafka为ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建议将该值调大, 如: 200 rocketMQ无意义 | 1 | canal.mq.bufferMemory | kafka为ProducerConfig.BUFFER_MEMORY_CONFIG rocketMQ无意义 | 33554432 | canal.mq.acks | kafka为ProducerConfig.ACKS_CONFIG rocketMQ无意义 | all | canal.mq.kafka.kerberos.enable | kafka为ProducerConfig.ACKS_CONFIG rocketMQ无意义 | false | canal.mq.kafka.kerberos.krb5FilePath | kafka kerberos认证 rocketMQ无意义 | …/conf/kerberos/krb5.conf | canal.mq.kafka.kerberos.jaasFilePath | kafka kerberos认证 rocketMQ无意义 | …/conf/kerberos/jaas.conf | canal.mq.producerGroup | kafka无意义 rocketMQ为ProducerGroup名 | Canal-Producer | canal.mq.accessChannel | kafka无意义 rocketMQ为channel模式,如果为aliyun则配置为cloud | local | canal.mq.vhost= | rabbitMQ配置 | 无 | canal.mq.exchange= | rabbitMQ配置 | 无 | canal.mq.username= | rabbitMQ配置 | 无 | canal.mq.password= | rabbitMQ配置 | 无 | canal.mq.aliyunuid= | rabbitMQ配置 | 无 | canal.mq.canalBatchSize | 获取canal数据的批次大小 | 50 | canal.mq.canalGetTimeout | 获取canal数据的超时时间 | 100 | canal.mq.parallelThreadSize | mq数据转换并行处理的并发度 | 8 | canal.mq.flatMessage | 是否为json格式 如果设置为false,对应MQ收到的消息为protobuf格式 需要通过CanalMessageDeserializer进行解码 | false | canal.mq.topic | mq里的topic名 | 无 | canal.mq.dynamicTopic | mq里的动态topic规则, 1.1.3版本支持 | 无 | canal.mq.partition | 单队列模式的分区下标, | 1 | canal.mq.partitionsNum | 散列模式的分区数 | 无 | canal.mq.partitionHash | 散列规则定义库名.表名 : 唯一主键,比如mytest.person: id1.1.3版本支持新语法,见下文 | 无 |
|