IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 通过logstash消费kafka数据到elasticsearch -> 正文阅读

[大数据]通过logstash消费kafka数据到elasticsearch

logstash是什么?

Logstash是一个开源数据收集引擎,具有实时管道功能。 Logstash可以动态地将来自不同数据源的数据统一起来,并将数据标准化到你所选择的目的地。 数据往往以各种各样的形式,或分散或集中地存在于很多系统中。
在这里插入图片描述

关于logstash的介绍官网写的非常详细,这里就不再过多介绍了,感兴趣的小伙伴可以自行去官网查看,我们今天的主要目的是如果通过logstashkafka消费数据,并对数据进行简单的处理然后写入到elasticsearch

一、运行kafka并写入测试数据

我们直接通过docker在我们的本地运行一个kafka服务端,执行下面的命令即可:

docker run -d --name kafka -p 9092:9092 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.0.66:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.0.66:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t bitnami/kafka:2.8

kafka运行起来之后我们向kfka中写入测试数据,我这里直接通过kafka tools工具添加了一些测试数据,写入完毕之后我们可以使用kafka tools工具来查看kafka中数据写入是否正常
在这里插入图片描述

二、运行elasticsearch

还是和运行kafka一样的方法,我们直接通过docker来快速运行一个elasticsearch服务端,执行下面的命令即可:

docker run --name elasticsearch -d -e ES_JAVA_OPTS="-Xms512m -Xmx512m" -e discovery.type=single-node -e network.host=0.0.0.0 -p 9200:9200 -p 9300:9300 elasticsearch:7.16.2

elasticsearch运行起来之后我们可以通过Google浏览器,安装ElasticSearch Head插件,通过插件直接输入我们的elasticsearch地址,查看当前elasticsearch运行是否正常
在这里插入图片描述

三、编写logstash.conf文件

到这里我们今天的主角就要登场了,在我们运行logstash之前,我们先要创建一个logstash.conf文件,为什么需要创建这文件呢?我们之前说了,我们需要把kafka中的数据消费到elasticsearch中,那么我们总的告诉logstash从哪里消费,IP端口这些信息吧,所以我们需要来配置这些东西,关于logstash.conf中的语法,很简单,我这里就不再细说,如果有不懂的小伙伴可以直接查看官方文档,写的也非常的详细。下面是logstash.conf的完整配置

input{
      kafka{
        bootstrap_servers => "192.168.0.66:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest" 
        consumer_threads => 1
        decorate_events => true 
        topics => ["kingmouse","kingmouse01"]
      }
}
output {

  if [@metadata][kafka][topic] == "kingmouse" {
        elasticsearch {
          hosts => "http://192.168.0.66:9200"
          index => "kingmouse-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "kingmouse01" {
        elasticsearch {
          hosts => "http://192.168.0.66:9200"
          index => "kingmouse01-data"
          timeout => 300
        }
    }
  stdout {}
}

input:就是我们的数据源,我们需要从kafka消费数据,我这里就写kafka就ok了,我这里给大家演示的是消费多个topic,所以topics中写了两个
output: 就是我们要输出的地方,这里我们需要写入elasticsearch中,所以我们直接判断当前消费的数据属于哪个topic中,然后写入不通的elasticsearch索引中。
stdout:控制台打印日志,我们这里加上主要为了调试,生产环境可以不加

四、启动logstash

编写完logstash.conf之后我们就可以启动我们的logstash了,这里我们依然使用docker来启动,所以我们需要将我们刚刚编写的配置文件通过挂卷的方式挂在到容器内部,执行下面的命令即可:

docker run --name logstash -d -e xpack.monitoring.enabled=false -v /Users/kingmouse/docker/logstash:/config-dir logstash:7.17.0 -f /config-dir/logstash.conf  

五、验证

进入logstash容器内查看启动日志,如果看到下面这些信息,说明logstash已经从kafka中消费到了数据
在这里插入图片描述

我们进入elasticsearch看是否有新建了索引
在这里插入图片描述
我们可以查看对应的数据是否正确
在这里插入图片描述
到这里我们我们通过logstashkafka消费数据到elasticsearch就已经完成了,但是,还有一个小问题,我们发现写入elasticsearch的数据是一个json串,这个不便于我们去查看、搜索,所以我们希望能够按照json的字段来展示,这个也很简单,我们删除当前的索引,然后在logstash.conf中添加如下代码即可:

type => "json"
        codec => json {
            charset => "UTF-8"
        } 

最终完整如下:

input{
      kafka{
        bootstrap_servers => "192.168.0.66:9092"
        client_id => "consumer_id"
        group_id => "consumer_group"
        auto_offset_reset => "latest" 
        consumer_threads => 1
        decorate_events => true 
        topics => ["kingmouse","kingmouse01"]
        type => "json"
        codec => json {
            charset => "UTF-8"
        } 
      }
}
output {

  if [@metadata][kafka][topic] == "kingmouse" {
        elasticsearch {
          hosts => "http://192.168.0.66:9200"
          index => "kingmouse-data"
          timeout => 300
        }
    }

  if [@metadata][kafka][topic] == "kingmouse01" {
        elasticsearch {
          hosts => "http://192.168.0.66:9200"
          index => "kingmouse01-data"
          timeout => 300
        }
    }
  stdout {}
}

配置完成我们重启logstash(记得修改消费组消费者id哟),重启完成我们再次查看elasticsearch中数据
在这里插入图片描述

六、结束语

是不是很简单,logstash功能非常的强大,今天只是给大家简单聊一下logstash一些基础使用,后续还会继续补充关于logstash的更多用法

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-05-08 08:10:56  更:2022-05-08 08:13:52 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 8:56:51-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码