Canal-Spring-Boot-Starter 使用
先上代码
1、在 Spring Boot 项目的配置文件 application.yml 内增加以下内容
spring:
  canal:
    instances:
      # Instance key; the Java listeners in this document use destination = "example".
      example:
        host: 192.168.10.179
        port: 11111
        user-name: canal
        password: canal
        batch-size: 600
        retry-count: 5
        cluster-enabled: false
        # Left empty while cluster-enabled is false; presumably only needed in cluster mode - confirm.
        zookeeper-address:
        acquire-interval: 1000
        # NOTE(review): in an unquoted YAML scalar both backslashes are kept literally;
        # confirm the starter expects `.*\\..*` here rather than `.*\..*`.
        subscribe: .*\\..*
2、在 Spring Boot 项目中的代码使用示例
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.duxinglangzi.canal.starter.annotation.CanalListener;
import com.duxinglangzi.canal.starter.annotation.CanalUpdateListener;
import org.springframework.stereotype.Service;
import java.util.stream.Collectors;
/**
 * Example Spring service showing how listener methods are declared with the
 * canal starter annotations. Every listener simply forwards the received row
 * change to {@link #printChange}, which dumps it to standard output.
 */
@Service
public class CanalListenerTest {

    /** Listener declared for destination "example", database "books", table "users". */
    @CanalUpdateListener(destination = "example", database = "books", table = {"users"})
    public void listenerExampleBooksUsers(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksUsers", eventType, rowData);
    }

    /** Listener declared for destination "example", database "books", table "books". */
    @CanalUpdateListener(destination = "example", database = "books", table = {"books"})
    public void listenerExampleBooksBooks(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksBooks", eventType, rowData);
    }

    /** Listener declared for UPDATE events on destination "example", database "books" (no table filter given). */
    @CanalListener(destination = "example", database = "books", eventType = CanalEntry.EventType.UPDATE)
    public void listenerExampleBooksAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleBooksAll", eventType, rowData);
    }

    /** Listener declared for UPDATE events on destination "example" (no database or table filter given). */
    @CanalListener(destination = "example", eventType = CanalEntry.EventType.UPDATE)
    public void listenerExampleAll(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerExampleAll", eventType, rowData);
    }

    /** Listener declared for UPDATE, INSERT and DELETE events with no destination, database or table filter. */
    @CanalListener(eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
    public void listenerAllDml(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        printChange("listenerAllDml", eventType, rowData);
    }

    /**
     * Prints one line per column of the given row change to standard output.
     * Event types other than DELETE, INSERT and UPDATE produce no output.
     *
     * @param method    name of the listener method that received the event (used as a log prefix)
     * @param eventType type of the row change
     * @param rowData   row images delivered with the event
     */
    public void printChange(String method, CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        if (eventType == CanalEntry.EventType.DELETE) {
            // A deleted row has no "after" image; the removed values are carried in the "before" columns.
            for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                System.out.println("[方法: "+method+" , delete 语句 ] --->> 字段名: " + column.getName() + ", 删除的值为: " + column.getValue());
            }
        }
        if (eventType == CanalEntry.EventType.INSERT) {
            for (CanalEntry.Column column : rowData.getAfterColumnsList()) {
                System.out.println("[方法: "+method+" ,insert 语句 ] --->> 字段名: " + column.getName() + ", 新增的值为: " + column.getValue());
            }
        }
        if (eventType == CanalEntry.EventType.UPDATE) {
            int afterCount = rowData.getAfterColumnsCount();
            int beforeCount = rowData.getBeforeColumnsCount();
            for (int i = 0; i < afterCount; i++) {
                CanalEntry.Column afterColumn = rowData.getAfterColumns(i);
                // Columns are paired by position, which assumes both images list the same
                // columns in the same order. If the "before" image is shorter, print null
                // instead of failing with IndexOutOfBoundsException.
                String beforeValue = i < beforeCount ? rowData.getBeforeColumns(i).getValue() : null;
                System.out.println("[方法: "+method+" , update 语句 ] -->> 字段名," + afterColumn.getName() +
                        " , 是否修改: " + afterColumn.getUpdated() +
                        " , 修改前的值: " + beforeValue +
                        " , 修改后的值: " + afterColumn.getValue());
            }
        }
    }
}
以上展示了在 spring boot 项目中 canal starter 的基本使用
3、源码地址
canal-spring-boot-starter 的源代码由楼主自己封装, 地址: GitHub(大型全球交友平台)
如果有需要的同学,可以自行下载源码进行修改和自定义封装
|