IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> SQL_server增量同步kafka -> 正文阅读

[大数据]SQL_server增量同步kafka

SQL_server增量同步kafka

事前准备:

1>centos7.5 *4台(192.168.0.1;192.168.0.2;192.168.0.3;192.168.0.4)
2>zookeeper-3.4.6 *4台(192.168.0.1;192.168.0.2;192.168.0.3;192.168.0.4)
3>kafka_2.11-2.4.0 *4台(192.168.0.1;192.168.0.2;192.168.0.3;192.168.0.4)
4>SQLserver *1台(192.168.0.10 Microsoft SQL Server 2016 (SP2) )
5>debezium-connector-sqlserver (debezium-connector-sqlserver-1.2.1.Final-plugin.tar.gz)

需要注意:

1.真实配置的sqlserver版本是否支持CDC。
2.debezium-connector(具体详见:https://debezium.io/releases/2.0/)
3.zookeeper集群正常启动
4.kafka集群正常启动
5.SQLserver代理服务正常启动
6.其中(192.168.0.1;192.168.0.2;192.168.0.3;192.168.0.4)可以访问192.168.0.10的1433端口

zookeeper集群部署(4台)

zookeeper的部署主要说一下配置的内容(zoo.cfg):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=./zookeeper-3.4.6/data
clientPort=2181
server.1=192.168.0.1:2888:3888
server.2=192.168.0.2:2888:3888
server.3=192.168.0.3:2888:3888
server.4=192.168.0.4:2888:3888

zookeeper服务启动:

./zookeeper-3.4.6/bin/zkServer.sh start

zookeeper服务验证:

./zookeeper-3.4.6/bin/zkServer.sh status
输出(leader or follower)

kafka集群部署(4台)

kafka的部署配置内容(server.properties)四台服务器相同配置,唯一区别listeners修改为自己的IP

broker.id=0
producer.type=sync
request.required.acks=-1
max.in.flight.requests.per.connection = 1
listeners=PLAINTEXT://192.168.0.1:9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=4
num.recovery.threads.per.data.dir=1
default.replication.factor=4
offsets.topic.replication.factor=4
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181
zookeeper.connection.timeout.ms=600000
group.initial.rebalance.delay.ms=0

kafka的部署配置内容(zookeeper.properties)四台服务器相同配置

dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

*kafka的部署配置内容(connect-distributed.properties)四台服务器相同配置

bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=3
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
offset.flush.interval.ms=10000
rest.port=18083
plugin.path=./kafka_2.11-2.4.0/connectors

kafka connector插件:

1.建立connectors文件夹目录层级同上述plugin.path
2.下载ebezium-connector-sqlserver-1.2.1.Final-plugin.tar.gz
链接:https://pan.baidu.com/s/1-Hiv0c3Zfr8nbtS16JzKxA 提取码:4v66
3.解压tar包

kafka后台服务启动:

cd ./kafka_2.11-2.4.0/bin
sh kafka-server-start.sh -daemon …/config/server.properties

kafka服务验证((kafka Manager)

安装kafka Manager管理工具
详见:https://github.com/yahoo/kafka-manager

kafka启动连接器

./kafka_2.11-2.4.0/bin/connect-distributed.sh -daemon ./kafka_2.11-2.4.0/config/connect-distributed.properties

验证JPS(4台)

在这里插入图片描述

SQLserver 开启CDC (1台)

###建立database
create database yl_test
use yl_test
create table TEST_1(id int not null ,V_1 varchar(10),V_2 int )
###数据库开启CDC
EXEC sys.sp_cdc_enable_db
###数据表开启CDC
EXEC sys.sp_cdc_enable_table
@source_schema = N’dbo’,
@source_name = N’TEST_1’,
@role_name = NULL,
@supports_net_changes = 1
GO
######开启SQLserver代理
sp_configure ‘show advanced options’, 1;
GO
RECONFIGURE WITH OVERRIDE;
GO
sp_configure ‘Agent XPs’, 1;
GO
RECONFIGURE WITH OVERRIDE
GO
sp_configure ‘show advanced options’, 1;
GO
RECONFIGURE WITH OVERRIDE;
GO
sp_configure ‘Agent XPs’, 1;
GO
RECONFIGURE WITH OVERRIDE
GO
##########插入数据
insert dbo.TEST_1 values(1,‘b’,500)

建立SQLserver的连接

建立一个run.josn

{
“name”: “sqlserver-cdc-source”,
“config”: {
“connector.class” : “io.debezium.connector.sqlserver.SqlServerConnector”,
“database.server.name” : “TEST”,
“database.hostname” : “192.168.0.10”,
“database.port” : “1433”,
“database.user” : “sa”,
“database.password” : “password”,
“database.dbname” : “yl_test”,
“table.whitelist”: “dbo.TEST_1”,
“schemas.enable” : “false”,
“mode”:“incrementing”, ----自增列模式 同理还有stamp模式等
“incrementing.column.name”: “ID”, —自增列的ID
“database.history.kafka.bootstrap.servers” : “192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092”,
“database.history.kafka.topic”: “TopicTLioCDC”, --内部
“value.converter.schemas.enable”:“false”,
“value.converter”:“org.apache.kafka.connect.json.JsonConverter”
}
}

建立一个kafka连接器

cd 到 run.josn 目录下
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” “http://192.168.0.1:18083/connectors”/ -d @run.json

连接器相关命令

curl http://192.168.0.1:18083/connectors --查看连接器
curl -X DELETE http://192.168.0.1:18083/connectors/connector_name --删除连接器
curl -i -X POST -H “Accept:application/json” -H “Content-Type:application/json” “http://192.168.0.1:18083/connectors”/ -d @run.json --建立连接器

捕捉增量数据

bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092 --consumer-property group.id=group1 --consumer-property client.id=consumer-1 --topic TEST.dbo.TEST_1

KafKa相关

查看topic

bin/kafka-topics.sh --list --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181

删除topic 登录zookeeper

1.登录zookeeper ---->./bin/zkCli.sh
2 删除topic ---->rmr /brokers/topics/example

topic建立

bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --create --topic topicname --replication-factor 1 --partitions 3

topic发送消息

bin/kafka-console-producer.sh --broker-list 192.168.0.0.1:9092 --topic test

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-15 11:37:30  更:2022-05-15 11:37:47 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 6:38:38-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码