IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Canal学习总结 -> 正文阅读

[大数据]Canal学习总结

目录

一、什么是 Canal

二、MySQL 的 Binlog

1.什么是Binlog?

2.binlog分类

三、Canal 的工作原理

1.MySQL 主从复制过程

2.canal 工作原理

3.使用场景

四、搭建

五、使用

六、总结


一、什么是 Canal

????????阿里巴巴 B2B 公司,因为业务的特性,卖家主要集中在国内,买家主要集中在国外,所 以衍生出了同步杭州和美国异地机房的需求,从 2010 年开始,阿里系公司开始逐步的尝试 基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

????????Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。 目前。Canal主要支持了 MySQLBinlog解析,解析完成后才利用Canal Client来处理获得 的相关数据 (数据库同步需要阿里的 Otter 中间件,基于 Canal)。

二、MySQL Binlog

1.什么是Binlog?

????????MySQL的二进制日志可以说MySQL最重要的日志了,它记录了所有的DDLDML除 了数据查询语句语句,以事件形式记录,还包含语句所执行的消耗的时间, MySQL 的二进 制日志是事务安全型的。

一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:

????????其一 MySQL Replication Master 端开启 BinlogMaster 把它的二进制日志传递给 Slaves 来达到 Master-Slave 数据一致的目的。

????????其二:自然就是数据恢复了,通过使用MySQL Binlog工具来使恢复数据。

????????二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有 的二进制文件,二进制日志文件(文件名后缀为.00000*记录数据库所有的DDLDML除 了数据查询语句语句事件。

2.binlog分类

????????MySQL Binlog的格式有三种,分别是STATEMENTMIXED,ROW在配置文件中可以选择配置 binlog_format= statement|mixed|row三种格式的区别:

  1. statement语句级binlog会记录每次一执行写操作的语句。相对row模式节省空 但是可能产生不一致性比如update tt set create_date= now()”,如果用binlog日志 进行恢复由于执行时间不同可能产生的数据就不同。

优点:节省空间。 缺点:有可能造成数据不一致。

? ? ? 2.row:行级,binlog会记录每次操作后每行记录的变化。

优点:保持数据的绝对一致性。因为不管sql是什么,引用了什么函数,他只记录 执行后的效果。

缺点:占用较大空间。

? ? 3.mixed: statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement 模式不一致问题,默认还是statement在某些情况下譬如:当函数中包含UUID()时;包含 AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照 ROW 的方式进行处理

优点:节省空间,同时兼顾了一定的一致性。 缺点:还有些极个别情况依旧会造成不一致,另外 statement mixed 对于需要对 binlog 的监控的情况都不方便。

综合上面对比, Canal 想做监控分析,选择 row 格式比较合适。

三、Canal 的工作原理

1.MySQL 主从复制过程

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

2.canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

3.使用场景

1)阿里Otter中间件的一部分

Otter 是阿里用于进行异地数据库之间的同步框架, Canal 是其中一部分。

?2)更新缓存 缓存一致性问题解决方案。

四、搭建

canal 搭建

五、使用

1.TCP模式处理数据

package com.lujianing.test.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        //获取连接
        CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        while (true) {
            //连接
            canalConnector.connect();
            //订阅数据库
            canalConnector.subscribe("canal.*");

            //获取数据
            Message message = canalConnector.get(100);

            //获取Entry集合
            List<CanalEntry.Entry> entries = message.getEntries();

            //判断集合是否为空,如果为空,则等待一会继续拉取数据
            if (entries.size() <= 0) {
                System.out.println("当次抓取没有数据,休息一会。。。。。。");
                Thread.sleep(1000);
            } else {
                //遍历entries,单条解析
                for (CanalEntry.Entry entry : entries) {
                    //1.获取表名
                    String tableName = entry.getHeader().getTableName();
                    //2.获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    //3.获取序列化后的数据
                    ByteString storeValue = entry.getStoreValue();
                    //4.判断当前entryType类型是否为ROWDATA
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        //5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        //6.获取当前事件的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        //7.获取数据集
                        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                        //8.遍历rowDataList,并打印数据集
                        for (CanalEntry.RowData rowData : rowDataList) {
                            JSONObject beforeData = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(), column.getValue());
                            }
                            JSONObject afterData = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterData.put(column.getName(), column.getValue());
                            }
                            //数据打印
                            System.out.println("Table:" + tableName +",EventType:" + eventType +",Before:" + beforeData +",After:" + afterData);
                        }
                    } else {
                        System.out.println("当前操作类型为:" + entryType);
                    }
                }
            }
        }
    }
}

2.通过消息中间件处理数据 如kafka。

修改 canal.properties:

# 可选项: tcp(默认), kafka, RocketMQ
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置
canal.mq.servers = a1:9092,a1:9092,a3:9092
canal.mq.retries = 0
# flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下请将该值改大, 建议50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为flat json格式对象
canal.mq.flatMessage = false
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投递是否使用事务
canal.mq.transaction = false

修改 instance.properties

# mq config
# 指定Topic名称 和 分区数量
canal.mq.topic=canal_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mydatabase.mytable
canal.mq.partition=0
# hash partition config
canal.mq.partitionsNum=3
#解决顺序消费问题
canal.mq.partitionHash=mydatabase.mytable

????????配置了kafkapartitionHash,并且我们一个Topic就是一个表。这样的效果就是,一个表的数据只会推到一个固定的partition中,然后再推给consumer进行消费处理,同步到新的数据库。通过这种方式,解决了之前碰到的binlog日志顺序处理的问题。

重启canal以加载配置信息.

启动Kafka消费者来查看是否运行:

bin/kafka-console-consumer.sh --bootstrap-server a1:9092 --topic canal_test

执行一条插入sql语句:

INSERT INTO t_user VALUES('zs','male'),('ls','female');

Kafka控制台出现数据,此处可以监控该canal_test topic主题。从而进行操作。

六、总结

????????canal的好处在于对业务代码没有侵入,因为是基于监听binlog日志去进行同步数据的。实时性也能做到准实时。

????????通常采用配置MQ模式,配合RocketMQ或者Kafka,canal会把数据发送到MQ的topic中,然后通过消息队列的消费者进行处理

支持集群使用zk

借鉴:alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件 (github.com)

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-15 00:05:37  更:2022-04-15 00:09:57 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:41:24-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码