一、mysql整理
1、开启binlog日志
(1)mysql开启binlog日志 可以在my.ini 或 my.cnf 中配置
(2)必填内容
log-bin = mysql-bin
# 设置服务id,主从不能一致
server-id = 1
# BINLOG模式为row
binlog-format = ROW
(3)非必填内容
# 设置需要同步的数据库
binlog-do-db=ds_01
# 屏蔽系统库同步
binlog-ignore-db=mysql
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
(4) 配置完成重启服务
(5)验证是否成功
# 查看binlog是否开启 on为开启
show variables like 'log_bin';
# 查看binlog格式是否为 row 模式
show variables like 'binlog_format';
# 查询 可以获取到 binlog记录日志文件名 mysql-bin.000001 ,position 4040 ,binlog_db,以及其他信息
show master status;
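#创建账号,分配账号权限(先 create user 创建账号,再 grant 授权;MySQL 8 不支持用 grant 隐式创建账号)
create user 'sync_db'@'localhost' identified by 'sync_db';
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'sync_db'@'localhost';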
#查看密码插件,可能影响canal 连不上数据库
select host,user,plugin from mysql.user;
二、canal安装配置
1、下载canal.deployer-1.1.5.tar.gz,解压
官方下载地址:https://github.com/alibaba/canal/releases
2、进入conf文件夹 复制example文件夹 改为自定义的 实例名称
3、主要改动文件 canal.properties ,instance.properties
(1)canal.properties 去掉mq位置无用代码,追加
#模式设置为rabbitMq模式
# canal.serverMode = rabbitMQ
#地址,不需要端口
# rabbitmq.host = 127.0.0.1
#当前Vhost
# rabbitmq.virtual.host = /
#刚才配置的交换机
# rabbitmq.exchange = canal-exchange
#刚才配置的账号密码
# rabbitmq.username = cannal
# rabbitmq.password = cannal
# rabbitmq.deliveryMode = 2
(2)、instance.properties
#数据库地址
# canal.instance.master.address=127.0.0.1:3306
#binlog文件名
# canal.instance.master.journal.name=mysql-bin.000003
# username/password 连接,其实已经配好了
# canal.instance.dbUsername=sync_db
# canal.instance.dbPassword=sync_db
# canal.instance.connectionCharset = UTF-8
# mq config, 指定 rabbitmq 设置绑定的路由
# canal.mq.topic=canal-exchange-routing
# canal.instance.slaveId=2
三、rabbitMq安装
略。。。
四 、代码
1、application.properties 文件
rabbitmq.canal=canal
# 主题名称
rabbitmq.topic=""
# 交换机
rabbitmq.exchange=canal-exchange
# 路由
rabbitmq.routing=canal-exchange-routing
# 队列持久化
rabbitmq.queue.durable=true
# 队列私有
rabbitmq.queue.exclusive=false
# 临时队列
rabbitmq.queue.autoDelete=false
rabbitmq.exchange.durable=true
rabbitmq.exchange.autoDelete=false
# ip
spring.rabbitmq.host=localhost
# 端口号
spring.rabbitmq.port=5672
# 用户名
spring.rabbitmq.username=canalcustomer
# 密码
spring.rabbitmq.password=canalcustomer
2、消费者及实体类代码(CanalConsumer、CanalBean、MysqlType、SqlType)
package com.example.cannal.cannal;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * RabbitMQ listener that parses each incoming message body as a {@link CanalBean} JSON document
 * and logs every row carried by INSERT, UPDATE and DELETE events.
 *
 * <p>Queue, exchange and routing key are resolved from the {@code rabbitmq.canal},
 * {@code rabbitmq.exchange} and {@code rabbitmq.routing} entries of application.properties.
 */
@Component
public class CanalConsumer {

    private static final Logger log = LoggerFactory.getLogger(CanalConsumer.class);

    /**
     * Handles one message delivered to the bound queue.
     *
     * <p>DDL events, events without row data and events of an unsupported type are skipped.
     * Any failure while decoding or processing is logged and not rethrown, so a malformed
     * message never propagates an exception to the listener container.
     *
     * @param message the raw AMQP message; its body is decoded as UTF-8 JSON
     *                (NOTE(review): assumes the producer emits UTF-8 - confirm against canal setup)
     * @param channel the channel the message arrived on; not used by this method
     * @throws IOException declared for interface compatibility; not thrown by the current body
     */
    @RabbitListener(bindings = @QueueBinding(
            // The property is declared as "rabbitmq.canal"; the former "${rabbitmq.cannal}"
            // placeholder had no matching entry in application.properties.
            value = @Queue("${rabbitmq.canal}"),
            exchange = @Exchange("${rabbitmq.exchange}"),
            // Bind with the configured routing key so that messages published with it
            // are routed to this queue.
            key = "${rabbitmq.routing}"
    ))
    public void receive(Message message, Channel channel) throws IOException {
        try {
            // Decode with an explicit charset instead of the platform default.
            String value = new String(message.getBody(), StandardCharsets.UTF_8);
            CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class);
            if (canalBean == null) {
                log.warn("skipping message that did not parse to a CanalBean");
                return;
            }
            if (canalBean.getIsDdl()) {
                return;
            }

            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            if (tbCommodityInfos == null || tbCommodityInfos.isEmpty()) {
                // Nothing to iterate; previously a missing data list ended in a NullPointerException.
                return;
            }

            String type = canalBean.getType();
            if ("INSERT".equals(type)) {
                logRows("INSERT", tbCommodityInfos);
            } else if ("UPDATE".equals(type)) {
                logRows("UPDATE", tbCommodityInfos);
            } else if ("DELETE".equals(type)) {
                logRows("DELETE", tbCommodityInfos);
            } else {
                // Unknown types used to be reported as DELETE; report them as what they are.
                log.warn("ignoring event with unsupported type: {}", type);
            }
        } catch (Exception e) {
            log.error("error", e);
        }
    }

    /** Logs every row as {@code <label>:<row as JSON>}. */
    private void logRows(String label, List<TbCommodityInfo> rows) {
        for (TbCommodityInfo row : rows) {
            log.info("{}:{}", label, JSONObject.toJSONString(row));
        }
    }
}
package com.example.cannal.cannal;
import java.util.List;
/**
 * Plain mutable bean with one property per top-level member of the JSON message that
 * {@code CanalConsumer} parses.
 *
 * <p>The DDL flag is reachable through two equivalent accessor pairs
 * ({@code isDdl()/setDdl(..)} and {@code getIsDdl()/setIsDdl(..)}); both read and write the
 * same field.
 */
public class CanalBean {

    /** Rows carried by the message. */
    private List<TbCommodityInfo> data;
    private String database;
    // NOTE(review): es/ts look like epoch timestamps - confirm against the producer's format.
    private long es;
    private int id;
    private boolean isDdl;
    private MysqlType mysqlType;
    private String old;
    private List<String> pkNames;
    private String sql;
    private SqlType sqlType;
    private String table;
    private long ts;
    private String type;

    // ---- data ----

    public List<TbCommodityInfo> getData() {
        return this.data;
    }

    public void setData(List<TbCommodityInfo> data) {
        this.data = data;
    }

    // ---- DDL flag (two accessor pairs over the same field) ----

    public boolean isDdl() {
        return this.isDdl;
    }

    public void setDdl(boolean ddl) {
        this.isDdl = ddl;
    }

    public boolean getIsDdl() {
        return this.isDdl;
    }

    public void setIsDdl(boolean isDdl) {
        this.isDdl = isDdl;
    }

    // ---- database / table ----

    public String getDatabase() {
        return this.database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getTable() {
        return this.table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    // ---- identifiers and timestamps ----

    public int getId() {
        return this.id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public long getEs() {
        return this.es;
    }

    public void setEs(long es) {
        this.es = es;
    }

    public long getTs() {
        return this.ts;
    }

    public void setTs(long ts) {
        this.ts = ts;
    }

    // ---- column type descriptors ----

    public MysqlType getMysqlType() {
        return this.mysqlType;
    }

    public void setMysqlType(MysqlType mysqlType) {
        this.mysqlType = mysqlType;
    }

    public SqlType getSqlType() {
        return this.sqlType;
    }

    public void setSqlType(SqlType sqlType) {
        this.sqlType = sqlType;
    }

    // ---- remaining members ----

    public String getOld() {
        return this.old;
    }

    public void setOld(String old) {
        this.old = old;
    }

    public List<String> getPkNames() {
        return this.pkNames;
    }

    public void setPkNames(List<String> pkNames) {
        this.pkNames = pkNames;
    }

    public String getSql() {
        return this.sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public String getType() {
        return this.type;
    }

    public void setType(String type) {
        this.type = type;
    }
}
package com.example.cannal.cannal;
/**
 * Mutable bean holding one {@code String} value per column name
 * ({@code id}, {@code commodity_name}, {@code commodity_price}, {@code number},
 * {@code description}). It is the type of the {@code mysqlType} property of {@code CanalBean}.
 *
 * <p>Field and accessor names intentionally keep the snake_case column spelling.
 */
public class MysqlType {

    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;

    /** @return the value stored for column {@code id}, or {@code null} if never set */
    public String getId() {
        return this.id;
    }

    public void setId(String id) {
        this.id = id;
    }

    /** @return the value stored for column {@code commodity_name}, or {@code null} if never set */
    public String getCommodity_name() {
        return this.commodity_name;
    }

    public void setCommodity_name(String commodity_name) {
        this.commodity_name = commodity_name;
    }

    /** @return the value stored for column {@code commodity_price}, or {@code null} if never set */
    public String getCommodity_price() {
        return this.commodity_price;
    }

    public void setCommodity_price(String commodity_price) {
        this.commodity_price = commodity_price;
    }

    /** @return the value stored for column {@code number}, or {@code null} if never set */
    public String getNumber() {
        return this.number;
    }

    public void setNumber(String number) {
        this.number = number;
    }

    /** @return the value stored for column {@code description}, or {@code null} if never set */
    public String getDescription() {
        return this.description;
    }

    public void setDescription(String description) {
        this.description = description;
    }
}
package com.example.cannal.cannal;
/**
 * Mutable bean holding one {@code int} value per column name
 * ({@code id}, {@code commodity_name}, {@code commodity_price}, {@code number},
 * {@code description}). It is the type of the {@code sqlType} property of {@code CanalBean}.
 *
 * <p>NOTE(review): the ints are presumably JDBC type codes - confirm against the producer.
 * Field and accessor names intentionally keep the snake_case column spelling.
 */
public class SqlType {

    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;

    /** @return the value stored for column {@code id}; {@code 0} if never set */
    public int getId() {
        return this.id;
    }

    public void setId(int id) {
        this.id = id;
    }

    /** @return the value stored for column {@code commodity_name}; {@code 0} if never set */
    public int getCommodity_name() {
        return this.commodity_name;
    }

    public void setCommodity_name(int commodity_name) {
        this.commodity_name = commodity_name;
    }

    /** @return the value stored for column {@code commodity_price}; {@code 0} if never set */
    public int getCommodity_price() {
        return this.commodity_price;
    }

    public void setCommodity_price(int commodity_price) {
        this.commodity_price = commodity_price;
    }

    /** @return the value stored for column {@code number}; {@code 0} if never set */
    public int getNumber() {
        return this.number;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    /** @return the value stored for column {@code description}; {@code 0} if never set */
    public int getDescription() {
        return this.description;
    }

    public void setDescription(int description) {
        this.description = description;
    }
}
|