input{
jdbc{
type => "mysql_es"
# 数据库驱动包存放路径
jdbc_driver_library => "D:\tools\es\logstash-8.0.0\lib\mysql-connector-java-8.0.19.jar"
# 数据库驱动器;
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 数据库连接方式
jdbc_connection_string => "jdbc:mysql://localhost:3306/es?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "123456"
# 数据库重连尝试次数
connection_retry_attempts => "3"
# 判断数据库连接是否可用,默认false不开启
jdbc_validate_connection => "true"
# 数据库连接可用校验超时时间,默认3600s
jdbc_validation_timeout => "3600"
# 开启分页查询(默认false不开启)
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
jdbc_page_size => "100000"
# statement为查询数据sql,如果sql较复杂,建议通过statement_filepath配置sql文件的存放路径
#statement_filepath => "D:\dev\logstash-7.8.0\config\business_order\jdbc.sql"
statement => "SELECT id,name,DATE_FORMAT(date,'%Y-%m-%d %H:%i:%S')as date FROM es WHERE date >= :sql_last_value order by id asc"
#statement => "SELECT * FROM es" #不要设置use_column_value => true
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
lowercase_column_names => false
# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中
record_last_run => true
# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
#:sql_last_value如果input里面use_column_value => true, 即如果设置为true的话,可以是我们设定的字段的上一次的值。
#默认 use_column_value => false, 这样 :sql_last_value为上一次更新的最后时刻值。
#也就是说,对于新增的值,才会更新。这样就实现了增量更新的目的。
use_column_value => true
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => "date"
# 查询结果某字段的数据类型,仅包括numeric和timestamp,默认为numeric
tracking_column_type => timestamp
# 记录上次执行结果数据的存放位置
last_run_metadata_path => "D:\tools\es\logstash-5.4.0\logs\mysql.txt"
# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
clean_run => false
# 同步频率(分 时 天 月 年),默认每分钟同步一次
schedule => "* * * * *"
}
# 同步多个表
jdbc{
type => "movies"
# 数据库驱动包存放路径
jdbc_driver_library => "D:\tools\es\logstash-8.0.0\lib\mysql-connector-java-8.0.19.jar"
# 数据库驱动器;
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
# 数据库连接方式
jdbc_connection_string => "jdbc:mysql://localhost:3306/es?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT"
# 数据库用户名
jdbc_user => "root"
# 数据库密码
jdbc_password => "123456"
# 数据库重连尝试次数
connection_retry_attempts => "3"
# 判断数据库连接是否可用,默认false不开启
jdbc_validate_connection => "true"
# 数据库连接可用校验超时时间,默认3600s
jdbc_validation_timeout => "3600"
# 开启分页查询(默认false不开启)
jdbc_paging_enabled => "true"
# 单次分页查询条数(默认100000,若字段较多且更新频率较高,建议调低此值)
jdbc_page_size => "100000"
# statement为查询数据sql,如果sql较复杂,建议通过statement_filepath配置sql文件的存放路径
#statement_filepath => "D:\dev\logstash-7.8.0\config\business_order\jdbc.sql"
statement => "SELECT * FROM movies WHERE update_time >= :sql_last_value order by id asc"
#statement => "SELECT id,name,DATE_FORMAT(date,'%Y-%m-%d %H:%i:%S') FROM es" #不要设置use_column_value => true
# 是否将字段名转换为小写,默认true(如果有数据序列化、反序列化需求,建议改为false);
lowercase_column_names => false
# 是否记录上次执行结果,true表示会将上次执行结果的tracking_column字段的值保存到last_run_metadata_path指定的文件中
record_last_run => true
# 需要记录查询结果某字段的值时,此字段为true,否则默认tracking_column为timestamp的值
#:sql_last_value如果input里面use_column_value => true, 即如果设置为true的话,可以是我们设定的字段的上一次的值。
#默认 use_column_value => false, 这样 :sql_last_value为上一次更新的最后时刻值。
#也就是说,对于新增的值,才会更新。这样就实现了增量更新的目的。
use_column_value => true
# 需要记录的字段,用于增量同步,需是数据库字段
tracking_column => "update_time"
# 查询结果某字段的数据类型,仅包括numeric和timestamp,默认为numeric
tracking_column_type => timestamp
# 记录上次执行结果数据的存放位置
last_run_metadata_path => "D:\tools\es\logstash-5.4.0\logs\movies.txt"
# 是否清除last_run_metadata_path的记录,需要增量同步时此字段必须为false
clean_run => false
# 同步频率(分 时 天 月 年),默认每分钟同步一次
schedule => "* * * * *"
}
}
filter {
#这个插件
mutate {
#copy => { "id" => "_id"}
#remove_field => ["id", "@version", "unix_ts_in_secs"]
remove_field => [ "@version", "@timestamp"]
}
}
output{
if [type] == "mysql_es" {
elasticsearch{
# ES地址,集群中多个地址可用数组形式:hosts => ["localhost:9200"]
hosts => "localhost:9200"
#data_stream => "true"
# 索引名称
index => "mysql_es"
# 数据唯一索引(建议同数据库表的唯一ID对应)
document_id => "%{id}"
#如果设置为true,模板名字一样的时候,新的模板会覆盖旧的模板
#template_overwrite => true #http://localhost:9200/_template 魔板例子
#template_name => "my_template" #注意这个名字是用来查找映射配置的,尽量设置成全局唯一的
#映射配置文件的位置
#template => "D:\tools\es\logstash-5.4.0\config\template\logstash-ik.json"
#关闭logstash自动管理模板功能
#manage_template => true
}
}
if [type] == "movies" {
elasticsearch{
# ES地址,集群中多个地址可用数组形式:hosts => ["localhost:9200"]
hosts => "localhost:9200"
#data_stream => "true"
# 索引名称
index => "movies"
# 数据唯一索引(建议同数据库表的唯一ID对应)
document_id => "%{id}"
}
}
#stdout { codec => rubydebug }
}
注意:serverTimezone=CTT 这样可以让es 同步日期正常不会多8个小时
"jdbc:mysql://localhost:3306/es?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=CTT"
问题:为什么es 自定义模板 设置mapping时日期格式不起作用?
"date": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss",
"store": true
}
|