Canal通过Kafka实现MySQL与Redis同步
Docker 环境安装、MySQL 安装、Redis 安装、Canal 安装、MySQL文件配置和 Canal 文件配置请移步 Canal通过TCP实现MySQL与Redis同步 查看。
Zookeeper 安装
- 下载 Zookeeper3.7.0 的 docker 镜像:
docker pull zookeeper:3.7.0
docker run --name zookeeper -p 2181:2181 --restart always -d zookeeper:3.7.0
--restart always 的设置可以使 docker 启动时同时启动 Zookeeper。
Kafka 安装
- 下载并启动 kafka2.13-2.8.1 的 docker 容器（docker run 会在本地没有镜像时自动拉取）:
docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0
-e KAFKA_ZOOKEEPER_CONNECT=10.0.0.4:2181 -e
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://10.0.0.4:9092 -e
KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka:2.13-2.8.1
docker exec -it kafka /bin/sh
cd opt/kafka_2.13-2.8.1
kafka-topics.sh --create --zookeeper localhost:2181
--replication-factor 1 --partitions 1 --topic mall
修改 Canal 文件配置
docker exec -it canal /bin/bash
- 修改 canal 的配置文件 canal.properties:
vi canal-server/conf/canal.properties
- 修改 canal 的实例配置文件 instance.properties:
vi canal-server/conf/example/instance.properties
编写实体类和 Kafka 消费者
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 在 application.yml 文件增加 Kafka 配置:
spring:
  kafka:
    bootstrap-servers: 10.0.0.4:9092
    consumer:
      group-id: mall-master
- 创建 CanalBean 对象接收 Kafka 消息:
package com.macro.mall.canal;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
@Data
@AllArgsConstructor
@NoArgsConstructor
/**
 * Message body pushed by Canal to the "mall" Kafka topic: one binlog event
 * for the ums_admin table, deserialized from JSON by CanalConsumer.
 */
public class CanalBean {
// Row images after the change — one UmsAdmin per affected row.
private List<UmsAdmin> data;
// Source database name.
private String database;
// Binlog event execution timestamp (ms).
private long es;
// Batch id assigned by Canal.
private int id;
// true when the event is a DDL statement (no row data to sync).
private boolean isDdl;
// Column name -> MySQL column type of the source table.
private MysqlType mysqlType;
// NOTE(review): Canal emits "old" as a JSON array of changed-column maps;
// binding it to a String may fail or capture raw JSON — verify against the broker payload.
private String old;
// Primary-key column names of the source table.
private List<String> pkNames;
// Original SQL text (typically empty for row events).
private String sql;
// Column name -> java.sql.Types code.
private SqlType sqlType;
// Source table name.
private String table;
// Event timestamp (ms) when Canal processed it.
private long ts;
// Event type: INSERT / UPDATE / DELETE / DDL kinds.
private String type;
}
package com.macro.mall.canal;
/**
 * Column-name -> MySQL type strings for the ums_admin table, mirroring the
 * "mysqlType" object in Canal's JSON message.
 *
 * Getters/setters are required: fastjson's default deserialization binds via
 * setters and cannot populate private fields that expose no accessors.
 */
public class MysqlType {
    private String id;
    private String username;
    private String password;
    private String icon;
    private String email;
    private String nickName;
    private String note;
    private String createTime;
    private String loginTime;
    private String status;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }

    public String getPassword() { return password; }
    public void setPassword(String password) { this.password = password; }

    public String getIcon() { return icon; }
    public void setIcon(String icon) { this.icon = icon; }

    public String getEmail() { return email; }
    public void setEmail(String email) { this.email = email; }

    public String getNickName() { return nickName; }
    public void setNickName(String nickName) { this.nickName = nickName; }

    public String getNote() { return note; }
    public void setNote(String note) { this.note = note; }

    public String getCreateTime() { return createTime; }
    public void setCreateTime(String createTime) { this.createTime = createTime; }

    public String getLoginTime() { return loginTime; }
    public void setLoginTime(String loginTime) { this.loginTime = loginTime; }

    public String getStatus() { return status; }
    public void setStatus(String status) { this.status = status; }
}
package com.macro.mall.canal;
/**
 * Column-name -> java.sql.Types codes for the ums_admin table, mirroring the
 * "sqlType" object in Canal's JSON message.
 *
 * Getters/setters are required: fastjson's default deserialization binds via
 * setters and cannot populate private fields that expose no accessors.
 */
public class SqlType {
    private int id;
    private int username;
    private int password;
    private int icon;
    private int email;
    private int nickName;
    private int note;
    private int createTime;
    private int loginTime;
    private int status;

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }

    public int getUsername() { return username; }
    public void setUsername(int username) { this.username = username; }

    public int getPassword() { return password; }
    public void setPassword(int password) { this.password = password; }

    public int getIcon() { return icon; }
    public void setIcon(int icon) { this.icon = icon; }

    public int getEmail() { return email; }
    public void setEmail(int email) { this.email = email; }

    public int getNickName() { return nickName; }
    public void setNickName(int nickName) { this.nickName = nickName; }

    public int getNote() { return note; }
    public void setNote(int note) { this.note = note; }

    public int getCreateTime() { return createTime; }
    public void setCreateTime(int createTime) { this.createTime = createTime; }

    public int getLoginTime() { return loginTime; }
    public void setLoginTime(int loginTime) { this.loginTime = loginTime; }

    public int getStatus() { return status; }
    public void setStatus(int status) { this.status = status; }
}
package com.macro.mall.canal;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
/**
 * Kafka consumer that keeps Redis in sync with MySQL: consumes Canal binlog
 * messages from the "mall" topic and upserts/deletes the corresponding
 * ums:admin entries in Redis.
 */
public class CanalConsumer {

    /** Redis key prefix parts: final key is "mall:ums:admin:{username}". */
    private static final String REDIS_DATABASE = "mall";
    private static final String REDIS_KEY_ADMIN = "ums:admin";
    /** Canal event types that carry a full row image to upsert. */
    private static final String INSERT = "INSERT";
    private static final String UPDATE = "UPDATE";

    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * Handles one Canal message from the "mall" topic.
     * INSERT/UPDATE rows are written to Redis as JSON; any other row event
     * (e.g. DELETE) removes the key. DDL events are ignored.
     *
     * @param consumer the raw Kafka record; its value is Canal's JSON payload
     */
    @KafkaListener(topics = "mall")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{}, key:{}, 分区位置:{}, 下标:{}, value:{}", consumer.topic(), consumer.key(),
                consumer.partition(), consumer.offset(), value);
        CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
        if (canalBean.isDdl()) {
            return; // DDL statements carry no row data to sync
        }
        List<UmsAdmin> umsAdmins = canalBean.getData();
        if (umsAdmins == null || umsAdmins.isEmpty()) {
            return; // defensive: some events may carry no row images
        }
        String type = canalBean.getType();
        boolean upsert = INSERT.equals(type) || UPDATE.equals(type);
        for (UmsAdmin umsAdmin : umsAdmins) {
            String key = REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + umsAdmin.getUsername();
            if (upsert) {
                redisTemplate.opsForValue().set(key, JSONObject.toJSONString(umsAdmin));
            } else {
                // Non-upsert row event (e.g. DELETE): drop the cached entry.
                redisTemplate.delete(key);
            }
        }
    }
}
验证
MySQL Redis
日志 MySQL Redis
|