IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> canal 快速入门 -> 正文阅读

[大数据]canal 快速入门

canal.deployer-1.1.5.tar.gz
canal.admin-1.1.5.tar.gz

1.修改配置文件
vim /etc/my.cnf

log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

2.重启mysql

3.修改配置文件

vim /opt/canal/conf/example/instance.properties
canal.instance.mysql.slaveId=1234
canal.instance.master.address=hdp1:3306
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
# 检测数据库哪张表  .*\\..* 为检测全表
canal.instance.filter.regex=.*\\..*

4.启动

/opt/canal/bin/startup.sh

查看日志

tail -fn 300 /opt/canal/logs/canal/canal.log 

显示 running now 则表示运行成功

## the canal server is running now ......
tail -fn 300 /opt/canal/logs/example/example.log 

显示start successful… 表示没问题

start successful....

ClientExample
导入pom依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

代码如下:

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample {


    public static void main(String args[]) {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("hdp1",
                11111), "example", "", "");
        int batchSize = 1000;
        int emptyCount = 0;
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            int totalEmptyCount = 120;
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    emptyCount = 0;
                    // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                    printEntry(message.getEntries());
                }

                connector.ack(batchId); // 提交确认
                // connector.rollback(batchId); // 处理失败, 回滚数据
            }

            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------&gt; before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------&gt; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}

运行Client,首先启动Canal Server上述步骤4
启动Canal Client后,可以从控制台从看到类似消息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

此时代表当前数据库无变更数据
触发数据库变更
可以从控制台中看到:

================&gt; binlog[mysql-bin.000004:168242] , name[presto,user_info] , eventType : UPDATE
-------&gt; before
id : 4    update=false
nickname : job    update=false
address : 上海    update=false
-------&gt; after
id : 4    update=false
nickname : job2    update=true
address : 上海    update=false

Canal Kafka RocketMQ QuickStart

1.关闭服务

/opt/canal/bin/stop.sh

2.修改配置文件

vim /opt/canal/conf/example/instance.properties
canal.mq.partitionsNum=3
vim /opt/canal/conf/canal.properties
canal.serverMode = kafka
kafka.bootstrap.servers = hdp1:9092,hdp2:9092,hdp3:9092

创建kafka主题

./kafka-topic.sh example 3 2

启动canal

/opt/canal/bin/startup.sh

进入mysql进行修改数据

启动kafka消费者

./kafka-consumer.sh example

监听到数据:

{"data":[{"id":"4","nickname":"job3","address":"上海"}],"database":"presto","es":1629983070000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","nickname":"varchar(255)","address":"varchar(255)"},"old":[{"nickname":"job2"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"nickname":12,"address":12},"table":"user_info","ts":1629983070348,"type":"UPDATE"}

Canal HA

修改配置文件

vim /opt/canal/conf/canal.properties
canal.zkServers = hdp1:2181,hdp2:2181,hdp3:2181
#原本为file  ,改为default
#canal.instance.global.spring.xml = classpath:spring/file-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

将canal传到hdp2

xsync ./canal/ hdp 2 2

修改配置文件

vim /opt/canal/conf/example/instance.properties
canal.instance.mysql.slaveId=1235

启动hdp1 hdp2的canal

查看zookeeper

/opt/zookeeper-3.4.6/bin/zkCli.sh
[zk: localhost:2181(CONNECTED) 10] get  /otter/canal/destinations/example/running
{"active":true,"address":"192.168.56.101:11111"}
cZxid = 0x1100000071
ctime = Thu Aug 26 21:19:09 CST 2021
mZxid = 0x1100000071
mtime = Thu Aug 26 21:19:09 CST 2021
pZxid = 0x1100000071
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x27b81de17bb0000
dataLength = 48
numChildren = 0

Canal Admin

修改配置文件

vim /opt/canal-admin/conf/application.yml
 spring.datasource:
  address: hdp1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver

初始化脚本:

/opt/canal-admin/conf/canal_manager.sql
mysql> source canal_manager.sql;
mysql> use canal_manager;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> show tables;
+-------------------------+
| Tables_in_canal_manager |
+-------------------------+
| canal_adapter_config    |
| canal_cluster           |
| canal_config            |
| canal_instance_config   |
| canal_node_server       |
| canal_user              |
+-------------------------+
6 rows in set (0.00 sec)

开启admin

/opt/canal-admin/bin/startup.sh

端口为:

1861 CanalAdminApplication

页面为:http://hdp1:8089/ admin 123456

在这里插入图片描述
1.新建集群
在这里插入图片描述
2.修改配置文件

vim /opt/canal/conf/canal_local.properties
canal.admin.manager = hdp1:8089
canal.admin.register.cluster = online #集群名
canal.admin.register.name = server1  #注册名

此处的user passwd 对应

canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

/opt/canal-admin/conf/application.yml

canal:
  adminUser: admin
  adminPasswd: admin

页面登陆密码为mysql中如图所示:
在这里插入图片描述
3.启动

/opt/canal/bin/startup.sh local

4.修改hdp2配置文件

vim /opt/canal/conf/canal_local.properties
canal.admin.manager = hdp1:8089
canal.admin.register.cluster = online #集群名
canal.admin.register.name = server2  #注册名

5.启动hdp2

/opt/canal/bin/startup.sh local

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述在这里插入图片描述

在这里插入图片描述
保存即可

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
修改presto下表
打开kafka消费端

{"data":[{"id":"4","nickname":"job4","address":"上海"}],"database":"presto","es":1629989877000,"id":2,"isDdl":false,"mysqlType":{"id":"int(11)","nickname":"varchar(255)","address":"varchar(255)"},"old":[{"nickname":"job3"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"nickname":12,"address":12},"table":"user_info","ts":1629989877647,"type":"UPDATE"}

可以建多个instance进行监听在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-27 11:55:47  更:2021-08-27 11:56:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:48:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码