情景: 我这边对接kafka,通过^符号来进行字符串分割 其中, 有个时间字段,但是,时间是14位的数字字符串 类似:20220517162218 于是,我这边需要转换成date类型的,才能在页面上按时间查询 转换成:2022-05-17 14:51:23
代码:
mutate{
split => ["message","^"]
add_field => {
"my_time" => "%{[message][8]}"
}
}
date {
match => ["my_time","yyyyMMddHHmmss"]
target => "my_time"
}
ruby {
code => "event.set('my_time', (event.get('my_time').time.localtime).strftime('%Y-%m-%d %H:%M:%S'))"
}
代码我不做过多的说明了,logstash工具的官方文档就那样,不好理解,只能自己试试。
完整的conf配置内容如下:
input{
file{
path => "C:/Users/Administrator/Desktop/logstash_7.12.0_kafka_test/alldata/*"
type => "default"
add_field => {"flag"=>"fk_11l_topic"}
start_position => "beginning"
type => "default"
codec => plain { charset => "UTF-8" }
}
}
filter {
mutate{
split => ["message","^"]
add_field => {
"my_time" => "%{[message][8]}"
}
}
date {
match => ["my_time","yyyyMMddHHmmss"]
target => "my_time"
}
ruby {
code => "event.set('my_time', (event.get('my_time').time.localtime).strftime('%Y-%m-%d %H:%M:%S'))"
}
if [message][0] {
mutate {
add_field => {
"acc_num" => "%{[message][0]}"
}
}
} else {
mutate {
add_field => {
"acc_num" => ""
}
}
}
if [message][8] {
mutate {
add_field => {
"violation_time" => "%{my_time}"
}
}
} else {
mutate {
add_field => {
"violation_time" => ""
}
}
}
mutate{
add_field => {
"scene_name" => "WHITELIST_MESSAGE"
"violation_type_code" => "21"
"kafka_topic" => "fk_111_topic"
}
remove_field=>["message"]
remove_field=>["@version"]
remove_field=>["my_time"]
}
}
output {
elasticsearch {
hosts => "127.0.0.1:9200"
index => "1212white111list_message_%{+YYYY-MM-dd}"
retry_on_conflict => 5
document_id => "%{acc_num}"
codec => plain { charset => "UTF-8" }
}
stdout { codec => rubydebug }
}
|