IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Canal Mysql binlog 同步至 ElasticSearch 详细介绍 -> 正文阅读

[大数据]Canal Mysql binlog 同步至 ElasticSearch 详细介绍


本文详细介绍Canal 配置保存 ElasticSearch

Canal从零配置使用参考:https://blog.csdn.net/zhangshenghang/article/details/120361721

数据同步ElasticSearch

我们接着在之前配置Hbase基础上直接修改配置,实现同时同步ElasticSearch

单表基本配置

  • 1.修改启动器配置 {canal-apapter}/conf/application.yml
server:
  port: 8081
logging:
  level:
    com.alibaba.otter.canal.client.adapter: DEBUG
    com.alibaba.otter.canal.client.adapter.hbase: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  # tcp kafka rocketMQ rabbitMQ canal-server运行的模式,TCP模式就是直连客户端,不经过中间件。kafka和mq是消息队列的模式
  mode: tcp 
#  flatMessage: true
  zookeeperHosts: 
  syncBatchSize: 1
  retries: 0
  timeout: 1000
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer 指定canal-server的地址和端口
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181
    canal.tcp.batch.size: 1
    canal.tcp.username:
    canal.tcp.password:
    

  srcDataSources: # 数据源配置,从哪里获取数据
    defaultDS: # 指定一个名字,在ES的配置中会用到,唯一
      url: jdbc:mysql://127.0.0.1:3306/test2?useUnicode=true
      username: root
      password: *****
  canalAdapters:
  - instance: example # canal instance Name or mq topic name 指定在canal配置的实例名称
    groups:
    - groupId: g1 
      outerAdapters:
      - name: logger
#      - name: rdb
#        key: mysql1
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
      - name: hbase
        properties:
          hbase.zookeeper.quorum: sangfor.abdi.node3,sangfor.abdi.node2,sangfor.abdi.node1
          hbase.zookeeper.property.clientPort: 2181
          zookeeper.znode.parent: /hbase-unsecure
      - name: es7 # config目录下的子目录名称
        hosts: 192.168.168.2:9300 # 127.0.0.1:9200 for rest mode
        properties:
          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
          cluster.name: my_application
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address

  • 2.ElasticSearch 表映射文件
# 指定数据源,这个值和adapter的application.yml文件中配置的srcDataSources值对应。
dataSourceKey: defaultDS
# 指定canal-server中配置的某个实例的名字,不同实例对应不同业务
destination: example
# 组ID ,tcp方式这里填写空,不要填写值,不然可能会接收不到数据
groupId: 
# ES的mapping(映射)
esMapping:
  # ES索引名称
  _index: testsync2
  # ES标示文档的唯一标示,通常对应数据表中的主键ID字段
  _id: _id
#  upsert: true
#  pk: id
# 数据表每个字段映射到表中的具体名称,不能重复
  sql: "select a.id as _id, a.name,a.age,a.age_2,a.message,a.insert_time from testsync as a"
#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
  commitBatch: 10
  • 3 重启服务
bin/restart.sh

写入数据

INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());
INSERT INTO testsync(id,name,age,insert_time) values(UUID(),UUID(),2,now());

查看adapter日志

2021-09-20 13:53:07.279 [pool-1-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
2021-09-20 13:53:07.286 [pool-1-thread-1] DEBUG c.a.o.c.client.adapter.hbase.service.HbaseSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"}
2021-09-20 13:53:07.287 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: {"data":[{"id":"05fabf89-19d7-11ec-bbe0-708cb6f5eaa6","name":"05fabfb4-19d7-11ec-bbe0-708cb6f5eaa6","age":2,"age_2":null,"message":null,"insert_time":1632117185000}],"database":"test2","destination":"example","es":1632117185000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"testsync","ts":1632117187278,"type":"INSERT"} 
Affected indexes: testsync2 

查看ElasticSearch数据
在这里插入图片描述
至此写入ElasticSearch、Hbase成功

适配器映射文件详细介绍(单表、多表映射介绍)

${adapter}/conf/es7/xxx.yml

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 对应application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId:                        # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user           # es 的索引名称
  _type: _doc                   # es 的type名称, es7下无需配置此项
  _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
#  pk: id                       # 如果不需要_id, 则需要指定一个属性为主键属性
  # sql映射
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
#    _obj: object               # json对象
  etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数
  commitBatch: 3000                         # 提交批大小

sql映射说明:

sql支持多表关联自由组合, 但是有一定的限制:

  1. 主表不能为子查询语句
  2. 只能使用left outer join即最左表一定要是主表
  3. 关联从表如果是子查询不能有多张表
  4. 主sql中不能有where查询条件(从表子查询中可以有where条件但是不推荐, 可能会造成数据同步的不一致, 比如修改了where条件中的字段内容)
  5. 关联条件只允许主外键的’='操作不能出现其他常量判断比如: on a.role_id=b.id and b.statues=1
  6. 关联条件必须要有一个字段出现在主查询语句中比如: on a.role_id=b.id 其中的 a.role_id 或者 b.id 必须出现在主select语句中

Elastic Search的mapping 属性与sql的查询值将一一对应(不支持 select *), 比如: select a.id as _id, a.name, a.email as _email from user, 其中name将映射到es mapping的name field, _email将 映射到mapping的_email field, 这里以别名(如果有别名)作为最终的映射字段. 这里的_id可以填写到配置文件的 _id: _id映射.

单表映射索引示例sql

select a.id as _id, a.name, a.role_id, a.c_time from user a

该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "c_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
}

