IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot写入ES(采用Kafka+logstash+ES)方式 -> 正文阅读

[Java知识库]SpringBoot写入ES(采用Kafka+logstash+ES)方式

兄弟们兄弟们。我又来了。

之前Java程序直接写入ES。 不知道有时候网络不好。还是啥情况。

我这个写入就超时了。 然后这条消息就会丢失。 系统也嗷嗷报错。针对于这个情况。

修改了一下 写入ES的方式。

程序将消息放入-> Kafka -> logstash进行消费后输出至 -> ES中。

好了。废话不多说。老样子。大象装冰箱一共分3步

步骤1、Java程序集成kafka-》用的springBoot

maven-加入jar包

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

spring配置文件。增加配置

spring:
  kafka:
    bootstrap-servers: kafka地址
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1

增加kafka发送处理类

@Service
@Slf4j
public class KafkaServiceImpl implements KafkaService {

  @Autowired
  private KafkaTemplate<String, Object> kafkaTemplate;

  public static final String TOPIC_STR = "pile-order-charge-process";

  @Override
  public void send (OrderChargeProcessEs orderChargeProcessEs) {
    String obj2String = JSONObject.toJSONString(orderChargeProcessEs);
    log.info("准备发送消息为:{}", obj2String);
    //发送消息
    ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC_STR, obj2String);
    future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
      @Override
      public void onFailure (Throwable throwable) {
        //发送失败的处理
        log.error(TOPIC_STR + " - 生产者 发送消息失败:" + throwable.getMessage());
      }

      @Override
      public void onSuccess (SendResult<String, Object> stringObjectSendResult) {
        //成功的处理
        //log.info(TOPIC_STR + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
      }
    });
  }
}

步骤2、修改logstash管道配置、修改配置文件

管道的配置文件

input {
  kafka{
    bootstrap_servers => "kafka地址"
    topics_pattern  => "pile-order-charge-process.*"
    consumer_threads => 12
    decorate_events => true
    codec => "json"
    auto_offset_reset => "latest"
    group_id => "pile-order-charge-process"
  }
}
filter {
}
output {
  elasticsearch {
    hosts => ["es集群地址]
    user => "es账号"
    password => "es密码"
    index => "pile-order-charge-process-%{+YYYY.MM}"
  }
}

logstash的logstash.yml配置文件-将对应的管道ID设置进来。 对应上面创建的管道ID

xpack.management.pipeline.id: ["pile-order-charge-process"]

步骤三、重启logstash、kafka观察主体、kibana上观察logstash消费情况

通过kafka-manager看到队列已经创建出来了

?

通过kibana可以看到。管道已经创建。并有输入输出的数据了

本期内容到此为止了。兄弟们。 如果觉得有用 。真的帮助到你了。 麻烦兄弟给我点个赞

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-03-31 23:49:43  更:2022-03-31 23:51:14 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 7:26:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码