Integrating with a Message Queue (Kafka): Modifying canal's JSON Output Format

Introduction

Asset data is currently spread across different systems. To meet current management needs, the asset data from each system must be collected into the big-data platform for unified storage and management. The integration uses a message queue (Kafka), data must be sent incrementally, and every CUD (create, update, delete) operation must push the entire affected row to the message channel.

  • Set up a test environment on a CentOS 7 server as required and debug message pushing.
  • Adjust the message output format to produce the integration data required by the project.

I. Test Environment Deployment

JDK = 1.8
MySQL = 5.7.0
ZooKeeper = 3.7.0
canal = 1.1.5

II. Configuring the Runtime Environment

1. MySQL configuration

1. Run vim /etc/my.cnf to edit the MySQL configuration file my.cnf and enable log_bin. (canal parses the binlog in ROW format, so also make sure binlog_format is ROW; recent MySQL 5.7 releases use ROW by default.)

[mysqld]
log-bin=mysql-bin
server-id=246

2. After changing the configuration file, restart the MySQL service.

systemctl restart mysqld.service

3. Check whether MySQL's log-bin was enabled successfully.

mysql -uroot -p'your MySQL login password'

Check whether the configuration took effect:

mysql> SHOW VARIABLES LIKE '%bin%';


2. Installing ZooKeeper

1. Both canal and Kafka rely on ZooKeeper for service coordination, so a ZooKeeper registry must be deployed and configured; version 3.7.0 is used here. (Note: download apache-zookeeper-3.7.0-bin.tar.gz, the package with -bin in its name.)

cd /root/software
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz
tar -zvxf apache-zookeeper-3.7.0-bin.tar.gz
mv ./apache-zookeeper-3.7.0-bin /usr/local/zookeeper
cd /usr/local/zookeeper
mkdir data
mkdir logs
cd ./conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg

2. Edit the configuration file: set dataDir=/usr/local/zookeeper/data and add dataLogDir=/usr/local/zookeeper/logs. Since this is only a test environment, a single standalone instance is used with no cluster nodes configured.

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181

Also, write the instance id into the myid file:

echo "1">/usr/local/zookeeper/data/myid

3. Start the ZooKeeper registry; the default port is 2181.

/usr/local/zookeeper/bin/zkServer.sh start

Check the startup status:

/usr/local/zookeeper/bin/zkServer.sh status


3. Installing Kafka

1. Kafka is a high-performance distributed message queue middleware whose deployment depends on ZooKeeper; version kafka_2.12-2.6.2 is used here.

mkdir /usr/local/kafka
cd /usr/local/kafka
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -zxvf kafka_2.12-2.6.2.tgz
rm -rf kafka_2.12-2.6.2.tgz

2. Edit the configuration file: vim /usr/local/kafka/kafka_2.12-2.6.2/config/server.properties. Note that host.name and listeners=PLAINTEXT://:9092 must be configured, otherwise the broker can only be accessed locally.

zookeeper.connect=localhost:2181
host.name=192.168.110.244
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://192.168.110.244:9092

3. Start Kafka. (The -daemon flag already detaches the process.)

/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-server-start.sh -daemon /usr/local/kafka/kafka_2.12-2.6.2/config/server.properties

List all topics:

/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-topics.sh --list --zookeeper localhost:2181

View the messages on a given topic:

/usr/local/kafka/kafka_2.12-2.6.2/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic example

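For a programmatic check, here is a minimal consumer sketch built on the kafka-clients library (2.x, matching the broker above); the class name and the canal-test group id are arbitrary choices:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExampleTopicReader {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.110.244:9092");
        props.put("group.id", "canal-test");        // arbitrary consumer group
        props.put("auto.offset.reset", "earliest"); // same effect as --from-beginning
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // canal publishes each message as a JSON string
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}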

4. Installing canal for data synchronization

1. Download the canal v1.1.5 release, canal.deployer-1.1.5.tar.gz:

mkdir /usr/local/canal
cd /usr/local/canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
tar -zxvf canal.deployer-1.1.5.tar.gz
rm -rf canal.deployer-1.1.5.tar.gz

After extraction, the directory layout is as follows:

- bin   # operations scripts
- conf  # configuration files
  canal_local.properties  # canal local configuration, usually needs no changes
  canal.properties        # canal server configuration
  logback.xml             # logback logging configuration
  metrics                 # metrics configuration
  spring                  # spring instance configurations, mainly related to binlog position calculation and strategy settings; canal.properties can select any one of these files
  example                 # instance configuration folder; by convention, one database instance per folder
    instance.properties   # instance configuration, i.e. the configuration for a single database
- lib   # service dependencies
- logs  # log output directory

2. In development and test environments, set the log level in logback.xml to DEBUG to make troubleshooting easier. The two configuration files that mainly need changes are canal.properties and instance.properties.
In canal.properties, you need to:

  • Uncomment canal.instance.parser.parallelThreadSize = 16, i.e. enable this setting. It controls the instance parser's thread count; if left unset, parsing can appear blocked or not run at all.
  • Set canal.serverMode to kafka. The possible values are tcp, kafka, rocketmq, and rabbitmq; the default is tcp.
  • In version 1.1.5, configure kafka.bootstrap.servers = 192.168.110.244:9092.

The resulting settings:

canal.instance.parser.parallelThreadSize = 16
canal.serverMode = kafka
kafka.bootstrap.servers = 192.168.110.244:9092

instance.properties generally holds the configuration for a single database instance; the canal architecture lets one canal server instance handle asynchronous binlog parsing for multiple database instances. The main items to change are:

canal.instance.mysql.slaveId = 2460                # must differ from the master node's server ID
canal.instance.master.address=192.168.110.246:3306 # database address
canal.instance.dbUsername=root                     # username
canal.instance.dbPassword=Abcde12345!@#$%          # password
canal.instance.defaultDatabaseName = test          # default database (seemingly of little use)
canal.instance.filter.regex=test.B,test.C          # only listen for changes on these two tables
canal.mq.topic=example                             # parsed binlog data is sent to the Kafka topic named example (change to your own)
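
With both files edited, start the canal server (via the startup script under bin/) and make some changes to the watched tables to confirm that messages reach the example topic. Below is a minimal sketch; the table test.B's name column and the class name are hypothetical, the connection details are taken from the configuration above, and the MySQL JDBC driver must be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class TriggerChange {

    public static void main(String[] args) throws Exception {
        // Connection details taken from instance.properties above; adjust to your environment.
        String url = "jdbc:mysql://192.168.110.246:3306/test";
        try (Connection conn = DriverManager.getConnection(url, "root", "Abcde12345!@#$%");
             Statement st = conn.createStatement()) {
            // test.B is whitelisted by canal.instance.filter.regex; its name column is hypothetical.
            // Each statement below should produce one message on the example topic.
            st.executeUpdate("INSERT INTO B (name) VALUES ('canal-test')");
            st.executeUpdate("UPDATE B SET name = 'canal-test-2' WHERE name = 'canal-test'");
            st.executeUpdate("DELETE FROM B WHERE name = 'canal-test-2'");
        }
    }
}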

III. Modifying canal's Output Format

1. Default output format

{"data":[{"id":"2","order_id":"10086","amount":"10087.0","create_time":"2021-08-04 18:05:05"}],"database":"test","es":1628071686000,"id":3,"isDdl":false,"mysqlType":{"id":"bigint(20)","order_id":"varchar(64)","amount":"decimal(10,2)","create_time":"datetime"},"old":[{"amount":"999.0"}],"ts":1628071686131,"type":"UPDATE","pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order"}

2. Output format required by the project

Message format (JSON):
{
	"table":"table name",
	"op_type":"operation type",
	"op_ts":"operation timestamp",
	"data":{
		"field name":"field value"
	}
}

Field, value, and type notes:
- table: table names are upper-case, with words separated by underscores;
- op_type: I (insert), U (update), D (delete); if there is no operation type, the field may be omitted;
- timestamp fields use the format yyyy-MM-dd HH24:mm:ss (example: 2021-07-20 16:11:26);
- data field names are upper-case, with words separated by underscores;
- data field values are sent as strings, except for numeric types.
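
Applied to the UPDATE event from the default-format example above, a message in the required format would look roughly like this (illustrative only; op_ts is generated at conversion time, and only the required fields are shown):

{
	"table": "ORDER",
	"op_type": "U",
	"op_ts": "2021-08-04 18:08:06",
	"data": {
		"ID": "2",
		"ORDER_ID": "10086",
		"AMOUNT": "10087.0",
		"CREATE_TIME": "2021-08-04 18:05:05"
	}
}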

3. Since the default output format does not meet the project requirements, the source code must be modified to adjust the output message format.

Download the source code from the canal releases page.

4. After downloading the source code, import it into the project and modify the FlatMessage and MQMessageUtils classes.

Changes to the FlatMessage class:

  • Rename ts to op_ts and change its type to String; call toString() when converting;
  • Rename type to op_type;
  • Change List<Map<String, String>> to Map<String, String>, turning the array into an object;

The modified source is as follows:

public class FlatMessage implements Serializable {

    private static final long serialVersionUID = -3386650678735860050L;
    private long id;
    private String database;
    private String table;
    private List<String> pkNames;
    private Boolean isDdl;
    private String op_type;
    // binlog executeTime
    private Long es;
    // dml build timeStamp
    private String op_ts;
    private String sql;
    private Map<String, Integer> sqlType;
    private Map<String, String> mysqlType;
    private Map<String, String> data;
    private Map<String, String> old;

    public FlatMessage() {
    }

    public FlatMessage(long id) {
        this.id = id;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getDatabase() {
        return database;
    }

    public void setDatabase(String database) {
        this.database = database;
    }

    public String getTable() {
        return table;
    }

    public void setTable(String table) {
        this.table = table;
    }

    public List<String> getPkNames() {
        return pkNames;
    }

    public void addPkName(String pkName) {
        if (this.pkNames == null) {
            this.pkNames = Lists.newArrayList();
        }
        this.pkNames.add(pkName);
    }

    public void setPkNames(List<String> pkNames) {
        this.pkNames = pkNames;
    }

    public Boolean getIsDdl() {
        return isDdl;
    }

    public void setIsDdl(Boolean isDdl) {
        this.isDdl = isDdl;
    }

    public String getOp_type() {
        return op_type;
    }

    public void setOp_type(String op_type) {
        this.op_type = op_type;
    }

    public String getOp_ts() {
        return op_ts;
    }

    public void setOp_ts(String op_ts) {
        this.op_ts = op_ts;
    }

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public Map<String, Integer> getSqlType() {
        return sqlType;
    }

    public void setSqlType(Map<String, Integer> sqlType) {
        this.sqlType = sqlType;
    }

    public Map<String, String> getMysqlType() {
        return mysqlType;
    }

    public void setMysqlType(Map<String, String> mysqlType) {
        this.mysqlType = mysqlType;
    }

    public Map<String, String> getData() {
        return data;
    }

    public void setData(Map<String, String> data) {
        this.data = data;
    }

    public Map<String, String> getOld() {
        return old;
    }

    public void setOld(Map<String, String> old) {
        this.old = old;
    }

    public Long getEs() {
        return es;
    }

    public void setEs(Long es) {
        this.es = es;
    }

    @Override
    public String toString() {
        return "FlatMessage [id=" + id + ", database=" + database + ", table=" + table + ", isDdl=" + isDdl + ", op_type="
                + op_type + ", es=" + es + ", op_ts=" + op_ts + ", sql=" + sql + ", sqlType=" + sqlType + ", mysqlType="
                + mysqlType + ", data=" + data + ", old=" + old + "]";
    }
}

Changes to the MQMessageUtils class, essentially confined to the messageConverter method:

  • Output the field names in data in upper case.

  • Map op_type to the I/U/D operation types.

  • Change the types of data and old. Because the array becomes an object, the code that fills data has to move inside the per-row loop.

  • Change the type of op_ts and add a custom date class that outputs the format yyyy-MM-dd HH24:mm:ss.

The modified source is as follows:

public class MQMessageUtils {

    private static Map<String, List<PartitionData>> partitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
                    .softValues(),
            pkHashConfigs -> {
                List<PartitionData> datas = Lists.newArrayList();

                String[] pkHashConfigArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
                        ",",
                        ";"),
                        ";");
                // schema.table:id^name
                for (String pkHashConfig : pkHashConfigArray) {
                    PartitionData data = new PartitionData();
                    int i = pkHashConfig.lastIndexOf(":");
                    if (i > 0) {
                        String pkStr = pkHashConfig.substring(i + 1);
                        if (pkStr.equalsIgnoreCase("$pk$")) {
                            data.hashMode.autoPkHash = true;
                        } else {
                            data.hashMode.pkNames = Lists.newArrayList(StringUtils.split(pkStr,
                                    '^'));
                        }

                        pkHashConfig = pkHashConfig.substring(0,
                                i);
                    } else {
                        data.hashMode.tableHash = true;
                    }

                    if (!isWildCard(pkHashConfig)) {
                        data.simpleName = pkHashConfig;
                    } else {
                        data.regexFilter = new AviaterRegexFilter(pkHashConfig);
                    }
                    datas.add(data);
                }

                return datas;
            });

    private static Map<String, List<DynamicTopicData>> dynamicTopicDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
                    .softValues(),
            pkHashConfigs -> {
                List<DynamicTopicData> datas = Lists.newArrayList();
                String[] dynamicTopicArray = StringUtils.split(StringUtils.replace(pkHashConfigs,
                        ",",
                        ";"),
                        ";");
                // schema.table
                for (String dynamicTopic : dynamicTopicArray) {
                    DynamicTopicData data = new DynamicTopicData();

                    if (!isWildCard(dynamicTopic)) {
                        data.simpleName = dynamicTopic;
                    } else {
                        if (dynamicTopic.contains("\\.")) {
                            data.tableRegexFilter = new AviaterRegexFilter(dynamicTopic);
                        } else {
                            data.schemaRegexFilter = new AviaterRegexFilter(dynamicTopic);
                        }
                    }
                    datas.add(data);
                }

                return datas;
            });

    private static Map<String, List<TopicPartitionData>> topicPartitionDatas = MigrateMap.makeComputingMap(CacheBuilder.newBuilder()
                    .softValues(),
            tPConfigs -> {
                List<TopicPartitionData> datas = Lists.newArrayList();
                String[] tPArray = StringUtils.split(StringUtils.replace(tPConfigs,
                        ",",
                        ";"),
                        ";");
                for (String tPConfig : tPArray) {
                    TopicPartitionData data = new TopicPartitionData();
                    int i = tPConfig.lastIndexOf(":");
                    if (i > 0) {
                        String tStr = tPConfig.substring(0, i);
                        String pStr = tPConfig.substring(i + 1);
                        if (!isWildCard(tStr)) {
                            data.simpleName = tStr;
                        } else {
                            data.regexFilter = new AviaterRegexFilter(tStr);
                        }
                        if (!StringUtils.isEmpty(pStr) && StringUtils.isNumeric(pStr)) {
                            data.partitionNum = Integer.valueOf(pStr);
                        }
                        datas.add(data);
                    }
                }

                return datas;
            });

    /**
     * Distribute messages to their corresponding topics by schema or schema+table.
     *
     * @param message             original message
     * @param defaultTopic        default topic
     * @param dynamicTopicConfigs dynamic topic rules
     * @return map of messages after splitting
     */
    public static Map<String, Message> messageTopics(Message message, String defaultTopic, String dynamicTopicConfigs) {
        List<CanalEntry.Entry> entries;
        if (message.isRaw()) {
            List<ByteString> rawEntries = message.getRawEntries();
            entries = new ArrayList<>(rawEntries.size());
            for (ByteString byteString : rawEntries) {
                CanalEntry.Entry entry;
                try {
                    entry = CanalEntry.Entry.parseFrom(byteString);
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException(e);
                }
                entries.add(entry);
            }
        } else {
            entries = message.getEntries();
        }
        Map<String, Message> messages = new HashMap<>();
        for (CanalEntry.Entry entry : entries) {
            // if topic routing is configured, skip begin/end events
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            String schemaName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();

            if (StringUtils.isEmpty(schemaName) || StringUtils.isEmpty(tableName)) {
                put2MapMessage(messages, message.getId(), defaultTopic, entry);
            } else {
                Set<String> topics = matchTopics(schemaName + "." + tableName, dynamicTopicConfigs);
                if (topics != null) {
                    for (String topic : topics) {
                        put2MapMessage(messages, message.getId(), topic, entry);
                    }
                } else {
                    topics = matchTopics(schemaName, dynamicTopicConfigs);
                    if (topics != null) {
                        for (String topic : topics) {
                            put2MapMessage(messages, message.getId(), topic, entry);
                        }
                    } else {
                        put2MapMessage(messages, message.getId(), defaultTopic, entry);
                    }
                }
            }
        }
        return messages;
    }

    /**
     * Build the rowChange objects of a message on multiple threads, e.g. for partitioning
     * and flatMessage conversion. </br>
     * Serializing and deserializing protobuf objects is CPU-intensive, so doing it serially is costly.
     */
    public static EntryRowData[] buildMessageData(Message message, ThreadPoolExecutor executor) {
        ExecutorTemplate template = new ExecutorTemplate(executor);
        if (message.isRaw()) {
            List<ByteString> rawEntries = message.getRawEntries();
            final EntryRowData[] datas = new EntryRowData[rawEntries.size()];
            int i = 0;
            for (ByteString byteString : rawEntries) {
                final int index = i;
                template.submit(() -> {
                    try {
                        Entry entry = Entry.parseFrom(byteString);
                        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                        datas[index] = new EntryRowData();
                        datas[index].entry = entry;
                        datas[index].rowChange = rowChange;
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException(e);
                    }
                });

                i++;
            }

            template.waitForResult();
            return datas;
        } else {
            final EntryRowData[] datas = new EntryRowData[message.getEntries().size()];
            int i = 0;
            for (Entry entry : message.getEntries()) {
                final int index = i;
                template.submit(() -> {
                    try {
                        RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                        datas[index] = new EntryRowData();
                        datas[index].entry = entry;
                        datas[index].rowChange = rowChange;
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException(e);
                    }
                });

                i++;
            }

            template.waitForResult();
            return datas;
        }
    }

    /**
     * Partition a message.
     *
     * @param partitionsNum number of partitions
     * @param pkHashConfigs regex configuration for the partitioned schema/table primary keys
     * @param databaseHash  whether to include the database name in the hash
     * @return array of partitioned messages
     */
    @SuppressWarnings("unchecked")
    public static Message[] messagePartition(EntryRowData[] datas, long id, Integer partitionsNum,
                                             String pkHashConfigs, boolean databaseHash) {
        if (partitionsNum == null) {
            partitionsNum = 1;
        }
        Message[] partitionMessages = new Message[partitionsNum];
        List<Entry>[] partitionEntries = new List[partitionsNum];
        for (int i = 0; i < partitionsNum; i++) {
            // watch out for concurrency here
            partitionEntries[i] = Collections.synchronizedList(Lists.newArrayList());
        }

        for (EntryRowData data : datas) {
            CanalEntry.Entry entry = data.entry;
            CanalEntry.RowChange rowChange = data.rowChange;
            // if partition routing is configured, skip begin/end events
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            if (rowChange.getIsDdl()) {
                partitionEntries[0].add(entry);
            } else {
                if (rowChange.getRowDatasList() != null && !rowChange.getRowDatasList().isEmpty()) {
                    String database = entry.getHeader().getSchemaName();
                    String table = entry.getHeader().getTableName();
                    HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                    if (hashMode == null) {
                        // if nothing matched, send to the first partition
                        partitionEntries[0].add(entry);
                    } else if (hashMode.tableHash) {
                        int hashCode = table.hashCode();
                        int pkHash = Math.abs(hashCode) % partitionsNum;
                        pkHash = Math.abs(pkHash);
                        // tableHash not need split entry message
                        partitionEntries[pkHash].add(entry);
                    } else {
                        // build new entry
                        Entry.Builder builder = Entry.newBuilder(entry);
                        RowChange.Builder rowChangeBuilder = RowChange.newBuilder(rowChange);

                        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                            int hashCode = 0;
                            if (databaseHash) {
                                hashCode = database.hashCode();
                            }
                            CanalEntry.EventType eventType = rowChange.getEventType();
                            List<CanalEntry.Column> columns = null;
                            if (eventType == CanalEntry.EventType.DELETE) {
                                columns = rowData.getBeforeColumnsList();
                            } else {
                                columns = rowData.getAfterColumnsList();
                            }

                            if (hashMode.autoPkHash) {
                                // if empty, use the default pkNames
                                for (CanalEntry.Column column : columns) {
                                    if (column.getIsKey()) {
                                        hashCode = hashCode ^ column.getValue().hashCode();
                                    }
                                }
                            } else {
                                for (CanalEntry.Column column : columns) {
                                    if (checkPkNamesHasContain(hashMode.pkNames, column.getName())) {
                                        hashCode = hashCode ^ column.getValue().hashCode();
                                    }
                                }
                            }

                            int pkHash = Math.abs(hashCode) % partitionsNum;
                            pkHash = Math.abs(pkHash);
                            // clear rowDatas
                            rowChangeBuilder.clearRowDatas();
                            rowChangeBuilder.addRowDatas(rowData);
                            builder.clearStoreValue();
                            builder.setStoreValue(rowChangeBuilder.build().toByteString());
                            partitionEntries[pkHash].add(builder.build());
                        }
                    }
                } else {
                    // for query events in statement/mixed binlog format
                    partitionEntries[0].add(entry);
                }
            }
        }

        for (int i = 0; i < partitionsNum; i++) {
            List<Entry> entriesTmp = partitionEntries[i];
            if (!entriesTmp.isEmpty()) {
                partitionMessages[i] = new Message(id, entriesTmp);
            }
        }

        return partitionMessages;
    }

    /**
     * Convert a Message into FlatMessages.
     *
     * @return list of FlatMessages
     * @author agapple 2018年12月11日 下午1:28:32
     */
    public static List<FlatMessage> messageConverter(EntryRowData[] datas, long id) {
        List<FlatMessage> flatMessages = new ArrayList<>();
        for (EntryRowData entryRowData : datas) {
            CanalEntry.Entry entry = entryRowData.entry;
            CanalEntry.RowChange rowChange = entryRowData.rowChange;
            // if partition routing is configured, skip begin/end events
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }

            // build flatMessage
            CanalEntry.EventType eventType = rowChange.getEventType();


            if (!rowChange.getIsDdl()) {
                Set<String> updateSet = new HashSet<>();
                boolean hasInitPkNames = false;
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    FlatMessage flatMessage = new FlatMessage(id);
                    flatMessages.add(flatMessage);
                    flatMessage.setDatabase(entry.getHeader().getSchemaName());
                    flatMessage.setTable(entry.getHeader().getTableName());
                    flatMessage.setIsDdl(rowChange.getIsDdl());
                    String opType = eventType.toString();
                    switch (opType) {
                        case "INSERT":
                            opType = "I";
                            break;
                        case "UPDATE":
                            opType = "U";
                            break;
                        case "DELETE":
                            opType = "D";
                            break;

                    }
                    flatMessage.setOp_type(opType);
                    flatMessage.setEs(entry.getHeader().getExecuteTime());
                    //flatMessage.setOp_ts(System.currentTimeMillis());
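                    // DateUtils is the custom date helper mentioned above (a sketch is given after
                    // this class); it returns the current time as a yyyy-MM-dd HH:mm:ss string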
                    flatMessage.setOp_ts(DateUtils.getTime());
                    flatMessage.setSql(rowChange.getSql());

                    if (eventType != CanalEntry.EventType.INSERT && eventType != CanalEntry.EventType.UPDATE
                            && eventType != CanalEntry.EventType.DELETE) {
                        continue;
                    }

                    Map<String, String> row = new LinkedHashMap<>();
                    List<CanalEntry.Column> columns;

                    if (eventType == CanalEntry.EventType.DELETE) {
                        columns = rowData.getBeforeColumnsList();
                    } else {
                        columns = rowData.getAfterColumnsList();
                    }

                    Map<String, Integer> sqlType = new LinkedHashMap<>();
                    Map<String, String> mysqlType = new LinkedHashMap<>();
                    Map<String, String> data = new LinkedHashMap<>();
                    Map<String, String> old = new LinkedHashMap<>();

                    for (CanalEntry.Column column : columns) {
                        if (!hasInitPkNames && column.getIsKey()) {
                            flatMessage.addPkName(column.getName());
                        }
                        sqlType.put(column.getName(), column.getSqlType());
                        mysqlType.put(column.getName(), column.getMysqlType());
                        if (column.getIsNull()) {
                            row.put(column.getName().toUpperCase(), null);
                        } else {
                            row.put(column.getName().toUpperCase(), column.getValue());
                        }
                        // collect the fields whose updated flag is true
                        if (column.getUpdated()) {
                            updateSet.add(column.getName());
                        }
                    }

                    hasInitPkNames = true;
                    if (!row.isEmpty()) {
                        data = row;
                    }

                    if (eventType == CanalEntry.EventType.UPDATE) {
                        Map<String, String> rowOld = new LinkedHashMap<>();
                        for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {
                            if (updateSet.contains(column.getName())) {
                                if (column.getIsNull()) {
                                    rowOld.put(column.getName(), null);
                                } else {
                                    rowOld.put(column.getName(), column.getValue());
                                }
                            }
                        }
                        // for update operations, record the pre-update values
                        if (!rowOld.isEmpty()) {
                            old = rowOld;
                        }
                    }
                    if (!sqlType.isEmpty()) {
                        flatMessage.setSqlType(sqlType);
                    }
                    if (!mysqlType.isEmpty()) {
                        flatMessage.setMysqlType(mysqlType);
                    }
                    if (!data.isEmpty()) {
                        flatMessage.setData(data);
                    }
                    if (!old.isEmpty()) {
                        flatMessage.setOld(old);
                    }
                }
            }
        }
        return flatMessages;
    }

    /**
     * Split a FlatMessage by hashing the specified field values.
     *
     * @param flatMessage   flatMessage
     * @param partitionsNum number of partitions
     * @param pkHashConfigs hash mapping
     * @param databaseHash  whether to include the database name in the hash
     * @return array of FlatMessages after splitting
     */
    public static FlatMessage[] messagePartition(FlatMessage flatMessage, Integer partitionsNum, String pkHashConfigs,
                                                 boolean databaseHash) {
        if (partitionsNum == null) {
            partitionsNum = 1;
        }
        FlatMessage[] partitionMessages = new FlatMessage[partitionsNum];

        if (flatMessage.getIsDdl()) {
            partitionMessages[0] = flatMessage;
        } else {
            if (flatMessage.getData() != null && !flatMessage.getData().isEmpty()) {
                String database = flatMessage.getDatabase();
                String table = flatMessage.getTable();
                HashMode hashMode = getPartitionHashColumns(database + "." + table, pkHashConfigs);
                if (hashMode == null) {
                    // if nothing matched, send to the first partition
                    partitionMessages[0] = flatMessage;
                } else if (hashMode.tableHash) {
                    int hashCode = table.hashCode();
                    int pkHash = Math.abs(hashCode) % partitionsNum;
                    // Math.abs can itself yield a negative value; take abs again so such rows
                    // still land in a fixed partition and consumption order is preserved
                    pkHash = Math.abs(pkHash);
                    partitionMessages[pkHash] = flatMessage;
                } else {
                    List<String> pkNames = hashMode.pkNames;
                    if (hashMode.autoPkHash) {
                        pkNames = flatMessage.getPkNames();
                    }

                    Map<String, String> row = flatMessage.getData();

                    int hashCode = 0;
                    if (databaseHash) {
                        hashCode = database.hashCode();
                    }
                    if (pkNames != null) {
                        for (String pkName : pkNames) {
                            String value = row.get(pkName);
                            if (value == null) {
                                value = "";
                            }
                            hashCode = hashCode ^ value.hashCode();
                        }
                    }

                    int pkHash = Math.abs(hashCode) % partitionsNum;
                    // Math.abs can itself yield a negative value; take abs again so such rows
                    // still land in a fixed partition and consumption order is preserved
                    pkHash = Math.abs(pkHash);

                    FlatMessage flatMessageTmp = partitionMessages[pkHash];
                    if (flatMessageTmp == null) {
                        flatMessageTmp = new FlatMessage(flatMessage.getId());
                        partitionMessages[pkHash] = flatMessageTmp;
                        flatMessageTmp.setDatabase(flatMessage.getDatabase());
                        flatMessageTmp.setTable(flatMessage.getTable());
                        flatMessageTmp.setIsDdl(flatMessage.getIsDdl());
                        flatMessageTmp.setOp_type(flatMessage.getOp_type());
                        flatMessageTmp.setSql(flatMessage.getSql());
                        flatMessageTmp.setSqlType(flatMessage.getSqlType());
                        flatMessageTmp.setMysqlType(flatMessage.getMysqlType());
                        flatMessageTmp.setEs(flatMessage.getEs());
                        flatMessageTmp.setOp_ts(flatMessage.getOp_ts());
                        flatMessageTmp.setPkNames(flatMessage.getPkNames());
                    }
                    Map<String, String> data = flatMessageTmp.getData();
                    if (data == null) {
                        data = new LinkedHashMap<>();
                        flatMessageTmp.setData(data);
                    }
                    // copy the row into the target message ("data = row" would only rebind the
                    // local variable and drop the row)
                    data.putAll(row);
                    if (flatMessage.getOld() != null && !flatMessage.getOld().isEmpty()) {
                        Map<String, String> old = flatMessageTmp.getOld();
                        if (old == null) {
                            old = new LinkedHashMap<>();
                            flatMessageTmp.setOld(old);
                        }
                        old.putAll(flatMessage.getOld());
                    }

                }
            } else {
                // for query events in statement/mixed binlog format
                partitionMessages[0] = flatMessage;
            }
        }
        return partitionMessages;
    }

    /**
     * Returns the matching HashMode, or null when nothing matches.
     */
    public static HashMode getPartitionHashColumns(String name, String pkHashConfigs) {
        if (StringUtils.isEmpty(pkHashConfigs)) {
            return null;
        }

        List<PartitionData> datas = partitionDatas.get(pkHashConfigs);
        for (PartitionData data : datas) {
            if (data.simpleName != null) {
                if (data.simpleName.equalsIgnoreCase(name)) {
                    return data.hashMode;
                }
            } else {
                if (data.regexFilter.filter(name)) {
                    return data.hashMode;
                }
            }
        }

        return null;
    }

    private static Set<String> matchTopics(String name, String dynamicTopicConfigs) {
        String[] router = StringUtils.split(StringUtils.replace(dynamicTopicConfigs, ",", ";"), ";");
        Set<String> topics = new HashSet<>();
        for (String item : router) {
            int i = item.indexOf(":");
            if (i > -1) {
                String topic = item.substring(0, i).trim();
                String topicConfigs = item.substring(i + 1).trim();
                if (matchDynamicTopic(name, topicConfigs)) {
                    topics.add(topic);
                    // stop after the first match
                    break;
                }
            } else if (matchDynamicTopic(name, item)) {
                // stop after the first match
                topics.add(name.toLowerCase());
                break;
            }
        }
        return topics.isEmpty() ? null : topics;
    }

    public static boolean matchDynamicTopic(String name, String dynamicTopicConfigs) {
        if (StringUtils.isEmpty(dynamicTopicConfigs)) {
            return false;
        }

        boolean res = false;
        List<DynamicTopicData> datas = dynamicTopicDatas.get(dynamicTopicConfigs);
        for (DynamicTopicData data : datas) {
            if (data.simpleName != null) {
                if (data.simpleName.equalsIgnoreCase(name)) {
                    res = true;
                    break;
                }
            } else if (name.contains(".")) {
                if (data.tableRegexFilter != null && data.tableRegexFilter.filter(name)) {
                    res = true;
                    break;
                }
            } else {
                if (data.schemaRegexFilter != null && data.schemaRegexFilter.filter(name)) {
                    res = true;
                    break;
                }
            }
        }
        return res;
    }

    public static boolean checkPkNamesHasContain(List<String> pkNames, String name) {
        for (String pkName : pkNames) {
            if (pkName.equalsIgnoreCase(name)) {
                return true;
            }
        }

        return false;
    }

    public static Integer parseDynamicTopicPartition(String name, String tPConfigs) {
        if (!StringUtils.isEmpty(tPConfigs)) {
            List<TopicPartitionData> datas = topicPartitionDatas.get(tPConfigs);
            for (TopicPartitionData data : datas) {
                if (data.simpleName != null) {
                    if (data.simpleName.equalsIgnoreCase(name)) {
                        return data.partitionNum;
                    }
                } else {
                    if (data.regexFilter.filter(name)) {
                        return data.partitionNum;
                    }
                }
            }
        }
        return null;
    }

    private static boolean isWildCard(String value) {
        // does the value contain regex metacharacters?
        return StringUtils.containsAny(value, new char[]{'*', '?', '+', '|', '(', ')', '{', '}', '[', ']', '\\', '$',
                '^'});
    }

    private static void put2MapMessage(Map<String, Message> messageMap, Long messageId, String topicName,
                                       CanalEntry.Entry entry) {
        Message message = messageMap.get(topicName);
        if (message == null) {
            message = new Message(messageId, new ArrayList<>());
            messageMap.put(topicName, message);
        }
        message.getEntries().add(entry);
    }

    public static class PartitionData {

        public String simpleName;
        public AviaterRegexFilter regexFilter;
        public HashMode hashMode = new HashMode();
    }

    public static class HashMode {

        public boolean autoPkHash = false;
        public boolean tableHash = false;
        public List<String> pkNames = Lists.newArrayList();
    }

    public static class DynamicTopicData {

        public String simpleName;
        public AviaterRegexFilter schemaRegexFilter;
        public AviaterRegexFilter tableRegexFilter;
    }

    public static class TopicPartitionData {

        public String simpleName;
        public AviaterRegexFilter regexFilter;
        public Integer partitionNum;
    }

    public static class EntryRowData {

        public Entry entry;
        public RowChange rowChange;
    }
}
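
The code above calls DateUtils.getTime(), but that custom class is not shown in the article. A minimal sketch consistent with the required format (note that the Oracle-style HH24 in the spec corresponds to HH in Java's SimpleDateFormat):

import java.text.SimpleDateFormat;
import java.util.Date;

public class DateUtils {

    // Output format required by the project: yyyy-MM-dd HH:mm:ss (24-hour clock)
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Returns the current time as a formatted string. SimpleDateFormat is not
    // thread-safe, so a fresh instance is created on every call.
    public static String getTime() {
        return new SimpleDateFormat(PATTERN).format(new Date());
    }
}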

5. Rebuild and package the source code (for example with mvn clean install -Dmaven.test.skip=true in the project root; exact flags may vary), then deploy the canal directory found under target in the deployer module.
