Environment: Kafka 2.8.0, Debezium 1.9.0
Our goal is to capture data from Oracle with Debezium and write it to MySQL through the JDBC sink connector. However, some of the Oracle tables have no primary key, which breaks DELETE handling in the JDBC sink. The reason is as follows:
When Debezium publishes change events to Kafka, the message key defaults to the primary key of the captured table; if the table has no primary key, the message key is null. Since the source tables receive DELETEs, keeping both sides in sync requires setting "delete.enabled": "true" on the JDBC sink. That setting in turn requires "pk.mode": "record_key", but with no primary key the message key is null, so "record_key" cannot be used. The only workable fix is at the source: configure the source connector to build a key for those tables.
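For context, the JDBC sink turns a record into a DELETE only when the record has a non-null key and a null value (a tombstone). Once "message.key.columns" is set on the source, deleting a row produces exactly that pair: Debezium emits the delete change event and then, by default (tombstones.on.delete=true), the tombstone. Sketched with a hypothetical row value:

```
key:   {"SNAME": "alice"}   <-- key struct built from message.key.columns
value: null                 <-- tombstone; with delete.enabled=true the sink issues DELETE
```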
The concrete steps are as follows:
Source connector configuration:
{
  "name": "oracle-source-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "orcl_server",
    "database.hostname": "oracle_host",
    "database.port": "1521",
    "database.user": "user_name",
    "database.password": "passwd",
    "database.dbname": "db_name",
    "table.include.list": "DB2.STUDENT",
    "message.key.columns": "DB2.STUDENT:SNAME",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.source",
    "event.processing.failure.handling.mode": "skip",
    "log.mining.strategy": "online_catalog",
    "database.history.skip.unparseable.ddl": "true",
    "database.history.store.only.captured.tables.ddl": "true",
    "time.precision.mode": "connect",
    "database.serverTimezone": "UTC+8",
    "decimal.handling.mode": "string"
  }
}
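To verify that the source connector is actually keying the records (broker and topic names taken from the configs in this post), the topic can be inspected with Kafka's stock console consumer:

```
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
  --topic orcl_server.DB2.STUDENT \
  --property print.key=true \
  --from-beginning
```

Each line should show the key struct (containing SNAME) before the value; a null key means message.key.columns did not match the table.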
The key setting is "message.key.columns": "DB2.STUDENT:SNAME", which designates a surrogate message key for the table that has no primary key. Multiple key columns can be listed, and multiple tables can be covered at once, e.g.: inventory.customers:pk1,pk2;(.*).purchaseorders:pk3,pk4
Sink connector configuration:
{
  "name": "oracle-sink-connector",
  "config": {
    "topics": "orcl_server.DB2.STUDENT",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://localhost:3306/orcl_source",
    "connection.user": "user_name",
    "connection.password": "pass_wd",
    "tasks.max": "1",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "batch.size": "3000",
    "pk.mode": "record_key",
    "transforms": "ExtractField,repTopic",
    "transforms.ExtractField.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractField.field": "after",
    "transforms.repTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.repTopic.regex": "(.*).(DB2.(.*))",
    "transforms.repTopic.replacement": "$2"
  }
}
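The repTopic transform strips the server-name prefix so the sink derives the target table from the remainder of the topic name. A quick sanity check of that regex (a Python sketch; Kafka Connect's RegexRouter uses Java regex, where the backreference is written $2 instead of \2, but the capture groups behave the same for this pattern):

```python
import re

# Same pattern as transforms.repTopic.regex; group 2 is "DB2.<table>".
pattern = r"(.*).(DB2.(.*))"
topic = "orcl_server.DB2.STUDENT"  # <database.server.name>.<schema>.<table>

renamed = re.sub(pattern, r"\2", topic)  # Java replacement "$2" -> Python r"\2"
print(renamed)  # DB2.STUDENT
```

Note that the dots in the pattern are unescaped, so each matches any character; the pattern still resolves as intended here because of the literal DB2 anchor.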
Summary
With this in place, deletes on the keyless Oracle tables replicate correctly to MySQL. That's all for this post — I hope it helps; feel free to follow me so we can learn from each other~