IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步 -> 正文阅读

[大数据]OpenShift 4 - 使用 Debezium 捕获变化数据,实现MySQL到PostgreSQL数据库同步

OpenShift / RHEL / DevSecOps 汇总目录
说明:本文已经在OpenShift 4.10环境中验证

场景说明

本文使用 OpenShift 的 AMQ Steams(即企业版 Kafka)和 Redhat 主导的 CDC 开源项目 Debezium 来实现从 MySQL 到 PostgreSQL 数据库的数据同步。
在这里插入图片描述
上图中的 Kafka Connector 提供了访问源或目标的参数, 而 Kafka Connect 为访问源或目标的实际运行环境,该环境运行在相关容器中。

注意:本文操作需要用到 access.redhat.com 账号,另外还需有一个镜像 Registry 服务的账号,本文使用的是 quay.io Registry 服务。

部署环境

首先创建一个项目

$ oc project db-cdc

安装CDC源和目标数据库

安装 MySQL

  1. 在 OpenShift 控制台的 “开发者” 视图的 “+添加” 中找到 “数据库”,然后点击 MySQL (Ephemeral)。在 “实例化模板” 界面中提供如图配置,最后点击创建。
    在这里插入图片描述
  2. 运行以下命令,在 MySQL 中创建测试数据表和记录。
$ wget https://raw.githubusercontent.com/liuxiaoyu-git/debezium_openshift/master/mysql/inventory.sql
$ MYSQL_POD=$(oc get pods --output=jsonpath={.items..metadata.name} -l name=mysql)
$ oc cp inventory.sql $MYSQL_POD:/tmp/
$ oc exec $MYSQL_POD -- sh -c 'mysql -uroot < /tmp/inventory.sql'
$ oc exec $MYSQL_POD -it -- mysql -u mysqluser -pmysqlpassword inventory
mysql> select * from customers;
+------+------------+-----------+-----------------------+
| id   | first_name | last_name | email                 |
+------+------------+-----------+-----------------------+
| 1001 | Sally      | Thomas    | sally.thomas@acme.com |
| 1002 | George     | Bailey    | gbailey@foobar.com    |
| 1003 | Edward     | Walker    | ed@walker.com         |
| 1004 | Anne       | Kretchmar | annek@noanswer.org    |
+------+------------+-----------+-----------------------+

安装 PostgreSQLSQL

  1. 在 OpenShift 控制台的 “开发者” 视图的 “+添加” 中找到 “数据库”,然后点击 PostgreSQL (Ephemeral)。在 “实例化模板” 界面中提供如图配置,最后点击创建。
    在这里插入图片描述
  2. 运行以下命令,确认当前在 PostgreSQL 中无 customers 表。
$ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name})
$ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory
inventory=> select * from customers;
ERROR 1146 (42S02): Table 'inventory.customers1' doesn't exist

安装 AMQ Stream 环境

安装 AMQ Stream Opeartor

在 OpenShift 中使用默认配置安装 AMQ Stream Opeartor,步骤略。

创建 Kafka 实例

在安装好的 AMQ Stream Opeartor 中根据以下配置创建 kafka 服务。

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: '3.1'
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    version: 3.1.0
    replicas: 3
  entityOperator:
    topicOperator: {}
    userOperator: {}
  zookeeper:
    storage:
      type: ephemeral
    replicas: 3

创建后会在 OpenShift 中看到部署的相关资源。
在这里插入图片描述

创建 KafkaConnect 用到的 Image

  1. 登录 registry.redhat.io。
$ podman login registry.redhat.io
  1. 使用以下内容创建名为 Dockerfile 的文件,其中包含访问 MySQL 日志的 Connect Plugin、访问 PostgreSQL 和 Kafka 的 JDBC Driver。
FROM registry.redhat.io/amq7/amq-streams-kafka-31-rhel8:2.1.0-4.1652296055

USER root:root

RUN mkdir -p /opt/kafka/plugins/debezium

ARG POSTGRES_VERSION=42.2.8
ARG KAFKA_JDBC_VERSION=5.3.2
ARG MYSQL_CONNECTOR_PLUGIN_VERSION=1.9.5

# Deploy MySQL Connect Plugin
RUN cd /opt/kafka/plugins/debezium/ &&\
        curl -sO https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/$MYSQL_CONNECTOR_PLUGIN_VERSION.Final/debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz &&\
        tar -xf debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz &&\
        rm -f debezium-connector-mysql-$MYSQL_CONNECTOR_PLUGIN_VERSION.Final-plugin.tar.gz

# Deploy PostgreSQL JDBC Driver
RUN cd /opt/kafka/libs && curl -sO https://jdbc.postgresql.org/download/postgresql-$POSTGRES_VERSION.jar

