logstash是什么?
Logstash 是一个开源数据收集引擎,具有实时管道功能。 Logstash 可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。 数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
关于logstash 的介绍官网写的非常详细,这里就不再过多介绍了,感兴趣的小伙伴可以自行去官网查看,我们今天的主要目的是如果通过logstash 从kafka 消费数据,并对数据进行简单的处理然后写入到elasticsearch 中
一、运行kafka并写入测试数据
我们直接通过docker在我们的本地运行一个kafka 服务端,执行下面的命令即可:
docker run -d --name kafka -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.66:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.66:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t bitnami/kafka:2.8
kafka 运行起来之后我们向kfka 中写入测试数据,我这里直接通过kafka tools 工具添加了一些测试数据,写入完毕之后我们可以使用kafka tools 工具来查看kafka 中数据写入是否正常
二、运行elasticsearch
还是和运行kafka 一样的方法,我们直接通过docker 来快速运行一个elasticsearch 服务端,执行下面的命令即可:
docker run --name elasticsearch -d -e ES_JAVA_OPTS="-Xms512m -Xmx512m" -e discovery.type=single-node -e network.host=0.0.0.0 -p 9200:9200 -p 9300:9300 elasticsearch:7.16.2
elasticsearch 运行起来之后我们可以通过Google 浏览器,安装ElasticSearch Head 插件,通过插件直接输入我们的elasticsearch 地址,查看当前elasticsearch 运行是否正常
三、编写logstash.conf 文件
到这里我们今天的主角就要登场了,在我们运行logstash 之前,我们先要创建一个logstash.conf 文件,为什么需要创建这文件呢?我们之前说了,我们需要把kafka 中的数据消费到elasticsearch 中,那么我们总的告诉logstash 从哪里消费,IP端口这些信息吧,所以我们需要来配置这些东西,关于logstash.conf 中的语法,很简单,我这里就不再细说,如果有不懂的小伙伴可以直接查看官方文档,写的也非常的详细。下面是logstash.conf 的完整配置
input{
kafka{
bootstrap_servers => "192.168.0.66:9092"
client_id => "consumer_id"
group_id => "consumer_group"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["kingmouse","kingmouse01"]
}
}
output {
if [@metadata][kafka][topic] == "kingmouse" {
elasticsearch {
hosts => "http://192.168.0.66:9200"
index => "kingmouse-data"
timeout => 300
}
}
if [@metadata][kafka][topic] == "kingmouse01" {
elasticsearch {
hosts => "http://192.168.0.66:9200"
index => "kingmouse01-data"
timeout => 300
}
}
stdout {}
}
input:就是我们的数据源,我们需要从kafka 消费数据,我这里就写kafka 就ok了,我这里给大家演示的是消费多个topic ,所以topics 中写了两个 output: 就是我们要输出的地方,这里我们需要写入elasticsearch 中,所以我们直接判断当前消费的数据属于哪个topic 中,然后写入不通的elasticsearch 索引中。 stdout:控制台打印日志,我们这里加上主要为了调试,生产环境可以不加
四、启动logstash
编写完logstash.conf 之后我们就可以启动我们的logstash 了,这里我们依然使用docker 来启动,所以我们需要将我们刚刚编写的配置文件通过挂卷的方式挂在到容器内部,执行下面的命令即可:
docker run --name logstash -d -e xpack.monitoring.enabled=false -v /Users/kingmouse/docker/logstash:/config-dir logstash:7.17.0 -f /config-dir/logstash.conf
五、验证
进入logstash 容器内查看启动日志,如果看到下面这些信息,说明logstash 已经从kafka 中消费到了数据
我们进入elasticsearch 看是否有新建了索引 我们可以查看对应的数据是否正确 到这里我们我们通过logstash 从kafka 消费数据到elasticsearch 就已经完成了,但是,还有一个小问题,我们发现写入elasticsearch 的数据是一个json 串,这个不便于我们去查看、搜索,所以我们希望能够按照json 的字段来展示,这个也很简单,我们删除当前的索引,然后在logstash.conf 中添加如下代码即可:
type => "json"
codec => json {
charset => "UTF-8"
}
最终完整如下:
input{
kafka{
bootstrap_servers => "192.168.0.66:9092"
client_id => "consumer_id"
group_id => "consumer_group"
auto_offset_reset => "latest"
consumer_threads => 1
decorate_events => true
topics => ["kingmouse","kingmouse01"]
type => "json"
codec => json {
charset => "UTF-8"
}
}
}
output {
if [@metadata][kafka][topic] == "kingmouse" {
elasticsearch {
hosts => "http://192.168.0.66:9200"
index => "kingmouse-data"
timeout => 300
}
}
if [@metadata][kafka][topic] == "kingmouse01" {
elasticsearch {
hosts => "http://192.168.0.66:9200"
index => "kingmouse01-data"
timeout => 300
}
}
stdout {}
}
配置完成我们重启logstash (记得修改消费组 和消费者id 哟),重启完成我们再次查看elasticsearch 中数据
六、结束语
是不是很简单,logstash 功能非常的强大,今天只是给大家简单聊一下logstash 一些基础使用,后续还会继续补充关于logstash 的更多用法
|