Canal通过TCP实现MySQL与Redis同步
Docker 环境安装
yum install -y yum-utils device-mapper-persistent-data lvm2
yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
yum install docker-ce
systemctl start docker
MySQL 安装
docker pull mysql:5.7
docker run -p 3306:3306 --name mysql \
-v /mydata/mysql/log:/var/log/mysql \
-v /mydata/mysql/data:/var/lib/mysql \
-v /mydata/mysql/conf:/etc/mysql \
-e MYSQL_ROOT_PASSWORD=root \
-d mysql:5.7
-p 3306:3306:将容器的3306端口映射到主机的3306端口 -v /mydata/mysql/conf:/etc/mysql:将配置文件夹挂在到主机 -v /mydata/mysql/log:/var/log/mysql:将日志文件夹挂载到主机 -v /mydata/mysql/data:/var/lib/mysql/:将数据文件夹挂载到主机 -e MYSQL_ROOT_PASSWORD=root:初始化root用户的密码
docker exec -it mysql /bin/bash
- 上传 mall.sql 文件文件并拷贝到 mysql 容器的/目录下:
docker cp /mydata/mall.sql mysql:/
mysql -uroot -proot --default-character-set=utf8
create database mall character set utf8;
use mall;
source /mall.sql;
- 创建一个 reader:123456 帐号并修改权限,使得任何ip都能访问:
grant all privileges on *.* to 'reader' @'%' identified by '123456';
Redis 安装
docker pull redis:5
docker run -p 6379:6379 --name redis \
-v /mydata/redis/data:/data \
-d redis:5 redis-server --appendonly yes
- 进入Redis容器使用 redis-cli 命令进行连接:
docker exec -it redis redis-cli
安装 Canal
docker pull canal/canal-server:v1.1.4
sudo docker run -it --name canal -p 11111:11111 \
-p 8000:8000 -p 2222:2222 -p 1111:1111 -p 11112:11112 \
-p 11110:11110 -d canal/canal-server:v1.1.4
MySQL文件配置
docker exec -it mysql /bin/bash
遇到的问题:在 /etc/mysql 文件夹下没有发现 my.cnf 文件 解决办法:手动创建 my.cnf 文件
vim /etc/my.cnf
如果出现 bash: vi: command not found 这个问题,则通过以下命令 apt-get update 和 apt-get install vim 解决。
# 开启 mysql 的 binlog 模块
log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW
# server_id 需保证唯一,不能和 canal 的 slaveId 重复
server_id=1
# 需要同步的数据库名称
binlog-do-db=mall
# 忽略的数据库,建议填写
binlog-ignore-db=mysql
# 启动 mysql 时不启动 grant-tables 授权表
skip-grant-tables
- 创建一个 MySQL 用户 canal 并授予权限:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON mall.ums_admin TO 'canal'@'%';
FLUSH PRIVILEGES;
Canal 文件配置
docker exec -it canal /bin/bash
- 修改canal的配置文件canal.properties
vim canal-server/conf/example/instance.properties
注意:上图中第二处默认为不要使用 127.0.0.1,不然会出现无法连接 MySQL 的错误。
- 查看 canal-server/logs/example/example.log 日志,如下图则表示启动成功。
编写 Jedis 工具类和 Canal 配置类
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.0.25</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.0</version>
</dependency>
package com.macro.mall.canal;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
private static String ip = "10.0.0.4";
private static int port = 6379;
private static int timeout = 10000;
private static JedisPool pool = null;
static {
JedisPoolConfig config = new JedisPoolConfig();
config.setMaxTotal(1024);
config.setMaxIdle(200);
config.setMaxWaitMillis(10000);
config.setTestOnBorrow(true);
pool = new JedisPool(config, ip, port, timeout);
}
public static Jedis getJedis() {
if (pool != null) {
return pool.getResource();
} else {
return null;
}
}
public static void close(final Jedis redis) {
if (redis != null) {
redis.close();
}
}
public static boolean existKey(String key) {
return getJedis().exists(key);
}
public static void delKey(String key) {
getJedis().del(key);
}
public static String stringGet(String key) {
return getJedis().get(key);
}
public static String stringSet(String key, String value) {
return getJedis().set(key, value);
}
public static String stringSet(String key, String value, long time) {
return getJedis().set(key, value, null, null, time);
}
public static void hashSet(String key, String field, String value) {
getJedis().hset(key, field, value);
}
}
package com.macro.mall.canal;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class CanalSyncConfig {
private static String REDIS_DATABASE = "mall";
private static String REDIS_KEY_ADMIN = "ums:admin";
@Bean
public static void canalSync() {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("10.0.0.4",
11111), "example", "canal", "canal");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
printEntry(message.getEntries());
}
connector.ack(batchId);
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
redisDelete(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
redisInsert(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
redisUpdate(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
private static void redisInsert(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.stringSet(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":"
+ columns.get(1).getValue(), json.toJSONString());
}
}
private static void redisUpdate(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.stringSet(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":"
+ columns.get(1).getValue(), json.toJSONString());
}
}
private static void redisDelete(List<Column> columns) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.delKey(REDIS_DATABASE + ":" + REDIS_KEY_ADMIN + ":" + columns.get(1).getValue());
}
}
}
验证
日志 MySQL Redis
MySQL 后台日志 Redis
|