单表映射索引示例sql带函数或运算操作

select a.id as _id, concat(a.name,'_test') as name, a.role_id+10000 as role_id, a.c_time from user a

函数字段后必须跟上别名, 该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "c_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
}

多表映射(一对一, 多对一)索引示例sql

select a.id as _id, a.name, a.role_id, b.role_name, a.c_time from user a 
left join role b on b.id = a.role_id

注:这里join操作只能是left outer join, 第一张表必须为主表!!
该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "role_name": {
                        "type": "text"
                    },
                    "c_time": {
                        "type": "date"
                    }
                }
            }
        }
    }
}

多表映射(一对多)索引示例sql

select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a 
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id

注:left join 后的子查询只允许一张表, 即子查询中不能再包含子查询或者关联!!

该sql对应的es mapping示例:

{
    "mytest_user": {
        "mappings": {
            "_doc": {
                "properties": {
                    "name": {
                        "type": "text"
                    },
                    "role_id": {
                        "type": "long"
                    },
                    "c_time": {
                        "type": "date"
                    },
                    "labels": {
                        "type": "text"
                    }
                }
            }
        }
    }
}

其它类型的sql示例

  • geo type
select ... concat(IFNULL(a.latitude, 0), ',', IFNULL(a.longitude, 0)) AS location, ...
  • 复合主键
select concat(a.id,'_',b.type) as _id, ... from user a left join role b on b.id=a.role_id
  • 数组字段
select a.id as _id, a.name, a.role_id, c.labels, a.c_time from user a 
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id

配置中使用:

objFields:
  labels: array:;
  • 对象字段
select a.id as _id, a.name, a.role_id, c.labels, a.c_time, a.description from user a

配置中使用:

objFields:
  description: object

其中a.description字段内容为json字符串

  • 父子文档索引
    es/customer.yml
......
esMapping:
  _index: customer
  _type: _doc
  _id: id
  relations:
    customer_order:
      name: customer
  sql: "select t.id, t.name, t.email from customer t"

es/order.yml

esMapping:
  _index: customer
  _type: _doc
  _id: _id
  relations:
    customer_order:
      name: order
      parent: customer_id
  sql: "select concat('oid_', t.id) as _id,
        t.customer_id,
        t.id as order_id,
        t.serial_code as order_serial,
        t.c_time as order_time
        from biz_order t"
  skips:
    - customer_id

mapping示例:

{
  "mappings":{
    "_doc":{
      "properties":{
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text"
        },
        "email": {
          "type": "text"
        },
        "order_id": {
          "type": "long"
        },
        "order_serial": {
          "type": "text"
        },
        "order_time": {
          "type": "date"
        },
        "customer_order":{
          "type":"join",
          "relations":{
            "customer":"order"
          }
        }
      }
    }
  }
}

注意事项

  • 多表映射时,主表数据必须插入,如果只插入子表不插入主表,数据无法同步到ElasticSearch;相反只插入主表,子表不进行插入,数据是可以同步到ElasticSearch的
  • 多表映射时,如果主表关联id写入后,子表再进行修改之前的关联的id为我们主表写入的id,数据是无法同步到ElasticSearch中的。
    在这里插入图片描述
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-09-24 10:37:42  更:2021-09-24 10:38:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 11:55:04-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码