logstash同步mysql数据至elastic
题记
项目数据采集、数据分析需要,我们需要同步MySQL 数据至ES ;在研发、测试、压测过程中,我们采用了代码同步存储MySQL 数据至ES ,MQ 异步消息方式同步数据,定时任务批量同步;但是最终发现了一个问题。
问题分析
不管是何种方式,需要通过JAVA 代码调起,操作相对繁琐,整体性能不高,由于项目自身已经介入了ELK ,尝试通过Logstash 数据采集的形式实现相关数据同步。
参考资料
https://www.elastic.co/guide/en/logstash/6.4/plugins-inputs-jdbc.html
https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-elasticsearch.html
https://www.elastic.co/guide/en/logstash/6.4/plugins-outputs-stdout.html
实现方案
logstash conf 配置文件
test_user.conf
input {
jdbc {
jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_table"
jdbc_user => "root"
jdbc_password => "bsakjd7sl2ada"
schedule =>"* * * * *"
statement => "SELECT id ,user_id ,user_name, update_time FROM user_db where update_time >:sql_last_value"
type => "user_table"
}
jdbc {
jdbc_driver_library => "/opt/mysql/mysql-connector-java-8.0.19.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://10.x.3.xxx:3306/user_db"
jdbc_user => "root"
jdbc_password => "bsakjd7sl2ada"
schedule =>"* * * * *"
statement => "SELECT id ,user_id ,user_name, user_phone, user_address, update_time FROM user_table_msg where update_time >:sql_last_value"
type => "user_table_msg"
}
}
filter {
}
output {
if[type]=="user_table"{
elasticsearch{
hosts => "http://10.x.x.xxx:9200"
user => "elastic"
password => "S8sdshlD"
index => "user_table"
document_id => "%{id}"
document_type => "user_table"
}
}
if[type]=="user_table_msg"{
elasticsearch{
hosts => "http://10.x.x.xxx:9200"
user => "elastic"
password => "S8sdshlD"
index => "user_table_msg"
document_id => "%{id}"
document_type => "user_table_msg"
}
}
stdout {
codec => json_lines
}
}
mysql建表语句
建表语句
CREATE TABLE `USER_TABLE` (
`ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id',
`USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称',
`UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户表';
CREATE TABLE `USER_TABLE_MSG` (
`ID` BIGINT(20) NOT NULL AUTO_INCREMENT COMMENT '主键id',
`USER_ID` VARCHAR(32) NOT NULL DEFAULT '' COMMENT '用户id',
`USER_NAME` VARCHAR(100) NOT NULL DEFAULT '' COMMENT '用户名称',
`USER_PHONE` VARCHAR(11) NOT NULL DEFAULT '' COMMENT '用户手机号',
`USER_ADDRESS` VARCHAR(255) NOT NULL DEFAULT '' COMMENT '用户地址',
`UPDATE_TIME` TIMESTAMP NULL DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`)
) ENGINE=INNODB DEFAULT CHARSET=UTF8 COMMENT '用户信息表';
测试验证
登录kibana 查询索引user_table 相关数据信息,按照既定的update_time 验证数据同步条件是否正常。
问题延伸
在上述问题分析、开发、验证的过程中,我们采用logstash plugin 完成了数据从mysql 数据库到elastic 的抽取过程,但是如果数据量过大,需要怎么处理呢,我们如何解决单次查询无法覆盖增量数据的情况。
如何完成循环查询
select () update_time >:sql_last_value limit 0, 1000;
分页策略实现
上述文件TODO 位置增加如下配置
jdbc_paging_enabled => true
jdbc_page_size => 1000
再次验证
- 造数:在最新时间节点后,单次提交增加
1000+ 以上数据; - 重启:重启节点,重新编译加载
test_user.conf文件 ; - 查看
logstash 采集日志,登录kibana 平台查看数据;
多表实现
input 模块增加jdbc 节点,type 为user_table_msg ;output 模块增加if[type]=="user_table_msg" 节点;
|