IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Canal实时采集mysql TCP,kafka 连接 -> 正文阅读

[大数据]Canal实时采集mysql TCP,kafka 连接

Canal简介

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。
目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的 Otter 中间件,基于 Canal)。

MySQL 的 Binlog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML(除
了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL 的二进制日志是事务安全型的。
一般来说开启二进制日志大概会有 1%的性能损耗。二进制有两个最重要的使用场景:
其一:MySQL Replication 在 Master 端开启 Binlog,Master 把它的二进制日志传递给 Slaves来达到 Master-Slave 数据一致的目的。
其二:自然就是数据恢复了,通过使用 MySQL Binlog 工具来使恢复数据。
二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有
的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件。

Binlog 的分类

MySQL Binlog 的格式有三种,分别是 STATEMENT,MIXED,ROW。在配置文件中可以选择配
置 binlog_format= statement|mixed|row。三种格式的区别:
1)statement:语句级,binlog 会记录每次一执行写操作的语句。相对 row 模式节省空
间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志
进行恢复,由于执行时间不同可能产生的数据就不同。
优点:节省空间。
缺点:有可能造成数据不一致。
2)row:行级, binlog 会记录每次操作后每行记录的变化。
优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录
执行后的效果。
缺点:占用较大空间。
3)mixed:statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement
模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含
AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照
ROW 的方式进行处理
优点:节省空间,同时兼顾了一定的一致性。
缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对
binlog 的监控的情况都不方便。
综合上面对比,Canal 想做监控分析,选择 row 格式比较合适。

连接Tcp

  1. 配置canal.properties
[root@niit02 conf]# pwd
/training/canal/conf
[root@niit02 conf]# vi canal.properties

在这里插入图片描述
2. 配置

[root@niit02 example]# pwd
/training/canal/conf/example
[root@niit02 example]# vi instance.properties
[root@niit02 example]#

在这里插入图片描述
3. ideal新建maven项目
pom 依赖

<dependencies>
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

client代码

package com.canal;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.List;

public class TCPclinet {
    public static void main(String[] args) throws InvalidProtocolBufferException {
        CanalConnector canalConnector = CanalConnectors.newClusterConnector(Collections.singletonList(new InetSocketAddress("192.168.55.131", 11111)), "example", "canal", "Canal123!");
        while (true){
            //获取连接
            canalConnector.connect();
            //监控的数据库
            canalConnector.subscribe("gmall.*");
            //获取Message
            Message message = canalConnector.get(100);
            List<CanalEntry.Entry> entries = message.getEntries();
            if (entries.size()<=0){
                System.out.println("没有数据,休息一会");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }else {
                for (CanalEntry.Entry entry : entries) {
                    String tableName = entry.getHeader().getTableName();
                    //获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();
                    //判断类型是不是rowdata
                    if (entryType.equals(CanalEntry.EntryType.ROWDATA)){
                        ByteString storeValue = entry.getStoreValue();
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                        CanalEntry.EventType eventType = rowChange.getEventType();
                        List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
                        for (CanalEntry.RowData rowData : rowDatasList) {
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            JSONObject beforeData = new JSONObject();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeData.put(column.getName(),column.getValue());

                            }
                            JSONObject afterdata = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterdata.put(column.getName(),column.getValue());

                            }
                            System.out.println("TableName:" + tableName
                                    +
                                    ",EventType:" + eventType +
                                    ",After:" + beforeData +
                                    ",After:" + afterdata);
                        }
                    }
                }

            }
        }
    }
}

TCP测试

点击运行代码 在数据库中插入信息
在这里插入图片描述

canal连接kafka

1.修改 canal.properties
在这里插入图片描述
在这里插入图片描述
2. 修改 instance.properties

在这里插入图片描述

在这里插入图片描述
启动canal

[root@niit02 canal]# bin/startup.sh

启动kafka

[root@niit02 kafka_2.11-2.2.1]# bin/kafka-server-start.sh ./config/server.properties

启动消费端

[root@niit02 kafka_2.11-2.2.1]# bin/kafka-console-consumer.sh --bootstrap-server niit02:9092 --topic canal_test --from-beginning

往数据中插入信息,查看服务端
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-14 10:00:40  更:2022-05-14 10:01:12 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 5:41:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码