Incremental sync from SQL Server to Kafka
Prerequisites:
1> CentOS 7.5 ×4 (192.168.0.1; 192.168.0.2; 192.168.0.3; 192.168.0.4)
2> zookeeper-3.4.6 ×4 (192.168.0.1; 192.168.0.2; 192.168.0.3; 192.168.0.4)
3> kafka_2.11-2.4.0 ×4 (192.168.0.1; 192.168.0.2; 192.168.0.3; 192.168.0.4)
4> SQL Server ×1 (192.168.0.10, Microsoft SQL Server 2016 (SP2))
5> debezium-connector-sqlserver (debezium-connector-sqlserver-1.2.1.Final-plugin.tar.gz)
Things to check first:
1. Confirm that the SQL Server version actually deployed supports CDC.
2. Pick a matching debezium-connector release (for details see: https://debezium.io/releases/2.0/).
3. The ZooKeeper cluster starts normally.
4. The Kafka cluster starts normally.
5. The SQL Server Agent service is running.
6. Hosts 192.168.0.1 through 192.168.0.4 can reach port 1433 on 192.168.0.10.
ZooKeeper cluster deployment (4 nodes)
For the ZooKeeper deployment, the main thing to cover is the configuration (zoo.cfg):
tickTime=2000
initLimit=10
syncLimit=5
dataDir=./zookeeper-3.4.6/data
clientPort=2181
server.1=192.168.0.1:2888:3888
server.2=192.168.0.2:2888:3888
server.3=192.168.0.3:2888:3888
server.4=192.168.0.4:2888:3888
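Each node also needs a myid file under dataDir whose content matches that node's server.N number in zoo.cfg, otherwise the quorum will not form. A minimal sketch for the first node (192.168.0.1); change the number on each host accordingly:

echo 1 > ./zookeeper-3.4.6/data/myid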
Start the ZooKeeper service:
./zookeeper-3.4.6/bin/zkServer.sh start
Verify the ZooKeeper service:
./zookeeper-3.4.6/bin/zkServer.sh status
Output: Mode: leader or Mode: follower
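Each node can also be probed with ZooKeeper's four-letter-word commands (assuming nc is available on the host); a healthy server replies imok:

echo ruok | nc 192.168.0.1 2181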
Kafka cluster deployment (4 nodes)
Kafka broker configuration (server.properties). All four servers use the same settings, except that broker.id must be unique per broker and listeners must be changed to each host's own IP.
broker.id=0
producer.type=sync
request.required.acks=-1
max.in.flight.requests.per.connection=1
listeners=PLAINTEXT://192.168.0.1:9092
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka
num.partitions=4
num.recovery.threads.per.data.dir=1
default.replication.factor=4
offsets.topic.replication.factor=4
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181
zookeeper.connection.timeout.ms=600000
group.initial.rebalance.delay.ms=0
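For example, the broker on 192.168.0.2 would differ only in these two lines (using broker.id values 0 through 3 across the four hosts is an illustrative choice; any unique integers work):

broker.id=1
listeners=PLAINTEXT://192.168.0.2:9092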
Kafka's bundled ZooKeeper configuration (zookeeper.properties), same on all four servers:
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false
Kafka Connect configuration (connect-distributed.properties), same on all four servers:
bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=2
offset.storage.partitions=3
#offset.storage.partitions=25
config.storage.topic=connect-configs
config.storage.replication.factor=2
status.storage.topic=connect-status
status.storage.replication.factor=2
offset.flush.interval.ms=10000
rest.port=18083
plugin.path=./kafka_2.11-2.4.0/connectors
Kafka Connect plugin:
1. Create the connectors directory at the same path as plugin.path above.
2. Download debezium-connector-sqlserver-1.2.1.Final-plugin.tar.gz (link: https://pan.baidu.com/s/1-Hiv0c3Zfr8nbtS16JzKxA, extraction code: 4v66).
3. Extract the tarball, as sketched below.
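A minimal sketch of those three steps, assuming the tarball has already been downloaded to the current directory and Kafka lives at ./kafka_2.11-2.4.0:

mkdir -p ./kafka_2.11-2.4.0/connectors
tar -zxvf debezium-connector-sqlserver-1.2.1.Final-plugin.tar.gz -C ./kafka_2.11-2.4.0/connectors
ls ./kafka_2.11-2.4.0/connectors/debezium-connector-sqlserver    # should list the connector jars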
Start the Kafka broker in the background:
cd ./kafka_2.11-2.4.0/bin
sh kafka-server-start.sh -daemon ../config/server.properties
Verify the Kafka service (Kafka Manager)
Install the Kafka Manager management tool; see: https://github.com/yahoo/kafka-manager
Start the Kafka Connect worker
./kafka_2.11-2.4.0/bin/connect-distributed.sh -daemon ./kafka_2.11-2.4.0/config/connect-distributed.properties
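Before registering any connectors, it is worth confirming that each worker is up on its REST port (18083, as configured above) and that the Debezium plugin was picked up from plugin.path:

curl http://192.168.0.1:18083/                    # returns the Connect worker version
curl http://192.168.0.1:18083/connector-plugins   # should list io.debezium.connector.sqlserver.SqlServerConnector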
Verify with jps (4 nodes)
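On each of the four hosts, jps should show one JVM each for ZooKeeper, the Kafka broker, and the Connect worker; the PIDs below are illustrative:

jps
3024 QuorumPeerMain
4127 Kafka
5233 ConnectDistributed
6001 Jps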
Enable CDC on SQL Server (1 node)
-- Create the database and table
CREATE DATABASE yl_test
GO
USE yl_test
GO
-- id is made the primary key: @supports_net_changes = 1 below requires a
-- primary key or unique index, and Debezium uses it as the record key
CREATE TABLE TEST_1 (id INT NOT NULL PRIMARY KEY, V_1 VARCHAR(10), V_2 INT)
GO
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db
GO
-- Enable CDC on the table
EXEC sys.sp_cdc_enable_table
    @source_schema = N'dbo',
    @source_name = N'TEST_1',
    @role_name = NULL,
    @supports_net_changes = 1
GO
-- Enable the SQL Server Agent extended procedures
sp_configure 'show advanced options', 1;
GO
RECONFIGURE WITH OVERRIDE;
GO
sp_configure 'Agent XPs', 1;
GO
RECONFIGURE WITH OVERRIDE
GO
-- Insert a test row
INSERT dbo.TEST_1 VALUES (1, 'b', 500)
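A quick way to confirm CDC is enabled at both the database and table level (run from any host with sqlcmd installed; the credentials match those used in run.json below):

sqlcmd -S 192.168.0.10 -U sa -P password -d yl_test -Q "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'yl_test'; SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'TEST_1';"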
Create the connection to SQL Server
Create a run.json file:
{
  "name": "sqlserver-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.server.name": "TEST",
    "database.hostname": "192.168.0.10",
    "database.port": "1433",
    "database.user": "sa",
    "database.password": "password",
    "database.dbname": "yl_test",
    "table.whitelist": "dbo.TEST_1",
    "schemas.enable": "false",
    "mode": "incrementing",
    "incrementing.column.name": "ID",
    "database.history.kafka.bootstrap.servers": "192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092",
    "database.history.kafka.topic": "TopicTLioCDC",
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter"
  }
}
Notes (JSON does not allow inline comments, so the annotations go here):
mode: incrementing = auto-increment column mode; there are other modes such as timestamp
incrementing.column.name: the auto-increment ID column
database.history.kafka.topic: internal topic for schema history
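It is worth validating the file before POSTing it (assuming Python is installed; jq works just as well):

python -m json.tool run.json    # prints the parsed JSON, or an error pointing at the offending position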
Create a Kafka connector
cd to the directory containing run.json, then:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.0.1:18083/connectors/ -d @run.json
Connector-related commands
curl http://192.168.0.1:18083/connectors    # list connectors
curl -X DELETE http://192.168.0.1:18083/connectors/connector_name    # delete a connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://192.168.0.1:18083/connectors/ -d @run.json    # create a connector
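The Connect REST API also exposes a status endpoint, useful for checking that the connector and its task are in the RUNNING state:

curl http://192.168.0.1:18083/connectors/sqlserver-cdc-source/status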
Capture the incremental data
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092,192.168.0.4:9092 --consumer-property group.id=group1 --consumer-property client.id=consumer-1 --topic TEST.dbo.TEST_1
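With schemas disabled, each message value on TEST.dbo.TEST_1 is the bare Debezium change-event payload. For the row inserted earlier it looks roughly like this (the source block is trimmed and ts_ms will vary; op is "c" for inserts captured after the snapshot, "r" for rows read during the initial snapshot):

{"before": null, "after": {"id": 1, "V_1": "b", "V_2": 500}, "source": {...}, "op": "c", "ts_ms": 1596000000000}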
Kafka-related commands
List topics
bin/kafka-topics.sh --list --zookeeper 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181,192.168.0.4:2181
Delete a topic (log in to ZooKeeper)
1. Log in to ZooKeeper: ./bin/zkCli.sh
2. Delete the topic: rmr /brokers/topics/example
Create a topic
bin/kafka-topics.sh --zookeeper 192.168.0.1:2181 --create --topic topicname --replication-factor 1 --partitions 3
Send messages to a topic
bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic test