大家先看一下ELK通过rabbitmq消息队列保存日志的结构图: 此种架构引入了消息队列机制,位于各个节点上的Logstash Agent先将数据/日志传递给RabbitMQ(或者Redis),并将队列中消息或数据间接传递给Logstash,Logstash过滤、分析后将数据传递给Elasticsearch存储。最后由Kibana将日志和数据呈现给用户。因为引入了RabbitMQ(或者Redis),所以即使远端Logstash server因故障停止运行,数据将会先被存储下来,从而避免数据丢失。
那我们来看一下如何配置: logstash的配置文件logstash.conf(自定义名称)
input{
rabbitmq {
host => "192.168.20.42" #RabbitMQ-IP地址
vhost => "/" #虚拟主机
port => 5672 #端口号
user => "test" #用户名
password => "test" #密码
queue => "LogQueue" #队列
durable => false #持久化跟队列配置一致
codec => "plain" #格式
}
tcp {
host => "localhost"
mode => "server"
port => 9900
}
file{
path => ["E:/logs/logsFile.log"] # 想要收集的日志信息文件-注意全路径
type=>"elasticsearch" # 指定一个类型
start_position =>"beginning" # 说明从文件最开始读
}
}
filter{
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
# 请求头
mutate {
convert => {
"bytes" => "integer"
}
}
# ip 地理位置
geoip {
source => "clientip"
}
# 浏览器请求信息
useragent {
source => "agent"
target => "useragent"
}
}
output{
elasticsearch{ # 配置elasticsearch接收数据信息
hosts=>["127.0.0.1:9200"] # 配置elasticsearch端口信息
index=>"es-%{+YYYY.MM.dd}" # 配置Kibana新建index,比如我这里是”log-“开头,在Kibana可以写”log-*”
}
stdout { codec => rubydebug }
}
然后各自启动rabbitmq、logstash、elasticSearch、kibana 然后在项目中做好配置: logback-spring.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property name="springAppName" value="LogstashTest"/>
<include resource="org/springframework/boot/logging/logback/base.xml" />
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--配置logStash 服务地址-->
<destination>127.0.0.1:9900</destination>
<!-- 日志输出编码 -->
<encoder charset="UTF-8"
class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder">
<providers>
<timestamp>
<timeZone>UTC</timeZone>
</timestamp>
<pattern>
<pattern>
{
"logLevel": "%level",
"serviceName": "${springAppName:-}",
"pid": "${PID:-}",
"thread": "%thread",
"class": "%logger{40}",
"message": "%message"
}
</pattern>
</pattern>
</providers>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="LOGSTASH" />
<appender-ref ref="CONSOLE" />
</root>
</configuration>
然后测试发送日志,可以在logstash的启动命令页面看到如下: 说明日志已经通过logstash发送到消息队列,然后再在elasticsearch中接收就可以了
|