Integrating with a message queue (Kafka) and modifying canal's JSON output format
Introduction
Asset data is currently scattered across different systems. To meet current management requirements, the asset data in each system must be collected into a big-data platform for unified storage and management. The integration is done through a message queue (Kafka); data must be sent incrementally, and every CUD (create, update, delete) operation must push the entire affected row onto the message channel.
I. Test environment
JDK = 1.8
MySQL = 5.7.0
ZooKeeper = 3.7.0
canal = 1.1.5
II. Configuring the runtime environment
1. MySQL configuration
1) Run vim /etc/my.cnf to edit the MySQL configuration file and enable log_bin:
[mysqld]
log-bin=mysql-bin
server-id=246
2) After changing the configuration file, restart the MySQL service:
systemctl restart mysqld.service
3) Check whether log-bin was enabled successfully. Log in to MySQL:
mysql -uroot -p'<your MySQL password>'
and verify the setting:
mysql> SHOW VARIABLES LIKE '%bin%';
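If binlog is enabled, the output should include log_bin = ON, roughly like this (abridged; the other %bin% variables are omitted):
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+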
2. Installing ZooKeeper
1) Both canal and Kafka rely on ZooKeeper for service coordination, so a ZooKeeper registry must be deployed and configured; version 3.7.0 is used here. (Be sure to download apache-zookeeper-3.7.0-bin.tar.gz, the variant with bin in the name.)
cd /root/software
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -zvxf apache-zookeeper-3.7.0-bin.tar.gz
mv ./apache-zookeeper-3.7.0-bin /usr/local/zookeeper
cd /usr/local/zookeeper
mkdir data
mkdir logs
cd ./conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
2) Edit the configuration file: set dataDir=/usr/local/zookeeper/data and add dataLogDir=/usr/local/zookeeper/logs. Since this is only for testing, a single standalone instance is used and no cluster nodes are configured.
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
Also create the myid file:
echo "1">/usr/local/zookeeper/data/myid
3) Start the ZooKeeper registry; the default port is 2181.
/usr/local/zookeeper/bin/zkServer.sh start
Check the startup status:
/usr/local/zookeeper/bin/zkServer.sh status
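For this single-instance setup, the status command should report something like:
Mode: standalone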
3. Installing Kafka
1) Kafka is a high-performance distributed message queue middleware whose deployment depends on ZooKeeper; version kafka_2.12-2.6.2 is used here.
mkdir /usr/local/kafka
cd /usr/local/kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -zxvf kafka_2.12-2.6.2.tgz
rm -rf kafka_2.12-2.6.2.tgz
2) Edit the configuration file: vim /usr/local/kafka/kafka_2.12-2.6.2/config/server.properties. Note that host.name and listeners=PLAINTEXT://:9092 must be configured, otherwise the broker is only reachable locally:
zookeeper.connect=localhost:2181
host.name=192.168.110.244
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.110.244:9092
3) Start Kafka (the -daemon flag already runs it in the background, so no trailing & is needed):
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.6.2/config/server.properties
List all topics:
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-topics.sh --list --zookeeper localhost:2181
Consume the data under a given topic:
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic example
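canal will publish to the example topic configured later (canal.mq.topic=example). Kafka 2.6 auto-creates topics on first use by default, but the topic can also be created explicitly up front; a sketch assuming a single broker:
/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic example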
4. Installing canal for data synchronization
1) Download the canal v1.1.5 release, canal.deployer-1.1.5.tar.gz:
mkdir /usr/local/canal
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz
rm -rf canal.deployer-1.1.5.tar.gz
After extraction the directory layout is as follows:
- bin
- conf
canal_local.properties
canal.properties
logback.xml
metrics
spring
example
instance.properties
- lib
- logs
2) In development and test environments, set the log level in logback.xml to DEBUG to make troubleshooting easier. The two files that mainly need changes are canal.properties and instance.properties. In canal.properties:
- Uncomment canal.instance.parser.parallelThreadSize = 16 to enable it. This option controls the number of instance-parser threads; leaving it unset can manifest as blocking or no parsing at all.
- Set canal.serverMode to kafka. The possible values are tcp, kafka, rocketmq and rabbitmq; the default is tcp.
- The resulting configuration for version 1.1.5:
canal.serverMode = kafka
canal.instance.parser.parallelThreadSize = 16
kafka.bootstrap.servers = 192.168.110.244:9092
instance.properties generally holds the configuration of one database instance; the canal architecture lets a single canal server handle asynchronous binlog parsing for multiple database instances. The main items to change are:
# a slaveId unique among the MySQL server-ids; canal acts as a replica
canal.instance.mysql.slaveId = 2460
# the source MySQL address
canal.instance.master.address=192.168.110.246:3306
# credentials canal uses to read the binlog
canal.instance.dbUsername=root
canal.instance.dbPassword=Abcde12345!@
canal.instance.defaultDatabaseName = test
# capture only tables B and C of the test database
canal.instance.filter.regex=test.B,test.C
# the Kafka topic to publish to
canal.mq.topic=example
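With both files edited, start canal and watch the instance log to confirm it connects to MySQL and publishes to Kafka; a quick check assuming the default example instance:
/usr/local/canal/bin/startup.sh
tail -f /usr/local/canal/logs/example/example.log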
III. Modifying canal's output format
1. Default output format
{"data":[{"id":"2","order_id":"10086","amount":"10087.0","create_time":"2021-08-04 18:05:05"}],"database":"test","es":1628071686000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","order_id":"varchar(64)","amount":"decimal(10,2)","create_time":"datetime"},"old":[{"amount":"999.0"}],"ts":1628071686131,"type":"UPDATE","pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order"}
2. The output format required by the project
Message format (JSON):
{
"table":"表名",
"op_type":"操作类型",
"op_ts":"操作时间",
"data":{
"数据字段名称":"数据字段的值"
}
}
Field, value, and type notes:
- table: table names are upper case, with underscore-separated naming;
- op_type: I (insert), U (update), D (delete); if there is no operation type, the field may be left out;
- time fields use the 24-hour format yyyy-MM-dd HH24:mm:ss (example: 2021-07-20 16:11:26);
- data field names: upper case, with underscore-separated naming;
- data field values: strings for everything except numeric types.
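For example, an update to the order row from the default-format message above would be expected to arrive in the target format roughly as follows (illustrative values):
{"table":"ORDER","op_type":"U","op_ts":"2021-08-04 18:05:06","data":{"ID":2,"ORDER_ID":"10086","AMOUNT":10087.0,"CREATE_TIME":"2021-08-04 18:05:05"}}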
3. Since the default output format does not meet the project requirements, the source code must be modified to adjust the emitted message format.
Download the source code from the canal releases page.
4. After downloading, import the source into your project and modify the code in two classes: FlatMessage and MQMessageUtils.
Changes to the FlatMessage class:
- rename ts to op_ts and make it a String, calling toString() where the value is converted;
- rename type to op_type;
- change List<Map<String, String>> to Map<String, String>, turning the array into a single object.
The source code is as follows:
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import com.google.common.collect.Lists;

public class FlatMessage implements Serializable {
private static final long serialVersionUID = -3386650678735860050L;
private long id;
private String database;
private String table;
private List<String> pkNames;
private Boolean isDdl;
private String op_type;
private Long es;
private String op_ts;
private String sql;
private Map<String, Integer> sqlType;
private Map<String, String> mysqlType;
private Map<String, String> data;
private Map<String, String> old;
public FlatMessage() {
}
public FlatMessage(long id) {
this.id = id;
}
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public String getDatabase() {
return database;
}
public void setDatabase(String database) {
this.database = database;
}
public String getTable() {
return table;
}
public void setTable(String table) {
this.table = table;
}
public List<String> getPkNames() {
return pkNames;
}
public void addPkName(String pkName) {
if (this.pkNames == null) {
this.pkNames = Lists.newArrayList();
}
this.pkNames.add(pkName);
}
public void setPkNames(List<String> pkNames) {
this.pkNames = pkNames;
}
public Boolean getIsDdl() {
return isDdl;
}
public void setIsDdl(Boolean isDdl) {
this.isDdl = isDdl;
}
public String getOp_type() {
return op_type;
}
public void setOp_type(String op_type) {
this.op_type = op_type;
}
public String getOp_ts() {
return op_ts;
}
public void setOp_ts(String op_ts) {
this.op_ts = op_ts;
}
public String getSql() {
return sql;
}
public void setSql(String sql) {
this.sql = sql;
}
public Map<String, Integer> getSqlType() {
return sqlType;
}
public void setSqlType(Map<String, Integer> sqlType) {
this.sqlType = sqlType;
}
public Map<String, String> getMysqlType() {
return mysqlType;
}
public void setMysqlType(Map<String, String> mysqlType) {
this.mysqlType = mysqlType;
}
public Map<String, String> getData() {
return data;
}
public void setData(Map<String, String> data) {
this.data = data;
}
public Map<String, String> getOld() {
return old;
}
public void setOld(Map<String, String> old) {
this.old = old;
}
public Long getEs() {
return es;
}
public void setEs(Long es) {
this.es = es;
}
@Override
public String toString() {
return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", op_type="
+ op_type + ", es=" + es + ", op_ts=" + op_ts + ", sql=" + sql + ", sqlType=" + sqlType + ", mysqlType="
+ mysqlType + ", data=" + data + ", old=" + old + "]";
}
}
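canal serializes FlatMessage with fastjson, so the JSON keys follow the getter names: getOp_type() is emitted as op_type, getOp_ts() as op_ts, and the Map-typed data now serializes as a JSON object rather than an array. A quick local sanity check (a sketch; fastjson is already on canal's classpath, and null fields are omitted by default):
FlatMessage m = new FlatMessage(1);
m.setTable("ORDER");
m.setOp_type("I");
System.out.println(com.alibaba.fastjson.JSON.toJSONString(m));
// prints {"id":1,"op_type":"I","table":"ORDER"}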
Changes to the MQMessageUtils class, essentially all inside the messageConverter function:
- convert the field names emitted in data to upper case;
- map the operation type onto op_type;
- change the types of data and old: since the array becomes a single object, the data output has to be produced inside the per-row loop;
- change the type of op_ts, adding a custom date helper class that outputs yyyy-MM-dd HH24:mm:ss; a minimal sketch of that helper follows this list.
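The date helper itself is not part of canal, so its exact contents are an assumption; the class name DateUtils and method getTime() only need to match the call in messageConverter below. A minimal thread-safe sketch:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class DateUtils {
    // 24-hour format required by the project spec (yyyy-MM-dd HH:mm:ss).
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Returns the current time. If op_ts should instead reflect the binlog
    // event time, format entry.getHeader().getExecuteTime() here instead.
    public static String getTime() {
        return LocalDateTime.now().format(FORMAT);
    }
}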
The modified source code is as follows (the package declaration and imports are unchanged from the original canal source and omitted here):
public class MQMessageUtils {
private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
.softValues(),
pkHashConfigs -> {
List<PartitionData> datas = Lists.newArrayList();
String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
",",
";"),
";");
for (String pkHashConfig : pkHashConfigArray) {
PartitionData data = new PartitionData();
int i = pkHashConfig.lastIndexOf(":");
if (i > 0) {
String pkStr = pkHashConfig.substring(i + 1);
if (pkStr.equalsIgnoreCase("$pk$")) {
data.hashMode.autoPkHash = true;
} else {
data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
'^'));
}
pkHashConfig = pkHashConfig.substring(0,
i);
} else {
data.hashMode.tableHash = true;
}
if (!isWildCard(pkHashConfig)) {
data.simpleName = pkHashConfig;
} else {
data.regexFilter = new AviaterRegexFilter(pkHashConfig);
}
datas.add(data);
}
return datas;
});
private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
.softValues(),
pkHashConfigs -> {
List<DynamicTopicData> datas = Lists.newArrayList();
String[] dynamicTopicArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
",",
";"),
";");
for (String dynamicTopic : dynamicTopicArray) {
DynamicTopicData data = new DynamicTopicData();
if (!isWildCard(dynamicTopic)) {
data.simpleName = dynamicTopic;
} else {
if (dynamicTopic.contains("\\.")) {
data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
} else {
data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
}
}
datas.add(data);
}
return datas;
});
private static Map<String, List<TopicPartitionData>> topicPartitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
.softValues(),
tPConfigs -> {
List<TopicPartitionData> datas = Lists.newArrayList();
String[] tPArray = StringUtils.split(StringUtils.replace(tPConfigs,
",",
";"),
";");
for (String tPConfig : tPArray) {
TopicPartitionData data = new TopicPartitionData();
int i = tPConfig.lastIndexOf(":");
if (i > 0) {
String tStr = tPConfig.substring(0, i);
String pStr = tPConfig.substring(i + 1);
if (!isWildCard(tStr)) {
data.simpleName = tStr;
} else {
data.regexFilter = new AviaterRegexFilter(tStr);
}
if (!StringUtils.isEmpty(pStr) && StringUtils.isNumeric(pStr)) {
data.partitionNum = Integer.valueOf(pStr);
}
datas.add(data);
}
}
return datas;
});
public static Map<String, Message> messageTopics(Message message, String defaultTopic, String dynamicTopicConfigs) {
List<CanalEntry.Entry> entries;
if (message.isRaw()) {
List<ByteString> rawEntries = message.getRawEntries();
entries = new ArrayList<>(rawEntries.size());
for (ByteString byteString : rawEntries) {
CanalEntry.Entry entry;
try {
entry = CanalEntry.Entry.parseFrom(byteString);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
entries.add(entry);
}
} else {
entries = message.getEntries();
}
Map<String, Message> messages = new HashMap<>();
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
put2MapMessage(messages, message.getId(), defaultTopic, entry);
} else {
Set<String> topics = matchTopics(schemaName + "." + tableName, dynamicTopicConfigs);
if (topics != null) {
for (String topic : topics) {
put2MapMessage(messages, message.getId(), topic, entry);
}
} else {
topics = matchTopics(schemaName, dynamicTopicConfigs);
if (topics != null) {
for (String topic : topics) {
put2MapMessage(messages, message.getId(), topic, entry);
}
} else {
put2MapMessage(messages, message.getId(), defaultTopic, entry);
}
}
}
}
return messages;
}
public static EntryRowData[] buildMessageData(Message message, ThreadPoolExecutor executor) {
ExecutorTemplate template = new ExecutorTemplate(executor);
if (message.isRaw()) {
List<ByteString> rawEntries = message.getRawEntries();
final EntryRowData[] datas = new EntryRowData[rawEntries.size()];
int i = 0;
for (ByteString byteString : rawEntries) {
final int index = i;
template.submit(() -> {
try {
Entry entry = Entry.parseFrom(byteString);
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
datas[index] = new EntryRowData();
datas[index].entry = entry;
datas[index].rowChange = rowChange;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
});
i++;
}
template.waitForResult();
return datas;
} else {
final EntryRowData[] datas = new EntryRowData[message.getEntries().size()];
int i = 0;
for (Entry entry : message.getEntries()) {
final int index = i;
template.submit(() -> {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
datas[index] = new EntryRowData();
datas[index].entry = entry;
datas[index].rowChange = rowChange;
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(e);
}
});
i++;
}
template.waitForResult();
return datas;
}
}
@SuppressWarnings("unchecked")
public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum,
String pkHashConfigs, boolean databaseHash) {
if (partitionsNum == null) {
partitionsNum = 1;
}
Message[] partitionMessages = new Message[partitionsNum];
List<Entry>[] partitionEntries = new List[partitionsNum];
for (int i = 0; i < partitionsNum; i++) {
partitionEntries[i] = Collections.synchronizedList(Lists.newArrayList());
}
for (EntryRowData data : datas) {
CanalEntry.Entry entry = data.entry;
CanalEntry.RowChange rowChange = data.rowChange;
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
if (rowChange.getIsDdl()) {
partitionEntries[0].add(entry);
} else {
if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
String database = entry.getHeader().getSchemaName();
String table = entry.getHeader().getTableName();
HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
if (hashMode == null) {
partitionEntries[0].add(entry);
} else if (hashMode.tableHash) {
int hashCode = table.hashCode();
int pkHash = Math.abs(hashCode) % partitionsNum;
pkHash = Math.abs(pkHash);
partitionEntries[pkHash].add(entry);
} else {
Entry.Builder builder = Entry.newBuilder(entry);
RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange);
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
int hashCode = 0;
if (databaseHash) {
hashCode = database.hashCode();
}
CanalEntry.EventType eventType = rowChange.getEventType();
List<CanalEntry.Column> columns = null;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
if (hashMode.autoPkHash) {
for (CanalEntry.Column column : columns) {
if (column.getIsKey()) {
hashCode = hashCode ^ column.getValue().hashCode();
}
}
} else {
for (CanalEntry.Column column : columns) {
if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
hashCode = hashCode ^ column.getValue().hashCode();
}
}
}
int pkHash = Math.abs(hashCode) % partitionsNum;
pkHash = Math.abs(pkHash);
rowChangeBuilder.clearRowDatas();
rowChangeBuilder.addRowDatas(rowData);
builder.clearStoreValue();
builder.setStoreValue(rowChangeBuilder.build().toByteString());
partitionEntries[pkHash].add(builder.build());
}
}
} else {
partitionEntries[0].add(entry);
}
}
}
for (int i = 0; i < partitionsNum; i++) {
List<Entry> entriesTmp = partitionEntries[i];
if (!entriesTmp.isEmpty()) {
partitionMessages[i] = new Message(id, entriesTmp);
}
}
return partitionMessages;
}
public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
List<FlatMessage> flatMessages = new ArrayList<>();
for (EntryRowData entryRowData : datas) {
CanalEntry.Entry entry = entryRowData.entry;
CanalEntry.RowChange rowChange = entryRowData.rowChange;
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.EventType eventType = rowChange.getEventType();
if (!rowChange.getIsDdl()) {
Set<String> updateSet = new HashSet<>();
boolean hasInitPkNames = false;
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
FlatMessage flatMessage = new FlatMessage(id);
flatMessages.add(flatMessage);
flatMessage.setDatabase(entry.getHeader().getSchemaName());
flatMessage.setTable(entry.getHeader().getTableName());
flatMessage.setIsDdl(rowChange.getIsDdl());
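// Map canal's event type onto the single-letter op codes required by the spec: I/U/D.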
String opType = eventType.toString();
switch (opType) {
case "INSERT":
opType = "I";
break;
case "UPDATE":
opType = "U";
break;
case "DELETE":
opType = "D";
break;
}
flatMessage.setOp_type(opType);
flatMessage.setEs(entry.getHeader().getExecuteTime());
flatMessage.setOp_ts(DateUtils.getTime());
flatMessage.setSql(rowChange.getSql());
if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
&& eventType != CanalEntry.EventType.DELETE) {
continue;
}
Map<String, String> row = new LinkedHashMap<>();
List<CanalEntry.Column> columns;
if (eventType == CanalEntry.EventType.DELETE) {
columns = rowData.getBeforeColumnsList();
} else {
columns = rowData.getAfterColumnsList();
}
Map<String, Integer> sqlType = new LinkedHashMap<>();
Map<String, String> mysqlType = new LinkedHashMap<>();
Map<String, String> data = new LinkedHashMap<>();
Map<String, String> old = new LinkedHashMap<>();
for (CanalEntry.Column column : columns) {
if (!hasInitPkNames && column.getIsKey()) {
flatMessage.addPkName(column.getName());
}
sqlType.put(column.getName(), column.getSqlType());
mysqlType.put(column.getName(), column.getMysqlType());
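// Per the project spec, field names in data are emitted in upper case.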
if (column.getIsNull()) {
row.put(column.getName().toUpperCase(), null);
} else {
row.put(column.getName().toUpperCase(), column.getValue());
}
if (column.getUpdated()) {
updateSet.add(column.getName());
}
}
hasInitPkNames = true;
if (!row.isEmpty()) {
data = row;
}
if (eventType == CanalEntry.EventType.UPDATE) {
Map<String, String> rowOld = new LinkedHashMap<>();
for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
if (updateSet.contains(column.getName())) {
if (column.getIsNull()) {
rowOld.put(column.getName(), null);
} else {
rowOld.put(column.getName(), column.getValue());
}
}
}
if (!rowOld.isEmpty()) {
old = rowOld;
}
}
if (!sqlType.isEmpty()) {
flatMessage.setSqlType(sqlType);
}
if (!mysqlType.isEmpty()) {
flatMessage.setMysqlType(mysqlType);
}
if (!data.isEmpty()) {
flatMessage.setData(data);
}
if (!old.isEmpty()) {
flatMessage.setOld(old);
}
}
}
}
return flatMessages;
}
public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,
boolean databaseHash) {
if (partitionsNum == null) {
partitionsNum = 1;
}
FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];
if (flatMessage.getIsDdl()) {
partitionMessages[0] = flatMessage;
} else {
if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
String database = flatMessage.getDatabase();
String table = flatMessage.getTable();
HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
if (hashMode == null) {
partitionMessages[0] = flatMessage;
} else if (hashMode.tableHash) {
int hashCode = table.hashCode();
int pkHash = Math.abs(hashCode) % partitionsNum;
pkHash = Math.abs(pkHash);
partitionMessages[pkHash] = flatMessage;
} else {
List<String> pkNames = hashMode.pkNames;
if (hashMode.autoPkHash) {
pkNames = flatMessage.getPkNames();
}
Map<String, String> row = flatMessage.getData();
int hashCode = 0;
if (databaseHash) {
hashCode = database.hashCode();
}
if (pkNames != null) {
for (String pkName : pkNames) {
String value = row.get(pkName);
if (value == null) {
value = "";
}
hashCode = hashCode ^ value.hashCode();
}
}
int pkHash = Math.abs(hashCode) % partitionsNum;
pkHash = Math.abs(pkHash);
FlatMessage flatMessageTmp = partitionMessages[pkHash];
if (flatMessageTmp == null) {
flatMessageTmp = new FlatMessage(flatMessage.getId());
partitionMessages[pkHash] = flatMessageTmp;
flatMessageTmp.setDatabase(flatMessage.getDatabase());
flatMessageTmp.setTable(flatMessage.getTable());
flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
flatMessageTmp.setOp_type(flatMessage.getOp_type());
flatMessageTmp.setSql(flatMessage.getSql());
flatMessageTmp.setSqlType(flatMessage.getSqlType());
flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
flatMessageTmp.setEs(flatMessage.getEs());
flatMessageTmp.setOp_ts(flatMessage.getOp_ts());
flatMessageTmp.setPkNames(flatMessage.getPkNames());
}
// Reassigning the local variable (data = row) would silently discard the row,
// so copy the contents into the message's map instead.
Map<String, String> data = flatMessageTmp.getData();
if (data == null) {
data = new LinkedHashMap<>();
flatMessageTmp.setData(data);
}
data.putAll(row);
if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
Map<String, String> old = flatMessageTmp.getOld();
if (old == null) {
old = new LinkedHashMap<>();
flatMessageTmp.setOld(old);
}
// Same as data above: copy the entries instead of reassigning the local reference.
old.putAll(flatMessage.getOld());
}
}
} else {
partitionMessages[0] = flatMessage;
}
}
return partitionMessages;
}
public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
if (StringUtils.isEmpty(pkHashConfigs)) {
return null;
}
List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
for (PartitionData data : datas) {
if (data.simpleName != null) {
if (data.simpleName.equalsIgnoreCase(name)) {
return data.hashMode;
}
} else {
if (data.regexFilter.filter(name)) {
return data.hashMode;
}
}
}
return null;
}
private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";");
Set<String> topics = new HashSet<>();
for (String item : router) {
int i = item.indexOf(":");
if (i > -1) {
String topic = item.substring(0, i).trim();
String topicConfigs = item.substring(i + 1).trim();
if (matchDynamicTopic(name, topicConfigs)) {
topics.add(topic);
break;
}
} else if (matchDynamicTopic(name, item)) {
topics.add(name.toLowerCase());
break;
}
}
return topics.isEmpty() ? null : topics;
}
public static boolean matchDynamicTopic(String name, String dynamicTopicConfigs) {
if (StringUtils.isEmpty(dynamicTopicConfigs)) {
return false;
}
boolean res = false;
List<DynamicTopicData> datas = dynamicTopicDatas.get(dynamicTopicConfigs);
for (DynamicTopicData data : datas) {
if (data.simpleName != null) {
if (data.simpleName.equalsIgnoreCase(name)) {
res = true;
break;
}
} else if (name.contains(".")) {
if (data.tableRegexFilter != null && data.tableRegexFilter.filter(name)) {
res = true;
break;
}
} else {
if (data.schemaRegexFilter != null && data.schemaRegexFilter.filter(name)) {
res = true;
break;
}
}
}
return res;
}
public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
for (String pkName : pkNames) {
if (pkName.equalsIgnoreCase(name)) {
return true;
}
}
return false;
}
public static Integer parseDynamicTopicPartition(String name, String tPConfigs) {
if (!StringUtils.isEmpty(tPConfigs)) {
List<TopicPartitionData> datas = topicPartitionDatas.get(tPConfigs);
for (TopicPartitionData data : datas) {
if (data.simpleName != null) {
if (data.simpleName.equalsIgnoreCase(name)) {
return data.partitionNum;
}
} else {
if (data.regexFilter.filter(name)) {
return data.partitionNum;
}
}
}
}
return null;
}
private static boolean isWildCard(String value) {
return StringUtils.containsAny(value, new char[]{'*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
'^'});
}
private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,
CanalEntry.Entry entry) {
Message message = messageMap.get(topicName);
if (message == null) {
message = new Message(messageId, new ArrayList<>());
messageMap.put(topicName, message);
}
message.getEntries().add(entry);
}
public static class PartitionData {
public String simpleName;
public AviaterRegexFilter regexFilter;
public HashMode hashMode = new HashMode();
}
public static class HashMode {
public boolean autoPkHash = false;
public boolean tableHash = false;
public List<String> pkNames = Lists.newArrayList();
}
public static class DynamicTopicData {
public String simpleName;
public AviaterRegexFilter schemaRegexFilter;
public AviaterRegexFilter tableRegexFilter;
}
public static class TopicPartitionData {
public String simpleName;
public AviaterRegexFilter regexFilter;
public Integer partitionNum;
}
public static class EntryRowData {
public Entry entry;
public RowChange rowChange;
}
}
5. Rebuild the source, then deploy the canal directory found under target in the deployer module.
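After redeployment, the same UPDATE shown in the default-format example should arrive in Kafka roughly as follows (illustrative; sqlType and mysqlType omitted for brevity, null fields dropped by fastjson):
{"data":{"ID":"2","ORDER_ID":"10086","AMOUNT":"10087.0","CREATE_TIME":"2021-08-04 18:05:05"},"database":"test","es":1628071686000,"id":3,"isDdl":false,"old":{"amount":"999.0"},"op_ts":"2021-08-04 18:05:06","op_type":"U","pkNames":["id"],"sql":"","table":"order"}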