IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> debezium+kafka connector 实时采集mysql -> 正文阅读

[大数据]debezium+kafka connector 实时采集mysql

配置mysql

创建用户并授权

创建用户

CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';

授予权限

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium' IDENTIFIED BY 'password';

FLUSH PRIVILEGES;

启用二进制日志

1.检查是否支持二进制日志

SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';

2.如果是OFF,请使用以下属性配置您的 MySQL 服务器配置文件并重启服务器,如下表所述:

log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10

3.检查二进制文件

SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM information_schema.global_variables WHERE variable_name='log_bin';

启用gdit

因为现在使用的单节点暂时未开启

安装kafka

启动kafka-connect

修改config/connect-distributed.properties的plugin.path和bootstrap.servers的值

修改日志文件 config/connect-log4j.properties

log4j.rootLogger=INFO, kafkaConnectAppender

log4j.appender.kafkaConnectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.kafkaConnectAppender.DatePattern='.'yyyy-MM-dd
log4j.appender.kafkaConnectAppender.File=/hadoop/kafka_2.11-0.11.0.2/logs/kafka-connect.log
log4j.appender.kafkaConnectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.kafkaConnectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.org.apache.zookeeper=ERROR
log4j.logger.org.I0Itec.zkclient=ERROR
log4j.logger.org.reflections=ERROR

启动kafka-connnector? ?bin/connect-distributed.sh -daemon config/connect-distributed.properties

api查看kafka信息http://ip:8083

启动mysql-connector

1.配置mysql-connector文件

{
    "name": "inventory-connector", 
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
        "database.hostname": "mysqlip", 
        "database.port": "3306", 
        "database.user": "debezium", 
        "database.password": "password", 
        "database.server.id": "184054", 
        "database.server.name": "fullfillment", 
        "database.include.list": "数据库1,数据库2", 
        "database.history.kafka.bootstrap.servers": "kafka:9092", 
        "database.history.kafka.topic": "dbhistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}

其他详细配置参数官方文档https://debezium.io/documentation/reference/1.6/connectors/mysql.html#mysql-connector-properties

2.通过api启动mysql-connect

post方式提交http://ip:8083/connectors,参数为json文件

3.查看connect信息:get?http://ip:8083/connectors?查看所有的connnectors

get?http://ip:8083/connectors/inventory-connector/status?获取当前的状态

查看kafka

1.dbhistory.fullfillment为表结构修改数据

2..dbhistory.fullfillment.[table] 为数据修改数据

可能遇到的问题:

ClassNotFoundException: org.apache.kafka.connect.header.ConnectHeaders

kafka版本过低,使用1.1及以上的版本即可

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-31 15:30:56  更:2021-08-31 15:32:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 16:55:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码