IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> Spring Boot整合Kafka -> 正文阅读

[Java知识库]Spring Boot整合Kafka

一、Spring Boot整合Kafka

创建 SpringBoot项目,引入 kafka依赖:

        <!-- Springboot整合 Kafka使用。注意:版本一致   -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.1</version>
        </dependency>

1、yaml配置文件

在配置文件中,配置Kafka服务信息。

spring:
  application:
    name: kafka-springboot
  # kafka配置信息
  kafka:
    bootstrap-servers: 192.168.xxx.xxx:9092 # 集群用逗号分隔
    producer: # 生产者
      retries: 3 # 失败重试次数
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # 消费者
      group-id: default-group # 消费组
      enable-auto-commit: false
      # auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: MANUAL_IMMEDIATE

配置参数不了解的,请查看之前文章,或者查看官方文档。

这里主要说明一下 listener配置参数:

  • RECORD:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
  • BATCH:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
  • TIME:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
  • COUNT:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
  • COUNT_TIME:TIME | COUNT 有一个条件满足时提交
  • MANUAL:当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
  • MANUAL_IMMEDIATE:手动调用Acknowledgment.acknowledge()后立即提交

作用就是影响消费者监听器(ListenerConsumer)处理之后提交动作。一般我们选用手动提交。

在这里插入图片描述

2、生产者

使用 KafkaTemplate类的 send方法发送消费。

在这里插入图片描述

2.1 分区发送

@Service
public class Producer {
    private final static String TOPIC_NAME = "my-springboot-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMsg(int orderId) {
        //1 构建消息数据
        Order order = new Order(orderId, "订单-" + orderId, 1000.00);

        //2.发送消息
        // 未指定分区发送
        kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        // 指定分区发送
        //kafkaTemplate.send(TOPIC_NAME, 0, String.valueOf(order.getOrderId()), JSON.toJSONString(order));

    }
}

2.2 同步发送

    /**
     * 同步发送
     * @param orderId
     */
    public void syncSendMsg(int orderId) {
        //1 构建消息数据
        Order order = new Order(orderId, "订单-" + orderId, 1000.00);

        //2.发送消息
        // 未指定分区发送
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
        SendResult<String, String> sendResult = null;
        try {
            sendResult = future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        RecordMetadata metadata = sendResult.getRecordMetadata();
        ProducerRecord<String, String> producerRecord = sendResult.getProducerRecord();
        System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    }

2.3 异步发送

1)首先需要自定义一个生产者监听器类

自定义生产者发送结果监听类,需要实现 ProducerListener类

  • 发送消息成功则会回调用 onSuccess方法,
  • 发送消息失败则会回调用 onError方法。
@Component
public class MyKafkaProducerSendResultListener implements ProducerListener {

	@Override
	public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
		System.out.println("消息发送成功 : " + producerRecord.toString());
	}

	@Override
	public void onError(ProducerRecord producerRecord, RecordMetadata recordMetadata, Exception exception) {
		System.out.println("消息发送成功,exception=" + exception.getMessage());
	}
}

2)添加生产者监听器类之后在发送消息

    @Autowired
    private MyKafkaProducerSendResultListener myKafkaProducerSendResultListener;
    /**
     * 异步发送
     * @param orderId
     */
    public void asyncSendMsg(int orderId) {
        //1 构建消息数据
        Order order = new Order(orderId, "订单-" + orderId, 1000.00);

        //2.发送消息
        // 添加发送回调监听
        kafkaTemplate.setProducerListener(myKafkaProducerSendResultListener);
        // 未指定分区发送
        kafkaTemplate.send(TOPIC_NAME, String.valueOf(order.getOrderId()), JSON.toJSONString(order));
    }
}

3、消费者

使用 @KafkaListener注解来注入消费者。常见参数如下:

@KafkaListener(
    groupId = "mySpringBootGroup", 
    topicPartitions = {
        @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
        @TopicPartition(
            topic = "topic2", 
            partitions = "0",
            partitionOffsets = @PartitionOffset(
                partition = "1", 
                initialOffset = "100"))
          },
        concurrency = "6"
)
  • group-id:表示消费组,如果没有指定,则会使用配置文件中设置的默认的groupId。
  • topicPartitions:一个消费组可以消费多个主题分区
  • TopicPartition:主题分区相关
  • concurrency:同组下的消费者个数,必须小于等于分区总数,大于意义不大,没必要大于。

3.1 多消费组消费主题

如果一个主题要被多个消费组消费,那么我们使用 @KafkaListener注解来注入多个消费者即可。

@Component
public class MyConsumer {

    @KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup")
    public void listenConsumerGroup1(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        //System.out.println(record);
        System.out.printf("ConsumerGroup1收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
        //手动提交offset
        ack.acknowledge();
    }

    //配置多个消费组
    @KafkaListener(topics = "my-springboot-topic",groupId = "mySpringBootGroup2")
    public void listenConsumerGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.printf("ConsumerGroup2收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), value);
        //手动提交offset
        ack.acknowledge();
    }
}

4、启动项目

启动项目,从打印信息看出,每个消费者都会输出其配置信息。

在这里插入图片描述
单元测试ok:
在这里插入图片描述

有了之前 Kafka客户端API的使用,Spring Boot整合Kafka使用就更加简单了。

传送门(Kafka客户端API):https://blog.csdn.net/qq_42402854/article/details/124994563

– 求知若饥,虚心若愚。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-01 15:02:20  更:2022-06-01 15:03:35 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 20:30:58-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码