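Preface: I previously published a walkthrough based on Confluent. The Confluent plugin I used there was fairly old and was only meant as a demo, and it could not reliably tell updates and deletes apart in the captured data. This time I use Debezium with a single-node Kafka environment. The demo mainly targets a single-node setup, but cluster (distributed) deployment is also covered.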
1. Environment
1.1 Kafka: kafka_2.13-2.8.0
1.2 Database: Oracle 11g (archive logging must be enabled)
1.3 Compute engine: Flink 1.13.1
1.4 Kafka Connect connectors:
debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.4.0.Final/debezium-connector-mysql-1.4.0.Final-plugin.tar.gz
debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.4.0.Final/debezium-connector-postgres-1.4.0.Final-plugin.tar.gz
debezium-connector-oracle-1.4.0.Final-plugin.tar.gz
https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.4.0.Final/debezium-connector-oracle-1.4.0.Final-plugin.tar.gz
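The three plugin archives above share one URL pattern on Maven Central. If you want to script the download for another connector or Debezium version, a small sketch like the following can build the link. The class name DebeziumPluginUrls is hypothetical, and the pattern is inferred only from the three links listed above.

```java
// Hypothetical helper: builds Maven Central URLs for Debezium connector plugin
// archives, following the pattern of the download links listed in 1.4.
final class DebeziumPluginUrls {
    private static final String BASE = "https://repo1.maven.org/maven2/io/debezium/";

    // db is the connector suffix, e.g. "mysql", "postgres" or "oracle"
    static String pluginUrl(String db, String version) {
        String artifact = "debezium-connector-" + db;
        return BASE + artifact + "/" + version + "/" + artifact + "-" + version + "-plugin.tar.gz";
    }

    public static void main(String[] args) {
        // Print the three URLs used in this article
        for (String db : new String[] {"mysql", "postgres", "oracle"}) {
            System.out.println(pluginUrl(db, "1.4.0.Final"));
        }
    }
}
```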
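2. Real-time sync
2.1 Download debezium-connector-oracle-1.4.0.Final-plugin.tar.gz and extract it on your server. My install directory is /home/debezium/.
2.2 Copy all jar files from the debezium-connector-oracle directory into ${KAFKA_HOME}/libs.
2.3 Oracle also requires its client libraries: download the Oracle Instant Client and copy its jar files into ${KAFKA_HOME}/libs. Download link: https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip
2.4 Modify the Kafka configuration (Kafka install directory: /home/kafka/kafka_2.13-2.8.0/).
For a standalone deployment, edit [connect-standalone.properties]; for a cluster deployment, edit [connect-distributed.properties].
Note: the standalone setup kept failing on startup, so I ended up switching to the distributed (cluster) configuration. The settings are as follows:
bootstrap.servers=172.16.50.22:9092
plugin.path=/home/debezium/debezium-connector-oracle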
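group.id=amirSync
# Converters for record keys and values. JsonConverter with schemas enabled writes the
# schema + payload JSON envelope that Flink's debezium-json format reads when
# 'debezium-json.schema-include' = 'true'.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
# The three topics Kafka Connect uses internally
config.storage.topic=amir-connect-configs
offset.storage.topic=amir-connect-offsets
status.storage.topic=amir-connect-statuses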
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
cleanup.policy=compact
# Serialization Kafka Connect uses when storing its internal data in Kafka
# (the internal.* converter settings are deprecated since Kafka 2.0 and can be omitted)
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.host.name=172.16.50.22
rest.port=8085
2.5 Start Kafka Connect in distributed (cluster) mode. First set the log4j environment variable, then start the worker from the Kafka install directory:
export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties
./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties
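To catch a missing key before running connect-distributed.sh, you can load the worker file with java.util.Properties and check it. The sketch below is a hypothetical pre-flight check, not something Kafka ships: the class name ConnectWorkerConfigCheck and its key list are my own assumptions, chosen from the settings this setup relies on.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check: reports which of the worker keys this
// article's setup relies on are missing or empty.
final class ConnectWorkerConfigCheck {
    static final String[] REQUIRED = {
        "bootstrap.servers", "group.id",
        "key.converter", "value.converter",
        "config.storage.topic", "offset.storage.topic", "status.storage.topic"
    };

    // Parses .properties text; only lines starting with '#' or '!' are comments
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Defaults to the worker file used in this article
        String path = args.length > 0 ? args[0]
                : "/home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties";
        Properties props = new Properties();
        try (Reader in = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            props.load(in);
        }
        List<String> missing = missingKeys(props);
        System.out.println(missing.isEmpty() ? "All required keys present" : "Missing: " + missing);
    }
}
```

A side effect worth knowing: in a .properties file a line such as `// kafka connect ...` is not a comment but a key named `//`, which parse() makes easy to see.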
2.6 Submit the Oracle connector so that it monitors the Oracle database:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://172.16.50.22:8085/connectors/ -d '
{
  "name": "debezium-oracle-amir",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "XE",
    "database.hostname": "172.16.50.239",
    "database.port": "1521",
    "database.user": "MSCDW",
    "database.password": "MSCDW2104",
    "database.dbname": "XE",
    "database.schema": "MSCDW",
    "database.connection.adapter": "logminer",
    "database.tablename.case.insensitive": "true",
    "table.include.list": "MSCDW.*",
    "snapshot.mode": "initial",
    "schema.include.list": "MSCDW",
    "database.history.kafka.bootstrap.servers": "172.16.50.22:9092",
    "database.history.kafka.topic": "kafkadebeziumoracle"
  }
}'
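The same POST /connectors call can also be issued from Java. This is a minimal sketch, assuming Java 11+ (java.net.http) and the REST address configured in 2.4; the class and method names are hypothetical, and the JSON body is built by hand with only the escaping these plain ASCII values need.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch (Java 11+): sends the same POST /connectors request as the curl command.
// The JSON body is assembled by hand so the example needs no extra dependencies.
final class SubmitOracleConnector {
    // Minimal JSON string escaping, enough for the plain ASCII values used here
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String toJson(String name, Map<String, String> config) {
        StringBuilder sb = new StringBuilder("{\"name\":").append(quote(name)).append(",\"config\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : config.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append("}}").toString();
    }

    // The connector settings from step 2.6, in the same order
    static Map<String, String> oracleConfig() {
        Map<String, String> c = new LinkedHashMap<>();
        c.put("connector.class", "io.debezium.connector.oracle.OracleConnector");
        c.put("tasks.max", "1");
        c.put("database.server.name", "XE");
        c.put("database.hostname", "172.16.50.239");
        c.put("database.port", "1521");
        c.put("database.user", "MSCDW");
        c.put("database.password", "MSCDW2104");
        c.put("database.dbname", "XE");
        c.put("database.schema", "MSCDW");
        c.put("database.connection.adapter", "logminer");
        c.put("database.tablename.case.insensitive", "true");
        c.put("table.include.list", "MSCDW.*");
        c.put("snapshot.mode", "initial");
        c.put("schema.include.list", "MSCDW");
        c.put("database.history.kafka.bootstrap.servers", "172.16.50.22:9092");
        c.put("database.history.kafka.topic", "kafkadebeziumoracle");
        return c;
    }

    static HttpRequest buildRequest(String restBase, String body) {
        return HttpRequest.newBuilder(URI.create(restBase + "/connectors"))
                .header("Accept", "application/json")
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) throws Exception {
        String body = toJson("debezium-oracle-amir", oracleConfig());
        HttpRequest request = buildRequest("http://172.16.50.22:8085", body);
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

Kafka Connect answers 201 Created when the connector is accepted; submitting a connector with an existing name returns 409 Conflict instead.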
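3. Results
3.1 List the created connectors
3.2 Check the connector status
3.3 Check the connector configuration
3.4 Check the topics
Once the environment is up, Debezium by default creates a separate topic for each captured table, as shown in the figure. I used the Kafka Tool GUI to browse them.
3.5 Create the source and sink tables in Flink

// Source: Debezium change events for one Oracle table, read from Kafka.
// Change events go to the per-table topic <database.server.name>.<schema>.<TABLE>;
// kafkadebeziumoracle (database.history.kafka.topic) only holds schema history.
// XE.MSCDW.CONFIG is a hypothetical table topic: use the real name listed in 3.4.
// Debezium reports Oracle's unquoted column names in upper case and the JSON
// field lookup is case-sensitive, hence ID and CRON.
String createDebeziumTable =
        "CREATE TABLE createDebeziumConfig (\n" +
        "  ID STRING,\n" +
        "  CRON STRING\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'XE.MSCDW.CONFIG',\n" +
        "  'properties.bootstrap.servers' = '172.16.50.22:9092',\n" +
        "  'properties.group.id' = 'amirSync',\n" +
        "  'scan.startup.mode' = 'earliest-offset',\n" +
        "  'format' = 'debezium-json',\n" +
        // requires the Connect worker to use JsonConverter with schemas.enable=true
        "  'debezium-json.schema-include' = 'true'\n" +
        ")";

// Sink: MySQL table written with the Flink 1.13 JDBC connector options.
// STRING instead of bare VARCHAR (which means VARCHAR(1) in Flink SQL); the primary
// key lets the sink apply updates and deletes from the changelog as upserts.
String sinkMysqlConfigTable =
        "CREATE TABLE sinkMysqlConfigTable (\n" +
        "  ID STRING,\n" +
        "  CRON STRING,\n" +
        "  PRIMARY KEY (ID) NOT ENFORCED\n" +
        ") WITH (\n" +
        "  'connector' = 'jdbc',\n" +
        "  'url' = 'jdbc:mysql://113.140.81.69:13306/admin',\n" +
        "  'table-name' = 'config',\n" +
        "  'username' = 'root',\n" +
        "  'password' = 'a',\n" +
        "  'sink.buffer-flush.max-rows' = '1'\n" +
        ")";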
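Checks 3.1 to 3.3 are plain GET requests against the Kafka Connect REST API, and 3.4 depends on how Debezium names the per-table topics. The sketch below only assembles those URLs and the topic name; ConnectorChecks is a hypothetical class, and CONFIG stands in for a real table in the MSCDW schema. Because database.tablename.case.insensitive is enabled, the casing of the actual topic may differ, so compare against the topic list you see in Kafka Tool.

```java
// Hypothetical helpers for section 3: the Kafka Connect REST URLs behind
// checks 3.1-3.3, and the per-table topic name Debezium uses (3.4).
final class ConnectorChecks {
    // GET: names of all connectors on this Connect cluster
    static String listConnectorsUrl(String restBase) {
        return restBase + "/connectors";
    }

    // GET: connector state plus the state of each task
    static String statusUrl(String restBase, String name) {
        return restBase + "/connectors/" + name + "/status";
    }

    // GET: the configuration the connector was submitted with
    static String configUrl(String restBase, String name) {
        return restBase + "/connectors/" + name + "/config";
    }

    // Debezium change-event topic: <database.server.name>.<schema>.<table>
    static String changeTopic(String serverName, String schema, String table) {
        return serverName + "." + schema + "." + table;
    }

    public static void main(String[] args) {
        String rest = "http://172.16.50.22:8085";
        System.out.println(listConnectorsUrl(rest));
        System.out.println(statusUrl(rest, "debezium-oracle-amir"));
        System.out.println(configUrl(rest, "debezium-oracle-amir"));
        // CONFIG is a hypothetical table name in the MSCDW schema
        System.out.println(changeTopic("XE", "MSCDW", "CONFIG"));
    }
}
```

For example, `curl http://172.16.50.22:8085/connectors/debezium-oracle-amir/status` returns the connector and task states (such as RUNNING or FAILED) as JSON.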