IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> canal实现从mysql实时同步数据到es -> 正文阅读

[大数据]canal实现从mysql实时同步数据到es

1.环境准备:

1.1? mysql:5.7

1.2?elasticsearch:7.4.2

1.3??kibana:7.4.2

1.4 服务端:canal-deployer1.1.5

1.5 客户端:canal-adapter 1.1.5

2.下载安装MySQL注:本人项目都是docker安装? 这里就不一一展示安装步骤了? 详情见百度或者看我其他文章)修改mysql配置文件 开启binlog日志,并且以ROW方式,开启主从模式 以及logbin的文件位置 如下:

server_id=101
binlog-ignore-db=mysql
log-bin=mall-mysql-bin
binlog_cache_size=1M
binlog_format=row
expire_logs_days=7
slave_skip_errors=1062

2.1?记得重启下mysql? ?查看MySQL配置是否开启

SHOW VARIABLES LIKE 'binlog-format'; -- 结果应该是ROW
SHOW VARIABLES LIKE 'log_bin'; -- 结果应该是 ON
SHOW VARIABLES LIKE '%log%'; -- 所有binlog信息

?2.2? 然后给canal创建一个canal账户? 命令如下:

CREATE USER canal IDENTIFIED BY 'canal'; ?
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

2.3 创建一个canal-test库和sys_log测试表

