IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Canal通过Kafka实现MySQL与Redis同步 -> 正文阅读

[大数据]Canal通过Kafka实现MySQL与Redis同步


Canal通过Kafka实现MySQL与Redis同步

Docker 环境安装、MySQL 安装、Redis 安装、Canal 安装、MySQL文件配置和 Canal 文件配置请移步 Canal通过TCP实现MySQL与Redis同步 查看。

Zookeeper 安装

  • 下载 Zookeeper3.7.0 的 docker 镜像:
docker pull zookeeper:3.7.0
  • 使用如下命令启动 Zookeeper 服务:
docker run --name zookeeper -p 2181:2181 --restart always -d zookeeper:3.7.0

–restart always 的设置可以使 docker 启动时同时启动 Zookeeper。

Kafka 安装

  • 下载 kafka2.13-2.8.1 的 docker 镜像:
docker run  -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.4:2181 -e 
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.4:9092 -e 
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.13-2.8.1
  • 进入 kafka 容器:
docker exec -it kafka /bin/sh
  • 进入 Kafka 安装目录下:
cd opt/kafka_2.13-2.8.1
  • 创建 topic
kafka-topics.bat --create --zookeeper localhost:2181 
--replication-factor 1 --partitions 1 --topic mall

修改 Canal 文件配置

  • 进入 Canal 容器:
docker exec -it canal /bin/bash
  • 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/instance.properties

在这里插入图片描述
在这里插入图片描述

  • 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/example/instance.properties

在这里插入图片描述

编写实体类和 Kafka 消费者

  • 引入 Kafka 依赖:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
  • 在 application.yml 文件增加 Kafka 配置:
kafka:
    bootstrap-servers: 10.0.0.4:9092
    consumer:
      group-id: mall-master
  • 创建 CanalBean 对象接收 Kafka 消息:
package com.macro.mall.canal;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

/**
 * @ClassName CanalBean
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:46
 * @Version 1.0
 **/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CanalBean {

    /** 数据 */
    private List<UmsAdmin> data;

    /** 数据库名称 */
    private String database;

    private long es;

    /** 递增,从1开始 */
    private int id;

    /** 是否是DDL语句 */
    private boolean isDdl;

    /** 表结构的字段类型 */
    private MysqlType mysqlType;

    /** UPDATE语句,旧数据 */
    private String old;

    /** 主键名称 */
    private List<String> pkNames;

    /** sql语句 */
    private String sql;

    private SqlType sqlType;

    /** 表名 */
    private String table;

    private long ts;

    /** (新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 */
    private String type;
}
package com.macro.mall.canal;

/**
 * @ClassName MysqlType
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:48
 * @Version 1.0
 **/

public class MysqlType {
    private String id;
    private String username;
    private String password;
    private String icon;
    private String email;
    private String nickName;
    private String note;
    private String createTime;
    private String loginTime;
    private String status;
}
package com.macro.mall.canal;

/**
 * @ClassName SqlType
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:49
 * @Version 1.0
 **/
public class SqlType {
    private int id;
    private int username;
    private int password;
    private int icon;
    private int email;
    private int nickName;
    private int note;
    private int createTime;
    private int loginTime;
    private int status;
}
  • 创建 Kafka 消费者:
package com.macro.mall.canal;


import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * @ClassName CanalConsumer
 * @Description TODO
 * @Author 听秋
 * @Date 2022/5/9 19:49
 * @Version 1.0
 **/
@Component
@Slf4j
public class CanalConsumer {

    @Autowired
    private RedisTemplate redisTemplate;

    private static String REDIS_DATABASE = "mall";
    private static String REDIS_KEY_ADMIN = "ums:admin";

    private static String insert = "INSERT";
    private static String update = "UPDATE";


    @KafkaListener(topics = "mall")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{}, key:{}, 分区位置:{}, 下标:{}, value:{}", consumer.topic(), consumer.key(),
                consumer.partition(), consumer.offset(), value);

        // 转换为javaBean
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);

        // 获取类型
        String type = canalBean.getType();

        // 获取是否是DDL语句
        boolean isDdl = canalBean.isDdl();

        // 不是DDL语句
        if (!isDdl) {
            List<UmsAdmin> UmsAdmins = canalBean.getData();

            if (insert.equals(type) || update.equals(type)) {
                //新增或更新语句
                for (UmsAdmin umsAdmin : UmsAdmins) {
                    //新增到redis中
                    redisTemplate.opsForValue().set(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername(),
                            JSONObject.toJSONString(umsAdmin));
                }
            } else {
                // 删除语句
                for (UmsAdmin umsAdmin : UmsAdmins) {
                    //从redis中删除
                    redisTemplate.delete(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername());
                }
            }
        }
    }
}

验证

  • 修改前

MySQL
在这里插入图片描述
Redis
在这里插入图片描述

  • 修改后

日志
在这里插入图片描述
MySQL
在这里插入图片描述
Redis
在这里插入图片描述

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-11 16:30:44  更:2022-05-11 16:32:24 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:35:39-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码