1.环境准备:
1.1? mysql:5.7
1.2?elasticsearch:7.4.2
1.3??kibana:7.4.2
1.4 服务端:canal-deployer1.1.5
1.5 客户端:canal-adapter 1.1.5
2.下载安装MySQL(注:本人项目都是docker安装? 这里就不一一展示安装步骤了? 详情见百度或者看我其他文章)修改mysql配置文件 开启binlog日志,并且以ROW方式,开启主从模式 以及logbin的文件位置 如下:
server_id=101 binlog-ignore-db=mysql log-bin=mall-mysql-bin binlog_cache_size=1M binlog_format=row expire_logs_days=7 slave_skip_errors=1062
2.1?记得重启下mysql? ?查看MySQL配置是否开启
SHOW VARIABLES LIKE 'binlog-format'; -- 结果应该是ROW SHOW VARIABLES LIKE 'log_bin'; -- 结果应该是 ON SHOW VARIABLES LIKE '%log%'; -- 所有binlog信息
?2.2? 然后给canal创建一个canal账户? 命令如下:
CREATE USER canal IDENTIFIED BY 'canal'; ? GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
2.3 创建一个canal-test库和sys_log测试表
DROP TABLE IF EXISTS `sys_log`; CREATE TABLE `sys_log` ?( ? `id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '编号', ? `type` char(1) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '1' COMMENT '日志类型', ? `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '' COMMENT '日志标题', ? `create_by` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '创建者', ? `create_date` datetime NULL DEFAULT NULL COMMENT '创建时间', ? `remote_addr` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '操作IP地址', ? `user_agent` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '用户代理', ? `request_uri` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '请求URI', ? `method` varchar(5) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '操作方式', ? `params` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '操作提交的数据', ? `exception` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '异常信息', ? PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '日志表' ROW_FORMAT = Dynamic;
3.下载安装ES和Kibana? 这里需要注意的是两个版本需要一致? ?这里就不介绍了? ? 如下我自己安装的:
4.下载canal 的服务器端??canal-deployer1.1.5 和客户端?canal-adapter 1.1.5? 下载地址为:https://github.com/alibaba/canal/releases
5. 解压这两个项目 命令:
tar -zxvf??canal.adapter-1.1.5.tar.gz
tar -zxvf??canal.deployer-1.1.5.tar.gz
6.修改服务端?canal.deployer-1.1.5下conf目录名example为orgdeer-cui? 命令:mv??example??orgdeer-cui? ?然后修改配置conf/orgdeer-cui/instance.properties? ??主要是修改数据库相关配置? ?改下面三个地方? ?如下:
6.启动服务器端canal.deployer-1.1.5? ?
切换目录:cd??/home/canal/canal.deployer-1.1.5/bin
启动:./startup.sh
7.查看日志是否启动成功:
tail -f /home/canal/canal.deployer-1.1.5/logs/canal/canal.log?
8. 修改客户端??canal.adapter-1.1.5下的配置?application.yml? ??主要是修改canal-server配置、数据源配置和客户端适配器配置? ?主要改一下带注释的部分 如下:
server: ? port: 8081 logging: ? level: ? ? com.alibaba.otter.canal.client.adapter.es: DEBUG spring: ? jackson: ? ? date-format: yyyy-MM-dd HH:mm:ss ? ? time-zone: GMT+8 ? ? default-property-inclusion: non_null
canal.conf: ? mode: tcp #tcp kafka rocketMQ rabbitMQ ? flatMessage: true ? zookeeperHosts: ? syncBatchSize: 1000 ? # IMPORTANT!!! KEEP ADAPER SYNC CONSISTANT ? retries: -1 ? timeout: ? accessKey: ? secretKey: ? consumerProperties: ? ? # canal tcp consumer ? ? canal.tcp.server.host: 127.0.0.1:11111? #服务器端地址 ? ? canal.tcp.zookeeper.hosts: ? ? canal.tcp.batch.size: 500 ? ? canal.tcp.username: ? ? canal.tcp.password: ? ? # kafka consumer ? ? kafka.bootstrap.servers: 127.0.0.1:9092 ? ? kafka.enable.auto.commit: false ? ? kafka.auto.commit.interval.ms: 1000 ? ? kafka.auto.offset.reset: latest ? ? kafka.request.timeout.ms: 40000 ? ? kafka.session.timeout.ms: 30000 ? ? kafka.isolation.level: read_committed ? ? kafka.max.poll.records: 1000 ? ? # rocketMQ consumer ? ? rocketmq.namespace: ? ? rocketmq.namesrv.addr: 127.0.0.1:9876 ? ? rocketmq.batch.size: 1000 ? ? rocketmq.enable.message.trace: false ? ? rocketmq.customized.trace.topic: ? ? rocketmq.access.channel: ? ? rocketmq.subscribe.filter: ? ? # rabbitMQ consumer ? ? rabbitmq.host: ? ? rabbitmq.virtual.host: ? ? rabbitmq.username: ? ? rabbitmq.password: ? ? rabbitmq.resource.ownerId:
? srcDataSources: ? ? defaultDS: ? ? ? url: jdbc:mysql://127.0.0.1:3306/canal-test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2B8&useSSL=false&useInformationSchema=false&allowPublicKeyRetrieval=true? ?#mysql链接地址 ? ? ? username: root? ?#用户名 ? ? ? password: hzp123456? #密码 ? canalAdapters: ? - instance: orgdeer-cui # 对应的是服务器端中配置mq topic name ? ? groups: ? ? - groupId: g1 ? ? ? outerAdapters: ? ? ? - name: logger? #日志打印适配器 ? ? ? - name: es7? #ES同步适配器 ? ? ? ? hosts: 127.0.0.1:9300? #ESl链接地址 ? ? ? ? properties: ? ? ? ? ? mode: transport? #模式可选transport(9300)或者rest(9200) ? ? ? ? ? cluster.name: elasticsearch? #ES集群名称 HOSTNAME%%.*: PWD/#$HOME/~:
9.启动客户端?canal.adapter-1.1.5? ?
切换目录:cd?/home/canal/canal.adapter-1.1.5/bin
启动:./startup.sh
10 查看客户端日志是否启动成功?
tail -f ?/home/canal/canal.adapter-1.1.5/logs/adapter/adapter.log?
11.添加配置文件canal-adapter/conf/es7/sys_log.yml,用于配置MySQL中的表与Elasticsearch中索引的映射关系 (注意destination名称orgdeer-cui要跟服务器端mq topic name对应) 然后将文件sys_log.yml放到/home/canal/canal.adapter-1.1.5/conf/es7目录下?
dataSourceKey: defaultDS
destination: orgdeer-cui
groupId: g1
esMapping:
_index: sys_log
_id: _id
sql: "select
id as _id, type,
title,
create_by as createBy,
UNIX_TIMESTAMP(create_date) as createDate,
remote_addr as remoteAddr,
user_agent as userAgent,
request_uri as requestUri,method,
params,exception
from sys_log"
etlCondition: "where create_date>={}"
commitBatch: 3000
12.在Kibana控制台中创建sys_log索引
PUT sys_log { ? "mappings": { ? ? ? "properties":{ ? ? ? ? "type":{ ? ? ? ? ? "type":"keyword" ? ? ? ? }, ? ? ? ? "title":{ ? ? ? ? ? "type":"text" ? ? ? ? }, ? ? ? ? "createBy":{ ? ? ? ? ? "type":"keyword" ? ? ? ? }, ? ? ? ? "remoteAddr":{ ? ? ? ? ? "type": "text" ? ? ? ? }, ? ? ? ? "userAgent":{ ? ? ? ? ? "type":"text" ? ? ? ? }, ? ? ? ? "requestUri": { ? ? ? ? ? "type": "text" ? ? ? ? }, ? ? ? ? "method": { ? ? ? ? ? "type": "keyword" ? ? ? ? }, ? ? ? ? "params": { ? ? ? ? ? "type": "text" ? ? ? ? }, ? ? ? ?"exception": { ? ? ? ? ? "type": "text" ? ? ? ? }, ? ? ? ? "createDate": { ? ? ? ? ? ?"type": "long" ? ? ? ? } ? ? ? } ? } }
13.通过命令触发,让canal-adapter读取到的dml日志,同步到es的库中? 命令如下:
curl -X POST http://127.0.0.1:8081/etl/es7/sys_log.yml
查看ES中数据同步的情况:
?手动删除? 添加数据? ?ES数据也会跟着同步的??
|