DROP TABLE IF EXISTS `sys_log`;
CREATE TABLE `sys_log` ?(
? `id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '编号',
? `type` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '1' COMMENT '日志类型',
? `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '日志标题',
? `create_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建者',
? `create_date` datetime NULL DEFAULT NULL COMMENT '创建时间',
? `remote_addr` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '操作IP地址',
? `user_agent` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '用户代理',
? `request_uri` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '请求URI',
? `method` varchar(5) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '操作方式',
? `params` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '操作提交的数据',
? `exception` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '异常信息',
? PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '日志表' ROW_FORMAT = Dynamic;

3.下载安装ES和Kibana? 这里需要注意的是两个版本需要一致? ?这里就不介绍了? ? 如下我自己安装的:

4.下载canal 的服务器端??canal-deployer1.1.5 和客户端?canal-adapter 1.1.5? 下载地址为:https://github.com/alibaba/canal/releases

5. 解压这两个项目 命令:

tar -zxvf??canal.adapter-1.1.5.tar.gz

tar -zxvf??canal.deployer-1.1.5.tar.gz

6.修改服务端?canal.deployer-1.1.5下conf目录名example为orgdeer-cui? 命令:mv??example??orgdeer-cui? ?然后修改配置conf/orgdeer-cui/instance.properties? ??主要是修改数据库相关配置? ?改下面三个地方? ?如下:

6.启动服务器端canal.deployer-1.1.5? ?

切换目录:cd??/home/canal/canal.deployer-1.1.5/bin

启动:./startup.sh

7.查看日志是否启动成功:

tail -f /home/canal/canal.deployer-1.1.5/logs/canal/canal.log?

8. 修改客户端??canal.adapter-1.1.5下的配置?application.yml? ??主要是修改canal-server配置、数据源配置和客户端适配器配置? ?主要改一下带注释的部分 如下:

server:
? port: 8081
logging:
? level:
? ? com.alibaba.otter.canal.client.adapter.es: DEBUG
spring:
? jackson:
? ? date-format: yyyy-MM-dd HH:mm:ss
? ? time-zone: GMT+8
? ? default-property-inclusion: non_null

canal.conf:
? mode: tcp #tcp kafka rocketMQ rabbitMQ
? flatMessage: true
? zookeeperHosts:
? syncBatchSize: 1000
? # IMPORTANT!!! KEEP ADAPER SYNC CONSISTANT
? retries: -1
? timeout:
? accessKey:
? secretKey:
? consumerProperties:
? ? # canal tcp consumer
? ? canal.tcp.server.host: 127.0.0.1:11111? #服务器端地址
? ? canal.tcp.zookeeper.hosts:
? ? canal.tcp.batch.size: 500
? ? canal.tcp.username:
? ? canal.tcp.password:
? ? # kafka consumer
? ? kafka.bootstrap.servers: 127.0.0.1:9092
? ? kafka.enable.auto.commit: false
? ? kafka.auto.commit.interval.ms: 1000
? ? kafka.auto.offset.reset: latest
? ? kafka.request.timeout.ms: 40000
? ? kafka.session.timeout.ms: 30000
? ? kafka.isolation.level: read_committed
? ? kafka.max.poll.records: 1000
? ? # rocketMQ consumer
? ? rocketmq.namespace:
? ? rocketmq.namesrv.addr: 127.0.0.1:9876
? ? rocketmq.batch.size: 1000
? ? rocketmq.enable.message.trace: false
? ? rocketmq.customized.trace.topic:
? ? rocketmq.access.channel:
? ? rocketmq.subscribe.filter:
? ? # rabbitMQ consumer
? ? rabbitmq.host:
? ? rabbitmq.virtual.host:
? ? rabbitmq.username:
? ? rabbitmq.password:
? ? rabbitmq.resource.ownerId:

? srcDataSources:
? ? defaultDS:
? ? ? url: jdbc:mysql://127.0.0.1:3306/canal-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false&useInformationSchema=false&allowPublicKeyRetrieval=true? ?#mysql链接地址
? ? ? username: root? ?#用户名
? ? ? password: hzp123456? #密码
? canalAdapters:
? - instance: orgdeer-cui # 对应的是服务器端中配置mq topic name
? ? groups:
? ? - groupId: g1
? ? ? outerAdapters:
? ? ? - name: logger? #日志打印适配器
? ? ? - name: es7? #ES同步适配器
? ? ? ? hosts: 127.0.0.1:9300? #ESl链接地址
? ? ? ? properties:
? ? ? ? ? mode: transport? #模式可选transport(9300)或者rest(9200)
? ? ? ? ? cluster.name: elasticsearch? #ES集群名称
HOSTNAME%%.*:
PWD/#$HOME/~:

9.启动客户端?canal.adapter-1.1.5? ?

切换目录:cd?/home/canal/canal.adapter-1.1.5/bin

启动:./startup.sh

10 查看客户端日志是否启动成功?

tail -f ?/home/canal/canal.adapter-1.1.5/logs/adapter/adapter.log?

11.添加配置文件canal-adapter/conf/es7/sys_log.yml,用于配置MySQL中的表与Elasticsearch中索引的映射关系 (注意destination名称orgdeer-cui要跟服务器端mq topic name对应) 然后将文件sys_log.yml放到/home/canal/canal.adapter-1.1.5/conf/es7目录下?

dataSourceKey: defaultDS
destination: orgdeer-cui
groupId: g1
esMapping:
  _index: sys_log
  _id: _id
  sql: "select
        id as _id, type,
         title,
         create_by as createBy,
         UNIX_TIMESTAMP(create_date) as createDate,
        remote_addr as remoteAddr,
        user_agent as userAgent,
        request_uri as requestUri,method,
        params,exception
        from sys_log"
  etlCondition: "where create_date>={}"
  commitBatch: 3000

12.在Kibana控制台中创建sys_log索引

PUT sys_log
{
? "mappings": {
? ? ? "properties":{
? ? ? ? "type":{
? ? ? ? ? "type":"keyword"
? ? ? ? },
? ? ? ? "title":{
? ? ? ? ? "type":"text"
? ? ? ? },
? ? ? ? "createBy":{
? ? ? ? ? "type":"keyword"
? ? ? ? },
? ? ? ? "remoteAddr":{
? ? ? ? ? "type": "text"
? ? ? ? },
? ? ? ? "userAgent":{
? ? ? ? ? "type":"text"
? ? ? ? },
? ? ? ? "requestUri": {
? ? ? ? ? "type": "text"
? ? ? ? },
? ? ? ? "method": {
? ? ? ? ? "type": "keyword"
? ? ? ? },
? ? ? ? "params": {
? ? ? ? ? "type": "text"
? ? ? ? },
? ? ? ?"exception": {
? ? ? ? ? "type": "text"
? ? ? ? },
? ? ? ? "createDate": {
? ? ? ? ? ?"type": "long"
? ? ? ? }
? ? ? }
? }
}

13.通过命令触发,让canal-adapter读取到的dml日志,同步到es的库中? 命令如下:

curl -X POST http://127.0.0.1:8081/etl/es7/sys_log.yml

查看ES中数据同步的情况:

?手动删除? 添加数据? ?ES数据也会跟着同步的??

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章           查看所有文章
加:2022-07-17 16:30:00  更:2022-07-17 16:34:33 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 1:28:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码