IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> logstash同步mysql数据至elastic -> 正文阅读

[大数据]logstash同步mysql数据至elastic

logstash同步mysql数据至elastic

题记

项目数据采集、数据分析需要,我们需要同步MySQL数据至ES;在研发、测试、压测过程中,我们采用了代码同步存储MySQL数据至ESMQ异步消息方式同步数据,定时任务批量同步;但是最终发现了一个问题。

问题分析

不管是何种方式,需要通过JAVA代码调起,操作相对繁琐,整体性能不高,由于项目自身已经介入了ELK,尝试通过Logstash数据采集的形式实现相关数据同步。

参考资料

https://www.elastic.co/guide/en/logstash/6.4/plugins-inputs-jdbc.html

https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-elasticsearch.html

https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-stdout.html

实现方案

logstash conf 配置文件

test_user.conf
input {
    jdbc {
        jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_table"
        jdbc_user => "root"
        jdbc_password => "bsakjd7sl2ada"
        # TODO
        schedule =>"* * * * *"
        statement => "SELECT id ,user_id ,user_name, update_time FROM user_db where update_time >:sql_last_value"
        type => "user_table"
    }
    # 多表实现
    jdbc {
        jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_db"
        jdbc_user => "root"
        jdbc_password => "bsakjd7sl2ada"
        # TODO
        schedule =>"* * * * *"
        statement => "SELECT id ,user_id ,user_name, user_phone, user_address, update_time FROM user_table_msg where update_time >:sql_last_value"
        type => "user_table_msg"
    }
}
filter {
}
output {
    if[type]=="user_table"{
          elasticsearch{
              # 集群:["",""]
              hosts => "http://10.x.x.xxx:9200"
              user => "elastic"
              password => "S8sdshlD"
              index => "user_table"
              document_id => "%{id}"
              document_type => "user_table"
           }
    } 
    # 多表实现
    if[type]=="user_table_msg"{
          elasticsearch{
              # 集群:["",""]
              hosts => "http://10.x.x.xxx:9200"
              user => "elastic"
              password => "S8sdshlD"
              index => "user_table_msg"
              document_id => "%{id}"
              document_type => "user_table_msg"
           }
    }   
    stdout {
        # JSON输出
        # codec => json
        codec => json_lines
    }
}


mysql建表语句

建表语句

-- 用户表
CREATE TABLE `USER_TABLE` (
  `ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id',
  `USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称',
  `UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户表';


-- 用户信息表
CREATE TABLE `USER_TABLE_MSG` (
  `ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
  `USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id',
  `USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称',
  `USER_PHONE` VARCHAR(11) NOT NULL DEFAULT '' COMMENT '用户手机号',
  `USER_ADDRESS` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '用户地址',
  `UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户信息表';

测试验证

登录kibana查询索引user_table相关数据信息,按照既定的update_time验证数据同步条件是否正常。

问题延伸

在上述问题分析、开发、验证的过程中,我们采用logstash plugin完成了数据从mysql数据库到elastic的抽取过程,但是如果数据量过大,需要怎么处理呢,我们如何解决单次查询无法覆盖增量数据的情况。

如何完成循环查询

select () update_time >:sql_last_value limit 0, 1000;

分页策略实现

上述文件TODO位置增加如下配置

# 这将导致SQL语句分为多个查询。
# 每个查询都将使用限制和偏移来集体检索完整结果集。
# 使用jdbc_page_size设置限制大小。
jdbc_paging_enabled => true
jdbc_page_size => 1000

再次验证

  • 造数:在最新时间节点后,单次提交增加1000+以上数据;
  • 重启:重启节点,重新编译加载test_user.conf文件
  • 查看logstash采集日志,登录kibana平台查看数据;

多表实现

  • input模块增加jdbc 节点,typeuser_table_msg
  • output模块增加if[type]=="user_table_msg"节点;
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-04-01 00:08:39  更:2022-04-01 00:09:06 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 14:55:21-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码