IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Oracle+Kafka+Flink(CDC捕获) 部署实时同步数据 -> 正文阅读

[大数据]Oracle+Kafka+Flink(CDC捕获) 部署实时同步数据

前言:之前使用confluent已经出过文档,因为confluent插件比较老,小编只是演示用,而且confluent对应数据的更新删除不好识别,这次小编使用的是debezium搭配单机的kafka环境,演示以单机环境为主,但会说到集群。

1.环境
1.1、Kafka:kafka_2.13-2.8.0
1.2、数据库:/Oracle11G。(Oracle开启归档)
1.3、计算引擎:Flink 1.13.1
1.4、Kafka Connector:
debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.0.Final/debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.4.0.Final/debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
debezium-connector-oracle-1.4.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.4.0.Final/debezium-connector-oracle-1.4.0.Final-plugin.tar.gz

2.实时同步
2.1、下载debezium-connector-oracle-1.4.0.Final-plugin.tar.gz并解压,安装在自己的服务器,我的安装目录是/home/debezium/
2.2、将debezium-connector-oracle 目录下得jar包都拷贝一份到${KAFKA_HOME}/libs中
2.3、Oracle需要下载客户端并把jar包复制到
${KAFKA_HOME}/libs
下载地址:https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip
2.4、kafka环境修改
kafka安装目录:/home/kafka/kafka_2.13-2.8.0/
单机部署修改 [connect-standalone.properties]
集群部署修改 [connect-distributed.properties]
注:小编修改单机环境启动报错,最后无奈改为集群配置
具体如下
在这里插入图片描述
bootstrap.servers=172.16.50.22:9092
plugin.path=/home/debezium/debezium-connector-oracle

group.id=amirSync
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
// kafka connect内部需要用到的三个topic
config.storage.topic=amir-connect-configs
offset.storage.topic=amir-connect-offsets
status.storage.topic=amir-connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

cleanup.policy=compact

// kafka connect内部信息保存到kafka时消息的序列化方式
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

rest.host.name=172.16.50.22
rest.port=8085
2.5、启动Kafka集群,设置环境变量
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties
./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties

2.6、提交Oracle-connector,监视Oracle数据库
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” http://172.16.50.22:8085/connectors/ -d ’
{
“name”: “debezium-oracle-amir”,
“config”: {
“connector.class” : “io.debezium.connector.oracle.OracleConnector”,
“tasks.max” : “1”,
“database.server.name” : “XE”,
“database.hostname” : “172.16.50.239”,
“database.port” : “1521”,
“database.user” : “MSCDW”,
“database.password” : “MSCDW2104”,
“database.dbname” : “XE”,
“database.schema” : “MSCDW”,
“database.connection.adapter”: “logminer”,
“database.tablename.case.insensitive”: “true”,
“table.include.list” : “MSCDW.*”,
“snapshot.mode” : “initial”,
“schema.include.list” : “MSCDW”,
“database.history.kafka.bootstrap.servers” : “172.16.50.22:9092”,
“database.history.kafka.topic”: “kafkadebeziumoracle”
}
}’

3.结果演示
3.1、查看创建的connector列表
在这里插入图片描述3.2、查看创建的connector状态
在这里插入图片描述3.3、查看创建的connector配置
在这里插入图片描述3.4、查看topic变化
当环境搭建好之后,默认为每个表创建一个属于自己的主题,如图所示,小编这里使用的kafka Tool工具查看
在这里插入图片描述3.5、在flink中创建source和sink的表
在这里插入图片描述String createDebeziumTable = String.format(“CREATE TABLE createDebeziumConfig (\n” +
“id STRING,\n” +
“cron STRING\n” +
“) WITH (\n” +
“‘connector’ = ‘kafka’,\n” +
“‘format’ = ‘debezium-json’,\n” +
“‘topic’ = ‘kafkadebeziumoracle’,\n” +
“‘properties.bootstrap.servers’ = ‘172.16.50.22:9092’,\n” +
“‘properties.group.id’ = ‘amirSync’,\n” +
“‘scan.startup.mode’ = ‘earliest-offset’,\n” +
“‘debezium-json.schema-include’ = ‘true’\n” +
“)”);
String sinkMysqlConfigTable = String.format(
“CREATE TABLE sinkMysqlConfigTable \n” +
“(\n” +
" ID VARCHAR,\n" +
" CRON VARCHAR\n" +
“) WITH (\n” +
" ‘connector.type’ = ‘jdbc’, \n" +
" ‘connector.url’ = ‘jdbc:mysql://113.140.81.69:13306/admin’, \n" +
" ‘connector.table’ = ‘config’,\n" +
" ‘connector.username’ = ‘root’,\n" +
" ‘connector.password’ = ‘a’, \n" +
" ‘connector.write.flush.max-rows’ = ‘1’ \n" +
“)”);

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-23 10:51:49  更:2021-07-23 10:52:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 13:19:19-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码