目录
嵌套Json格式数据
JSON格式一
JSON格式二
?JSON格式三
组件解析:
时间相关操作
调整logstash @timestamp
时间戳转换
嵌套Json格式数据
JSON格式一
?有如下JSON日志(position下是一个JSON)
{
"RequestTime":1637737587605,
"timestamp":"2021-11-24T15:06:42.681Z",
"position":{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v1",
"RequestIp":"127.0.0.1"
}
}
将其解析为:
{
"RequestIp" => "127.0.0.1",
"TopDirectory" => "stream_ad_v1",
"timestamp" => "2021-11-24T15:06:42.681Z",
"RequestTime" => 1637737587605,
"LogType" => "请求日志"
}
思路:
1、新建一个新的字段(position_su),并将position赋值给新字段
2、对新字段进行JSON解析
3、移除多余字段(可选)
?详细logstash.conf:
input{
stdin {
codec => json {
charset => ["UTF-8"]
}
}
}
filter{
mutate {
add_field => {"position_su" => "%{position}"} #先新建一个新的字段,并将position赋值给它
}
json {
source => "position_su" # 再对该字段进行解析
remove_field => ["position_su","position"] # 最后移出多余字段(可选)
}
}
output{
stdout { codec => rubydebug }
}
JSON格式二
有如以下JSON格式(position下是一个JSON数组,数组元素为1)
{
"RequestTime":1637737587605,
"timestamp":"2021-11-24T15:06:42.681Z",
"position":[
{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v1",
"RequestIp":"127.0.0.1"
}
]
}
如果不做改变,沿用上述方法,将解析失败,如图:
?此时可以先不进行JSON转换,将“[]”替换之后,在进行JSON转换,
logstash.conf:
input{
stdin {}
}
filter{
mutate {
gsub => [ "message","\[","" ]
gsub => [ "message","\]","" ]
}
json { source => "message" }
mutate {
add_field => {"position_su" => "%{position}"} #先新建一个新的字段,并将position赋值给它
}
json {
source => "position_su" # 再对该字段进行解析
remove_field => ["position_su","position"] # 最后移出多余字段(可选)
}
}
output{
stdout { codec => rubydebug }
}
?JSON格式三
有如下JSON数据格式(position下是一个JSON数组,数组元素为n)
{
"RequestTime":1637737587605,
"timestamp":"2021-11-24T15:06:42.681Z",
"position":[
{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v1",
"RequestIp":"127.0.0.1"
},
{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v2",
"RequestIp":"127.0.0.1"
},
{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v3",
"RequestIp":"127.0.0.1"
}
]
}
此时将“[]”替换就不能解决问题,观察此JSON经过logstash处理后的输出格式,如图:
我们发现,logstash支持数组元素按索引位置获取元素,因此,logstash.conf可以如下改造:
input{
stdin {
codec =>json {
charset => ["UTF-8"]
}
}
}
filter{
mutate {
add_field => {"position_json" => "%{[position][0]}"}
}
mutate {
add_field => {"position_su" => "%{position_json}"} #先新建一个新的字段,并将position赋值给它
}
json {
source => "position_su" # 再对该字段进行解析
remove_field => ["position_su","position"] # 最后移出多余字段(可选)
}
}
output{
stdout { codec => rubydebug }
}
执行结果如图:
备注: 如果遇到以下复杂的版本,LogType下嵌套一个或多个JSON数组。
{
"RequestTime":1637737587605,
"timestamp":"2021-11-24T15:06:42.681Z",
"position":[
{
"LogType":[
{
"a":"请求日志a"
},
{
"b":"请求日志b"
},
{
"c":"请求日志c"
}
],
"TopDirectory":"stream_ad_v1",
"RequestIp":"127.0.0.1"
},
{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v2",
"RequestIp":"127.0.0.1"
},
{
"LogType":"请求日志",
"TopDirectory":"stream_ad_v3",
"RequestIp":"127.0.0.1"
}
]
}
??????? 目前我能想到的方法是抽丝剥茧,依次向下取数据。但实际工作中,这样的需求是不合理的,类似的操作应该放到ElasticSearch中来完成。
组件解析:
codec
????????编码插件(codec)可以在logstash输入或输出时处理不同类型的数据,同时,还可以更好更方便的与其他自定义格式的数据产品共存,比如:fluent、netflow、collectd等通用数据格式的其他产品。
????????因此,logstash不只是一个input→filter→output的数据流,而是一个input→decode→filter→encode→output的数据流。
????????codec支持的编码格式常见有plain、json、json_lines等。
- plain 是最简单的编码插件,你输入什么信息,就返回什么信息。使用样例:
- output { stdout { codec => plain } }
- json 有时候logstash采集的日志是JSON格式,那我们可以在input字段加入codec => json来进行解析,这样就可以根据具体内容生成字段,方便分析和储存。如果想让logstash输出为json格式,可以在output字段加入codec=>json。
- json_lines(即JSON的格式化) 使用json插件时,每个字段都是key:value格式,多个字段之间通过逗号分隔。这种输出比较长,因此我们可以采用json_lines编码格式稍微好一点。
mutate (过滤器)
mutate过滤器能够帮助你修改指定字段的内容。
mutate { ?? ?convert => { ??? ??? ? "TopDirectory" => "string" ?? ??? ? "RequestIp" => "string" ??? } }
复制一个已存在的字段到另外一个字段,已存在的字段会被重新写到一个新的字段,新的字段不需要单独添加。 mutate { ?? ?copy => {"TopDirectory" => "TopDirectory2"} }
这里只针对string类型字段,如下把name字段中的“o”替换为“p” mutate { ?? ?gsub => ["name","o","p"] }
- 大小写转换 lowercase&uppercase
mutate {
?? ?#lowercase => [ "TopDirectory" ]??? #小写
?? ?uppercase => [ "TopDirectory" ]??????? #大写
}
mutate {
?? ? rename => {"TopDirectory" => "TopDirectory3"}
}
mutate {
?? ?strip => ["TopDirectory"]
}
mutate {
?? ?update => {"TopDirectory" => "li"}
}
作用和 update 类似,但是当字段不存在的时候,它会起到 add_field 参数一样的效果,自动添加新的字段。
mutate {
?? ?remove_field => ["TopDirectory"]
}
mutate {
?? ?add_field => {"testField1" => "0"}
?? ?add_field => {"testField2" => "%{TopDirectory}"}?? #引用TopDirectory中的值
}
时间相关操作
我们先来看一下logstash的标准输出:
需要解决的问题:
- ?目前系统时间为2021-11-26T11:31:16, 而logstash显示时间明显与服务器相差8个小时
- ?使用日志时间替换logstash中的时间
调整logstash @timestamp
问题:logstash时间与服务器时间相差8小时
解决思路:
将“@timestamp”时间加8个小时(8 * 60 * 60)
在filter中加入:
filter{
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby { code => "event.set('@timestamp',event.get('timestamp'))" }
mutate {
remove_field => ["timestamp"]
}
}
时间戳转换
需求:用日志时间替换logstash时间
解决方法:
调用?date { } 方法
?在filter中加入:
filter {
date {
match => ["RequestTime", "UNIX_MS", "yyyy-MM-dd HH:mm:ss,SSS"]
target => "@timestamp"
locale => "cn"
}
}
备注:
- match中参数:字段名,格式化模式,要转换的时间格式
- target:值类型是字符串,默认值是“@timestamp”。将匹配的时间戳存储到给定的目标字段中。
- locale:值类型是字符串,这个设置没有默认值。使用IETF-BCP47或POSIX语言标记指定用于日期分析的区域设置。 简单的例子是:
logstash中支持的时间格式如下:
- ISO8601 - 将解析任何有效的ISO8601时间戳,如2021-11-30T17:44:01.103Z
- UNIX - 将解析float或int值,表示自1346149001.132以及1326149001.132以来的秒数(以秒为单位)
- UNIX_MS - 将分析int值表示unix时间(以毫秒为单位),如1366125117000TAI64N - 将解析tai64n时间值
路漫漫其修远兮,吾将上下而求索。??????????????? ---屈原?
|