#数据库开启binlog 查看我之前的教程 docker 创建MySQL
docker cp mysql:/etc/mysql/mysql.conf.d/mysqld.cnf /
vim /mysqld.cnf
i
##添加下面的内容
log-bin=mysql-bin
server-id=1
保存 重启mysql容器
安装canal
docker run -p 11111:11111 --name=canal -id canal/canal-server
##进入 canal 容器
docker exec -it canal bash
#编辑
cd /home/admin/canal-server/conf/example/
vi instance.properties
保存 重启cannel容器
java客户端
##导入两个依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
<!-- Message、CanalEntry.Entry等来自此安装包 -->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.protocol</artifactId>
<version>1.1.5</version>
</dependency>
测试用例
package com.zixi;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class Simple {
private static final String SERVER_ADDRESS = "服务地址";
private static final Integer PORT = 11111;
private static final String DESTINATION = "example";
private static final String USERNAME = "";
private static final String PASSWORD = "";
public static void main(String[] args) {
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress(SERVER_ADDRESS, PORT), DESTINATION, USERNAME, PASSWORD);
canalConnector.connect();
canalConnector.subscribe(".*\\..*");
canalConnector.rollback();
for (; ; ) {
Message message = canalConnector.getWithoutAck(100);
long batchId = message.getId();
if (batchId != -1) {
System.out.println("msgId -> " + batchId);
printEnity(message.getEntries());
}
}
}
private static void printEnity(List<CanalEntry.Entry> entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() != CanalEntry.EntryType.ROWDATA) {
continue;
}
try {
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
System.out.println(rowChange.getEventType());
switch (rowChange.getEventType()) {
case INSERT:
String tableName = entry.getHeader().getTableName();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
System.out.println(afterColumnsList);
break;
case UPDATE:
List<CanalEntry.Column> afterColumnsList2 = rowData.getAfterColumnsList();
System.out.println("新插入的数据是:" + afterColumnsList2);
break;
case DELETE:
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
System.out.println("被删除的数据是:" + beforeColumnsList);
break;
default:
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
}
}
|