IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Kafka connect: 使用JDBC connector同步无主键的表时,如何处理无法执行delete操作的问题? -> 正文阅读

[大数据]Kafka connect: 使用JDBC connector同步无主键的表时,如何处理无法执行delete操作的问题?

环境:
kafka 2.8.0
debezium 1.9.0

我们主要实现使用debezium采集oracle的数据,将数据通过jdbc connector写到mysql,但是由于有一些oracle数据表没有主键,导致使用jdbc connector sink的时候不能正常地执行delete操作, 原因如下:

debezium采集数据发送到kafka, kafka的消息体的key默认是所采集的表的主键,如果表没有主键,则消息体的key为空。
由于源表有delete的操作,我们为了保持两边数据同步,使用jdbc connector sink的时候,需要设置"delete.enabled": "true", 但是这个设置要求"pk.mode": "record_key", 又因为没有主键,消息体中key为空,设置不了"pk.mode": "record_key",所以我们只能从源头出发,在source上处理对应的key。

具体操作如下:

Source端配置:

{
 
    "name": "oracle-source-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "database.server.name" : "ORCL",
        "tasks.max" : "1",
	    "database.server.name" : "orcl_server",
	    "database.hostname" : "oracl_地址",
	    "database.port" : "1521",
	    "database.user" : "user_name",
	    "database.password" : "passwd",
	    "database.dbname" : "db名称",
        "table.include.list": "DB2.STUDENT",
        "message.key.columns": "DB2.STUDENT:SNAME",
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.source",
        "event.processing.failure.handling.mode": "skip",
        "log.mining.strategy": "online_catalog",
        "database.history.skip.unparseable.ddl": "true",
        "database.history.store.only.captured.tables.ddl": "true",
        "time.precision.mode" : "connect",
     	"database.serverTimezone":"UTC+8",
     	"decimal.handling.mode": "string"
    }
}

主要配置:
"message.key.columns": "DB2.STUDENT:SNAME", 无主键的表 设置指定主键 可以指定多个列,也可以设置多个表 如:inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4

sink端配置

{
    "name":"oracle-sink-connector",
    "config":{
        "topics": "orcl_server.DB2.STUDENT",
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:mysql://localhost:3306/orcl_source",
        "connection.user": "user_name",
        "connection.password": "pass_wd",
        "tasks.max": "1",
        "auto.create": "true",
        "auto.evolve": "true",
        "insert.mode":"upsert",
        "delete.enabled": "true",
        "batch.size": "3000",
        "pk.mode": "record_key",
        "transforms":"ExtractField,repTopic",
   "transforms.ExtractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.ExtractField.field":"after",
        "transforms.repTopic.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.repTopic.regex":"(.*).(DB2.(.*))",
        "transforms.repTopic.replacement":"$2"
  }
}

总结

自此,就可以完成对应的操作了,好的,就到这里,希望对你有所帮助,希望感兴趣的小伙伴能够点个关注,一起交流学习~

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-03 09:25:32  更:2022-05-03 09:25:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 0:39:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码