需求分析
通过 FileBeats 采集日志,采集后的日志输出到 Logstash ,Logstash 处理过滤后(将每条日志解析成字段)将数据输出到 Elasticserch
安装配置
下载安装包
Download Elastic Stack Logstash 7.6.1
上传并解压
unzip logstash-7.6.1.zip
启动测试
cd /opt/server/logstash-7.6.1
bin/logstash -e 'input { stdin { } } output { stdout {} }'
FileBeat 采集日志到 Logstash
日志结构
字段名 | 说明 |
---|
client IP | 浏览器端IP | timestamp | 请求的时间戳 | method | 请求方式(GET/POST) | uri | 请求的链接地址 | status | 服务器端响应状态 | length | 响应的数据长度 | reference | 从哪个URL跳转而来 | browser | 浏览器 |
准备测试数据
235.9.200.242 - - [15/Apr/2015:00:27:19 +0849] "POST /itcast.cn/bigdata.html 200 45 "www.baidu.com" "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36 Core/1.53.4549.400 QQBrowser/9.7.12900 144.180.122.249
准备配置文件
cd /opt/server/filebeat-7.6.1-linux-x86_64
vim filebeat-logstash.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /opt/server/web-log/access.*
multiline.pattern: '^\d+\.\d+\.\d+\.\d+ '
multiline.negate: true
multiline.match: after
output.logstash:
enabled: true
hosts: ["node1:5044"]
启动 FileBeat
cd /opt/server/filebeat-7.6.1-linux-x86_64
./filebeat -e -c filebeat-logstash.yml
Logstash 接收数据并打印
执行说明
启动 FileBeat ,紧接着启动 Logstash ,向 FileBeat 采集的日志文件中加入一条数据,测试 Logstash 是否能接收到数据并打印在控制台
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim filebeat-print.conf
input {
beats {
port => 5044
}
}
output {
stdout {
codec => rubydebug
}
}
测试 Logstash 配置是否正确
cd /opt/server/logstash-7.6.1
bin/logstash -f config/filebeat-print.conf --config.test_and_exit
启动 Logstash
bin/logstash -f config/filebeat-print.conf --config.reload.automatic
Logstash 输出到 Elasticsearch
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim filebeat-es.conf
input {
beats {
port => 5044
}
}
output {
elasticsearch {
hosts => [ "node1:9200","node2:9200","node3:9200"]
}
}
启动 Logstash
bin/logstash -f config/filebeat-es.conf --config.reload.automatic
Logstash 采集 Mysql 数据并打印
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim mysql-print.conf
① 全量采集
input {
jdbc {
jdbc_driver_library => "/opt/server/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://117.126.1.123:3306/test"
jdbc_user => "root"
jdbc_password => "****"
schedule => "*/1 * * * *"
statement => "SELECT * from wcresult where number > 10;"
}
}
output{
stdout{
codec=>rubydebug
}
}
② 增量采集
input {
jdbc {
jdbc_driver_library => "/opt/server/mysql-connector-java-5.1.38.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://117.126.1.123:3306/test"
jdbc_user => "root"
jdbc_password => "****"
use_column_value => true
tracking_column => "id"
schedule => "*/1 * * * *"
statement => "SELECT * from wcresult where id > :sql_last_value;"
}
}
output{
stdout{
codec=>rubydebug
}
}
Logstash 采集用户输入到文件中
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim input-file.conf
input {stdin{}}
output {
file {
path => "/opt/servers/es/logstash-7.6.1/usercase/datas/%{+YYYY-MM-dd}-%{host}.txt"
codec => line {
format => "%{message}"
}
flush_interval => 0
}
}
Logstash 采集用户输入到 ES 中
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim input-es.conf
input {stdin{}}
output {
elasticsearch {
hosts => ["node1:9200"]
index => "logstash-%{+YYYY.MM.dd}"
}
}
Logstash 采集文件数据到 Kafka 中
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim file-kafka.conf
input {
file{
path => "/opt/server/web-login.log"
type => "log"
start_position => "beginning"
}
}
output {
kafka {
topic_id => "login"
bootstrap_servers => "node1:9092,node2:9092,node3:9092"
batch_size => 5
}
}
Logstash 采集 Kafka 数据到 Es 中
准备配置文件
cd /opt/server/logstash-7.6.1/config
vim Kafka-es.conf
input{
kafka {
group_id => "test"
auto_offset_reset => "earliest"
topics => ["login"]
bootstrap_servers => "node1:9092,node2:9092,node3:9092"
}
}
output {
elasticsearch {
hosts => ["node1:9200"]
index => "web_login_log_%{+YYYY-MM}"
}
}
Logstash 过滤器
过滤器说明
在 Logstash 中可以配置过滤器 Filter 对采集到的数据进行中间处理,在 Logstash 中,有大量的插件供我们使用
查看 Logstash 已安装的插件
Filter plugins | Logstash Reference 7.6 | Elastic
cd /opt/server/logstash-7.6.1
bin/logstash-plugin list
Grok 插件
Grok 插件说明
Grok 是通过模式匹配的方式来识别日志中的数据,可以把 Grok 插件简单理解为升级版本的正则表达式。它拥有更多的模式,默认,Logstash 拥有 120 个模式。如果这些模式不满足我们解析日志的需求,我们可以直接使用正则表达式来进行匹配
常用的 Grok 模式
NUMBER | 匹配数字(包含:小数) |
---|
INT | 匹配整形数字 | POSINT | 匹配正整数 | WORD | 匹配单词 | DATA | 匹配所有字符 | IP | 匹配IP地址 | PATH | 匹配路径 |
Grok 模式语法
Grok filter plugin | Logstash Reference 7.6 | Elastic
%{SYNTAX:SEMANTIC}
%{NUMBER:duration} %{IP:client}
%{NUMBER:duration:int} %{IP:client}
mutate 插件
插件说明
mutate 插件主要是作用在字段上,它可以对字段进行过滤、重命名、删除、替换或者修改结构
插件使用
Mutate filter plugin | Logstash Reference 7.6 | Elastic
Setting | 作用 | input type | Required |
---|
add_field | 添加额外的字段 | hash | No | add_tag | 添加额外的标记 | array | No | enable_metric | 是否启用额外的度量字段 | boolean | No | id | mutate 插件的 id | string | No | periodic_flush | 周期性刷新 | boolean | No | remove_field | 移除字段 | array | No | remove_tag | 移除标记 | array | No |
date 插件
插件说明
解析字段中的日期,并进行格式转换
插件使用
Date filter plugin | Logstash Reference 7.6 | Elastic
Logstash 解析所有字段并输出到 Elasticsearch
修改配置文件
cd /opt/server/logstash-7.6.1/config
vim filebeat-es.conf
input {
beats {
port => 5044
}
}
filter {
grok {
match => {
"message" => "%{IP:ip} - - \[%{HTTPDATE:date}\] \"%{WORD:method} %{PATH:uri} %{DATA}\" %{INT:status:int} %{INT:length:int} \"%{DATA:reference}\" \"%{DATA:browser}\""
}
}
mutate {
enable_metric => "false"
remove_field => ["message", "log", "tags", "input", "agent", "host", "ecs", "@version"]
}
date {
match => ["date","dd/MMM/yyyy:HH:mm:ss Z","yyyy-MM-dd HH:mm:ss"]
target => "date"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
hosts => ["node1:9200" ,"node2:9200" ,"node3:9200"]
index => "apache_web_log_%{+YYYY-MM}"
}
}
启动检查
bin/logstash -f config/filebeat-es.conf --config.reload.automatic
|