# Deploy Kafka Connect JDBC
RUN cd /opt/kafka/plugins/debezium/ &&\
        curl -sO https://packages.confluent.io/maven/io/confluent/kafka-connect-jdbc/$KAFKA_JDBC_VERSION/kafka-connect-jdbc-$KAFKA_JDBC_VERSION.jar

USER 1001
  1. 在 Dockerfile 文件所在目录运行命令,在本地构建镜像。说明:请根据镜像 Registry 服务地址和账号名修改命令中的镜像名称。
$ podman build -t quay.io/dawnskyliu/connect-debezium:v1 .
  1. 登录 Registry 服务器,然后将生成的本地镜像推送到镜像 Registry 服务器上。
$ podman login quay.io
$ podman push quay.io/dawnskyliu/connect-debezium:v1
  1. 在镜像 Registry 服务器上确认 connect-debezium 是 Public 访问类型的 Repository。
    在这里插入图片描述

配置 KafkaConnect

在安装好的 AMQ Stream Opeartor 中根据以下配置创建 KafkaConnect 对象,其中使用了前面生成的 “quay.io/dawnskyliu/connect-debezium:v1” 镜像。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.1.0
  replicas: 1
  image: 'quay.io/dawnskyliu/connect-debezium:v1'
  bootstrapServers: 'my-cluster-kafka-bootstrap:9093'
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    config.storage.replication.factor: 1
    offset.storage.replication.factor: 1
    status.storage.replication.factor: 1
    config.storage.min.insync.replicas: 1
    offset.storage.min.insync.replicas: 1
    status.storage.min.insync.replicas: 1

配置 KafkaConnector

MySqlConnector

在安装好的 AMQ Stream Opeartor 中根据以下配置创建 KafkaConnector 对象。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: mysql-source-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    "database.hostname": "mysql"
    "database.ssl.mode": "disabled"
    "database.allowPublicKeyRetrieval": "true"
    "database.port": "3306"
    "database.user": "debezium"
    "database.password": "dbz"
    "database.server.id": "1"
    "database.server.name": "dbserver1"
    "database.include": "inventory"
    "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092"
    "database.history.kafka.topic": "schema-changes.inventory"
    "transforms": "route"
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter"
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)"
    "transforms.route.replacement": "$3"

JdbcSinkConnector

在安装好的 AMQ Stream Opeartor 中根据以下配置创建 KafkaConnector 对象。

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: postgresql-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.confluent.connect.jdbc.JdbcSinkConnector
  tasksMax: 1
  config:
    "topics": "customers"
    "connection.url": "jdbc:postgresql://postgresql:5432/inventory?user=postgresuser&password=postgrespw"
    "transforms": "unwrap"
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState"
    "transforms.unwrap.drop.tombstones": "false"
    "auto.create": "true"
    "insert.mode": "upsert"
    "delete.enabled": "true"
    "pk.fields": "id"
    "pk.mode": "record_key"

环境检查

在 AMQ Streams 的 Operator 中确认 Kafka,Kafka Connect 和 Kafka Connector 的运行状态。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

CDC 验证

数据同步

确认 customers 表和数据已经从 MySQL 同步到 PostgreSQL 中。

$ POSTGRESQL_POD=$(oc get pod -l name=postgresql -o jsonpath={.items[0].metadata.name})
$ oc exec $POSTGRESQL_POD -it -- psql -U postgresuser inventory
inventory=> select * from customers;
 last_name |  id  | first_name |         email
-----------+------+------------+-----------------------
 Thomas    | 1001 | Sally      | sally.thomas@acme.com
 Bailey    | 1002 | George     | gbailey@foobar.com
 Walker    | 1003 | Edward     | ed@walker.com
 Kretchmar | 1004 | Anne       | annek@noanswer.org
(4 rows)

添加数据

在 MySQL 中执行命令添加新数据,然后在 PostgreSQL 确认变化数据已同步。

mysql> INSERT INTO customers VALUES (default,"test1","test1","test1@acme.com");

更新数据

在 MySQL 中执行命令更新数据,然后在 PostgreSQL 确认变化数据已同步。

mysql> update customers set first_name='Test' where id = 1001;

删除数据

在 MySQL 中执行命令删除数据,然后在 PostgreSQL 确认变化数据已同步。

mysql> delete from customers where first_name='test1';

参考

https://github.com/liuxiaoyu-git/debezium_openshift
https://debezium.io/documentation/reference/1.9/operations/openshift.html
https://github.com/debezium/debezium-examples/tree/main/openshift
https://aws.amazon.com/cn/blogs/china/debezium-deep-dive/

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-08-19 19:12:48  更:2022-08-19 19:13:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 23:59:34-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码