目的
对内网IP进行字段信息丰富
背景
由于Geoip插件依赖的数据源文件GeoLite2-City.mmdb只包含了外网IP城市信息,因此只能够对外网IP进行字段信息丰富,主要原因在于Geoip插件依赖的数据源GeoLite2-City。 而GeoLite2-City.mmdb是一种二进制文件,没有找到合适的方法对其进行内网IP数据扩展,只能寻找它途。
方案
logstash提供了多种filter plugin,其中有些插件可以用来进行字段丰富,在这些插件中Jdbc_static filter plugin和Translate filter plugin能够很好的满足内网IP解析需求。 如果你使用的ES版本在7.5以上,你可以使用ES的enrich processor功能实现,本文不在介绍。如果你想要了解该功能,可以阅读https://elastic.blog.csdn.net/article/details/122138208。
Jdbc_static filter plugin
Jdbc_static filter 在首次启动时将远程数据库IP数据加载到loogstash本地内存数据库,对IP进行字段丰富时是从本地内存数据库获取IP城市信息。 内网IP数据发生变化后,不需重启logstash即可生效。可以通过设置loader_schedule参数来控制IP数据更新周期
示例如下:
filter {
jdbc_static {
loaders => [
{
id => "remote-servers"
query => "select ip, city_name, country_name from MYDATA.IP_CITY order by ip"
local_table => "servers"
}
]
local_db_objects => [
{
name => "servers"
index_columns => ["ip"]
columns => [
["ip","varchar(15)"],
["city_name","varchar(128)"],
["country_name","varchar(128)"]
]
}
]
local_lookups => [
{
id => "local-servers"
query => "select city_name, country_name from servers WHERE ip = :ip"
parameters => {ip => "[from_ip]"}
target => "server"
}
]
# using add_field here to add & rename values to the event root
add_field => { "[geoip][city_name]" => "%{[server][0][city_name]}"}
add_field => { "[geoip][country_name]" => "%{[server][0][country_name]}"}
remove_field => ["server"]
loader_schedule => "* */2 * * *" # run loaders every 2 hours
jdbc_user => "xxxx"
jdbc_password => "xxxxxx"
jdbc_driver_class => "com.ibm.db2.jcc.DB2Driver" # DB2数据库
jdbc_driver_library => "D:\MavenRepository\com\ibm\db2\db2jcc4\3.71.22\db2jcc4-3.71.22.jar" # DB2数据库驱动路径
jdbc_connection_string => "jdbc:db2://xx.xx.xx.xx:50000/EPLAT"
}
}
Translate filter plugin
与Jdbc_static filter将IP数据维护到数据库不同,Translate filter使用文件来存放IP数据信息,已知可支持文件格式有YAML,JSON和 CSV。 可以通过设置refresh_interval参数来控制IP数据更新周期 refresh_behaviour 参数可以控制数据更新方式,设置为’replace’,会全部替换掉原来老的数据。
示例如下: ip_city.yaml文件数据格式: “216.46.173.126”: “shanghai,china”
filter {
translate {
field => "from_ip"
destination => "ip_data"
dictionary_path => 'D:/ES/data/ip_city.yml'
refresh_interval => 86400
refresh_behaviour => 'replace'
}
if ("" in [ip_data]) {
csv {
source => ip_data
separator => ","
columns => [ "city_name", "country_name"]
add_field => { "[geoip][city_name]" => "%{[city_name]}"}
add_field => { "[geoip][country_name]" => "%{[country_name]}"}
remove_field => [ "ip_data", "city_name", "country_name" ]
}
}
}
|