一.Binlog策略模式接口
/**
 * Strategy contract for handling binlog messages, identified by a table name.
 *
 * <p>Both methods carry defaults, so an implementation overrides only what it needs.
 */
public interface BinlogKeeper {

    /**
     * Names the table this handler is registered under.
     *
     * @return the table name; {@code "default_table"} unless overridden
     */
    default String tableName() {
        return "default_table";
    }

    /**
     * Handles one binlog message.
     *
     * @param binlogMessage the message to handle
     * @return {@code true}; the default implementation ignores the message
     */
    default boolean dealMqMessage(BinlogMessage binlogMessage) {
        return true;
    }
}
二.Binlog策略模式接口的默认实现(其他表的Binlog处理类与此类似)
/**
 * {@link BinlogKeeper} registered under {@link #TABLE_NAME}.
 *
 * <p>It performs no processing: it only logs a warning naming the table of the
 * message it was given and reports success.
 */
@Slf4j
@Service(value = "defaultBinlogService")
public class DefaultBinlogServiceImpl implements BinlogKeeper {

    /** Table name returned by {@link #tableName()}. */
    public static final String TABLE_NAME = "default_table";

    /**
     * @return {@link #TABLE_NAME}
     */
    @Override
    public String tableName() {
        return TABLE_NAME;
    }

    /**
     * Logs a warning that no handler is configured for the message's table.
     *
     * @param binlogMessage the message whose table name is logged; must not be {@code null}
     * @return always {@code true}
     */
    @Override
    public boolean dealMqMessage(BinlogMessage binlogMessage) {
        // Use a placeholder rather than concatenation so the message is only
        // assembled when WARN is enabled; the logged text is unchanged.
        log.warn("待处理的binlog未配置,方法名:defaultBinlogService,表名: {}", binlogMessage.getTable());
        return true;
    }
}
三.工厂+策略模式
/**
 * Resolves the {@link BinlogKeeper} registered for a table name.
 *
 * <p>After construction, every {@link BinlogKeeper} bean in the application context is
 * stored under the name its {@link BinlogKeeper#tableName()} returns. If two beans
 * return the same name, the one stored later replaces the earlier one.
 */
@Component
public class BinlogServiceFactory {

    @Autowired
    ApplicationContext applicationContext;

    /** Table name to handler; filled once by {@link #initServiceMap()}. */
    private static final Map<String, BinlogKeeper> serviceMap = new ConcurrentHashMap<>(8);

    /** Registers every {@link BinlogKeeper} bean under its table name. */
    @PostConstruct
    private void initServiceMap() {
        for (BinlogKeeper keeper : applicationContext.getBeansOfType(BinlogKeeper.class).values()) {
            serviceMap.put(keeper.tableName(), keeper);
        }
    }

    /**
     * Returns the handler for the given table.
     *
     * @param table the table name; must not be {@code null}
     * @return the handler registered for {@code table}, or the one registered under
     *         {@link DefaultBinlogServiceImpl#TABLE_NAME} when there is none
     * @throws IllegalArgumentException if {@code table} is {@code null}
     */
    public BinlogKeeper getInstance(String table) {
        Assert.notNull(table, "表名不能为null");
        BinlogKeeper keeper = serviceMap.get(table);
        if (keeper != null) {
            return keeper;
        }
        return serviceMap.get(DefaultBinlogServiceImpl.TABLE_NAME);
    }
}
四.BinlogMessage 实体
/**
 * Object form of a binlog MQ payload; the listener's receive() parses the JSON text into it.
 *
 * NOTE(review): the field set looks like a Maxwell-style binlog event - confirm against the producer.
 * NOTE(review): server_id / thread_id / schema_id are snake_case, presumably so they match the
 * payload's JSON keys without extra mapping - confirm before renaming.
 */
@Data
public class BinlogMessage implements Serializable {
private String database;
// Table name; receive() uses it to select the BinlogKeeper that handles the message.
private String table;
private String type;
private String ts;
private String xid;
private boolean commit;
private String position;
private String server_id;
private Integer thread_id;
private Integer schema_id;
// receive() checks this for blank together with table.
// NOTE(review): presumably the row contents as JSON text - confirm.
private String data;
// NOTE(review): presumably the previous column values on update events - confirm.
private JSONObject old;
}
五.监听binlog对应的MQ消息
// Fallback rate handed to RateLimiter.create in init() when consumerConcurrency is null or not positive.
// NOTE(review): this property key is the same "${xxx:50}" placeholder used for consumerConcurrency
// below - confirm each field is bound to its own key.
@Value("${xxx:50}")
private Integer limiter = 50;
// Rate handed to RateLimiter.create in init() when it is positive.
@Value("${xxx:50}")
private Integer consumerConcurrency;
// Created in init(); receive() calls acquire() on it before handling each payload.
private RateLimiter rateLimiter;
/**
 * Creates the rate limiter once injection is done: the rate is
 * {@code consumerConcurrency} when that is set and positive, otherwise {@code limiter}.
 */
@PostConstruct
public void init() {
    boolean concurrencyUsable = consumerConcurrency != null && consumerConcurrency > 0;
    Integer rate = concurrencyUsable ? consumerConcurrency : limiter;
    rateLimiter = RateLimiter.create(rate);
}
/**
 * Handles one MQ payload: waits for a rate-limiter permit, parses the payload into a
 * {@link BinlogMessage} and passes it to the {@link BinlogKeeper} registered for its table.
 *
 * <p>A payload that parses to nothing, or whose table name or data is blank, is logged
 * at error level and dropped without being dispatched.
 *
 * @param payload the raw JSON text of the message
 */
public void receive(String payload) {
    rateLimiter.acquire();
    log.info("接收到的信息:{}", payload);

    BinlogMessage binlogMessage = JSON.parseObject(payload, BinlogMessage.class);
    if (binlogMessage == null) {
        // The parser can yield null (e.g. for an empty payload); there is nothing to dispatch.
        log.error("监听到的消息,表名或表数据为空,消息体:{}", payload);
        return;
    }

    // Serialize once and reuse the text for both log statements.
    String messageJson = JSONObject.toJSONString(binlogMessage);
    log.info("解析后的消息:{}", messageJson);

    String table = binlogMessage.getTable();
    if (StringUtils.isBlank(table) || StringUtils.isBlank(binlogMessage.getData())) {
        log.error("监听到的消息,表名或表数据为空,消息体:{}", messageJson);
        // Stop here: a null table would fail the factory's not-null assertion, and a
        // blank one would otherwise be routed to the default handler.
        return;
    }

    BinlogKeeper binlogService = binlogServiceFactory.getInstance(table);
    binlogService.dealMqMessage(binlogMessage);
}
|