IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> elasticsearch和mysql数据同步 基于canal canal-server和canal-adapter -> 正文阅读

[大数据]elasticsearch和mysql数据同步 基于canal canal-server和canal-adapter

第一步: 下载mysql 8.0.18版本的镜像

docker pull mysql:8.0.18

第二步: 运行mysql的实例

docker run --name mysql8.0.18 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=admin -d mysql:8.0.18

第三步: 在宿主机中新建一个文件用来存储mysql的配置文件, 拷贝出mysql容器中的文件到本地

第四步: 关掉刚刚新建的mysql容器, 新建一个挂载盘的容器

# 创建一个宿主机新的文件
mkdir /mydata/mysql8.0.18


# 将mysql的配置文件copy到宿主机文件中
docker cp mysql:8.0.18:/etc/mysql/. /mydata/mysql8.0.18/conf/

# 将mysql的数据copy到宿主机文件中
docker cp mysql:8.0.18:/var/lib/mysql/. /mydata/mysql8.0.18/data/

# 删除mysql容器
docker stop mysql8.0.18
docker rm mysql8.0.18

# 再次新建mysql容器, 并且挂载数据到宿主机中, 方便后续配置文件重启mysql
docker run --name mysql8.0.18 -p 3306:3306 
-v /mydata/mysql8.0.18/conf:/etc/mysql 
-v /mydata/mysql8.0.18/data:/var/lib/mysql 
--restart=always 
-e MYSQL_ROOT_PASSWORD=admin 
-d mysql:8.0.18

第五步: 开始配置mysql? 配置完成重启容器

下面是配置文件内容:

[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
secure-file-priv= NULL

# 打开binlog
log-bin=mysql-bin
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server-id=1

# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0

# Custom config should go here
!includedir /etc/mysql/conf.d/

重启mysql容器

docker restart mysql8.0.18

查看配置是否成功:

# 进入docker mysql容器内部
docker exec -it mysql8.0.18 /bin/bash

mysql -uroot -p

# 输入密码

show master status;

show varialbles like 'log_bin';

新建一下后面测试需要的数据库和表

drop DATABASE if EXISTS mytest;
CREATE DATABASE `mytest` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

use mytest;

drop table if exists user;
create table user(
	`id` varchar(256) COMMENT 'id',
	`username` varchar(256) comment '用户登录名',
	`password` varchar(256) comment '登录密码',
	`nickname` varchar(25) comment '用户昵称',
	`address` varchar(256) comment '用户地址',
	`age` int(5) comment '用户年龄',
	primary key (`id`)
)ENGINE=INNODB;

安装 canal-server, 启动一个canal-server容器

#拉取镜像
docker pull canal/canal-server:v1.1.5
#第一次运行
docker run --name canal-server -p 11111:11111 -d canal/canal-server:v1.1.5

拷贝配置文件到宿主机

# 将容器中的配置文件copy到宿主机
docker cp canal-server:/home/admin/canal-server/. /mydata/canal-server/

# 关闭容器,  重新挂载配置到宿主机启动容器
docker stop canal-server
docker rm canal-server

# 启动canal-server
docker run --name canal-server -p 11111:11111 -v /mydata/canal-server:/home/admin/canal-server -d canal/canal-server:v1.1.5

修改配置文件

vim /mydata/canal-server/conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=11

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306  # 你的数据库地址
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root  # 您的数据库账号
canal.instance.dbPassword=admin #你的数据库密码
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

安装canal-adapter

#拉取镜像
docker pull slpcat/canal-adapter:v1.1.5
#第一次运行
docker run --name canal-adapter -p 8081:8081 -d slpcat/canal-adapter:v1.1.5

拷贝配置文件到宿主机

docker cp canal-adapter:/opt/canal-adapter/.  /mydata/canal-adapter/


# 关闭之前的容器
docker stop canal-adapter
docker rm canal-adapter

# 挂载配置文件到宿主机, 启动容器
docker run --name canal-adapter -p 8081:8081 -v /mydata/canal-adapter:/opt/canal-adapter  -d slpcat/canal-adapter:v1.1.5

编写配置文件

# 编辑配置文件
vim /mydata/canal-adapter/conf/application.yml

配置文件内容

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #  之前起的 canal-server 地址  url
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    # kafka.bootstrap.servers: 127.0.0.1:9092
    # kafka.enable.auto.commit: false
    # kafka.auto.commit.interval.ms: 1000
    # kafka.auto.offset.reset: latest
    # kafka.request.timeout.ms: 40000
    # kafka.session.timeout.ms: 30000
    # kafka.isolation.level: read_committed
    # kafka.max.poll.records: 1000
    # rocketMQ consumer
    # rocketmq.namespace:
    # rocketmq.namesrv.addr: 127.0.0.1:9876
    # rocketmq.batch.size: 1000
    # rocketmq.enable.message.trace: false
    # rocketmq.customized.trace.topic:
    # rocketmq.access.channel:
    # rocketmq.subscribe.filter:
    # rabbitMQ consumer
    # rabbitmq.host:
    # rabbitmq.virtual.host:
    # rabbitmq.username:
    # rabbitmq.password:
    # rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # 你的数据库地址
      username: root   # 数据库账号
      password: admin  # 数据库密码
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7  # 你的es配置
        hosts: 127.0.0.1:9200 # 你的es配置
        properties:
         mode: rest # 你的es配置
         cluster.name: test-es # 你的es配置

编写配置脚本

vim /mydata/canal-adapter/conf/es7/test.yml

编写配置文件内容

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: mytest # es索引
  _id: _id
  _type: _doc
  upsert: true
  sql: "select a.id as _id, a.username as username, a.password as password, a.nickname as nickname, a.address as address, a.age as age from user a"
  commitBatch: 3000

测试一下

添加elastisearch的索引

{
  "mappings": {
    "properties": {
      "username": {
        "type": "text"
      },
      "password": {
        "type": "text"
      },
      "nickname": {
        "type": "text"
      },
      "address": {
        "type": "text"
      },
      "age": {
        "type": "text"
      }
    }
  }
}

重启canal-server和canal-adapter

docker restart canal-server

docker restart canal-adapter

数据库新增修改删除, elasticsearch都会相应的改变

关于如何全量同步已存在的数据进入es

# 8081 就是指的canal-adapter起的服务地址  触发 请求去告诉adapter需要同步全量的数据
curl -X POST http://127.0.0.1:8081/etl/es7/test.yml # yml就是指的es7文件中的文件名

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-14 10:00:40  更:2022-05-14 10:01:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:29:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码