第一步: 下载mysql 8.0.18版本的镜像
docker pull mysql:8.0.18
第二步: 运行mysql的实例
docker run --name mysql8.0.18 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:8.0.18
第三步: 在宿主机中新建一个文件用来存储mysql的配置文件, 拷贝出mysql容器中的文件到本地
第四步: 关掉刚刚新建的mysql容器, 新建一个挂载盘的容器
# 创建一个宿主机新的文件
mkdir /mydata/mysql8.0.18
# 将mysql的配置文件copy到宿主机文件中
docker cp mysql:8.0.18:/etc/mysql/. /mydata/mysql8.0.18/conf/
# 将mysql的数据copy到宿主机文件中
docker cp mysql:8.0.18:/var/lib/mysql/. /mydata/mysql8.0.18/data/
# 删除mysql容器
docker stop mysql8.0.18
docker rm mysql8.0.18
# 再次新建mysql容器, 并且挂载数据到宿主机中, 方便后续配置文件重启mysql
docker run --name mysql8.0.18 -p 3306:3306
-v /mydata/mysql8.0.18/conf:/etc/mysql
-v /mydata/mysql8.0.18/data:/var/lib/mysql
--restart=always
-e MYSQL_ROOT_PASSWORD=admin
-d mysql:8.0.18
第五步: 开始配置mysql? 配置完成重启容器
下面是配置文件内容:
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
secure-file-priv= NULL
# 打开binlog
log-bin=mysql-bin
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server-id=1
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
# Custom config should go here
!includedir /etc/mysql/conf.d/
重启mysql容器
docker restart mysql8.0.18
查看配置是否成功:
# 进入docker mysql容器内部
docker exec -it mysql8.0.18 /bin/bash
mysql -uroot -p
# 输入密码
show master status;
show varialbles like 'log_bin';
新建一下后面测试需要的数据库和表
drop DATABASE if EXISTS mytest;
CREATE DATABASE `mytest` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
use mytest;
drop table if exists user;
create table user(
`id` varchar(256) COMMENT 'id',
`username` varchar(256) comment '用户登录名',
`password` varchar(256) comment '登录密码',
`nickname` varchar(25) comment '用户昵称',
`address` varchar(256) comment '用户地址',
`age` int(5) comment '用户年龄',
primary key (`id`)
)ENGINE=INNODB;
安装 canal-server, 启动一个canal-server容器
#拉取镜像
docker pull canal/canal-server:v1.1.5
#第一次运行
docker run --name canal-server -p 11111:11111 -d canal/canal-server:v1.1.5
拷贝配置文件到宿主机
# 将容器中的配置文件copy到宿主机
docker cp canal-server:/home/admin/canal-server/. /mydata/canal-server/
# 关闭容器, 重新挂载配置到宿主机启动容器
docker stop canal-server
docker rm canal-server
# 启动canal-server
docker run --name canal-server -p 11111:11111 -v /mydata/canal-server:/home/admin/canal-server -d canal/canal-server:v1.1.5
修改配置文件
vim /mydata/canal-server/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=11
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306 # 你的数据库地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root # 您的数据库账号
canal.instance.dbPassword=admin #你的数据库密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
安装canal-adapter
#拉取镜像
docker pull slpcat/canal-adapter:v1.1.5
#第一次运行
docker run --name canal-adapter -p 8081:8081 -d slpcat/canal-adapter:v1.1.5
拷贝配置文件到宿主机
docker cp canal-adapter:/opt/canal-adapter/. /mydata/canal-adapter/
# 关闭之前的容器
docker stop canal-adapter
docker rm canal-adapter
# 挂载配置文件到宿主机, 启动容器
docker run --name canal-adapter -p 8081:8081 -v /mydata/canal-adapter:/opt/canal-adapter -d slpcat/canal-adapter:v1.1.5
编写配置文件
# 编辑配置文件
vim /mydata/canal-adapter/conf/application.yml
配置文件内容
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111 # 之前起的 canal-server 地址 url
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
# kafka.bootstrap.servers: 127.0.0.1:9092
# kafka.enable.auto.commit: false
# kafka.auto.commit.interval.ms: 1000
# kafka.auto.offset.reset: latest
# kafka.request.timeout.ms: 40000
# kafka.session.timeout.ms: 30000
# kafka.isolation.level: read_committed
# kafka.max.poll.records: 1000
# rocketMQ consumer
# rocketmq.namespace:
# rocketmq.namesrv.addr: 127.0.0.1:9876
# rocketmq.batch.size: 1000
# rocketmq.enable.message.trace: false
# rocketmq.customized.trace.topic:
# rocketmq.access.channel:
# rocketmq.subscribe.filter:
# rabbitMQ consumer
# rabbitmq.host:
# rabbitmq.virtual.host:
# rabbitmq.username:
# rabbitmq.password:
# rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # 你的数据库地址
username: root # 数据库账号
password: admin # 数据库密码
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: es7 # 你的es配置
hosts: 127.0.0.1:9200 # 你的es配置
properties:
mode: rest # 你的es配置
cluster.name: test-es # 你的es配置
编写配置脚本
vim /mydata/canal-adapter/conf/es7/test.yml
编写配置文件内容
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: mytest # es索引
_id: _id
_type: _doc
upsert: true
sql: "select a.id as _id, a.username as username, a.password as password, a.nickname as nickname, a.address as address, a.age as age from user a"
commitBatch: 3000
测试一下
添加elastisearch的索引
{
"mappings": {
"properties": {
"username": {
"type": "text"
},
"password": {
"type": "text"
},
"nickname": {
"type": "text"
},
"address": {
"type": "text"
},
"age": {
"type": "text"
}
}
}
}
重启canal-server和canal-adapter
docker restart canal-server
docker restart canal-adapter
数据库新增修改删除, elasticsearch都会相应的改变
关于如何全量同步已存在的数据进入es
# 8081 就是指的canal-adapter起的服务地址 触发 请求去告诉adapter需要同步全量的数据
curl -X POST http://127.0.0.1:8081/etl/es7/test.yml # yml就是指的es7文件中的文件名
|