IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> Springboot 整合 kafka 实现消息的发布和订阅 -> 正文阅读

[大数据]Springboot 整合 kafka 实现消息的发布和订阅

Kafka 是一个分布式、高吞吐量、可持久性和自动负载均衡的消息队列。它在实现了传统意义上的 MQ 功能的同时,也可以作为大数据的流处理平台。

简单来说,Kafka 就是一个高吞吐量的分布式发布订阅消息系统。

Kafka 的用法跟 RabbitMQ 用法相同,都是作为一个消息中间件收发消息,下面介绍的是 Springboot 微服务集成 Kafka,已经简单的用法说明。

依赖

Spring 有专门支持 Kafka 的依赖,引入 Spring 对应版本支持的 Kafka 依赖即可,如下

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

配置

spring:
  kafka:
    bootstrap-servers: ${KAFKA_HOST:192.168.0.105:9092}
    #=============== producer  =======================
    producer:
      #如果该值大于零时,表示启用重试失败的发送次数
      retries: 0
      #每当多个记录被发送到同一分区时,生产者将尝试将记录一起批量处理为更少的请求,默认值为16384(单位字节)
      batch-size: 16384
      #生产者可用于缓冲等待发送到服务器的记录的内存总字节数,默认值为3355443
      buffer-memory: 33554432
      #key的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value的Serializer类,实现类实现了接口org.apache.kafka.common.serialization.Serializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer  =======================
    consumer:
      #用于标识此使用者所属的使用者组的唯一字符串
      group-id: consumer-group-default
      #当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量
      #可选的值为latest, earliest, none
      auto-offset-reset: earliest
      #消费者的偏移量将在后台定期提交,默认值为true
      enable-auto-commit: true
      #如果'enable-auto-commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 100
      #密钥的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #值的反序列化器类,实现类实现了接口org.apache.kafka.common.serialization.Deserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

默认 value-serializer 使用 org.apache.kafka.common.serialization.StringSerializer ,只支持文本消息。自定义 org.springframework.kafka.support.serializer.JsonSerializer 可以让消息支持其他类型。

使用示例

新建消息实体类

public class Message {
    private Long id;
    private String content;
    private Date sendTime;
	
	// constructor、getter、setter...
}

消息生产者控制器

@RestController
@RequestMapping("/kafka/producer")
public class KafkaProducerController {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerController.class);

    private static final String TOPIC = "topic-test";

    private KafkaTemplate kafkaTemplate;

    public KafkaProducerController(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/push")
    public ResponseEntity<String> pushMessage(@RequestBody Message message) {
        Date time = new Date();
        message.setSendTime(time);
        kafkaTemplate.send(TOPIC, JSON.toJSONString(message)).addCallback(success
                -> LOGGER.info("{}-生产者发送消息成功:{},时间:{}", TOPIC, success, time), failure
                -> LOGGER.error("{}-生产者发送消息失败:{}", failure.getMessage()));
        return new ResponseEntity<>("success", HttpStatus.OK);
    }
}

消息消费者监听

@Component
public class KafkaConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumer.class);

    private static final String TOPIC = "topic-test";

    @KafkaListener(topics = {TOPIC})
    public void testConsumer(String body) {
        LOGGER.info("消费时间: {}", new Date());
        Message message = JSON.parseObject(body, Message.class);
        LOGGER.info("topic: {}, 消费消息内容: {}", TOPIC, message);
    }
}

上边的示例是生产者发送消息到 topic-test,消费者以默认组 consumer-group-default 身份监听 topic-test 消费消息,监听器用 @KafkaListener 注解,topics 属性表示监听的topic,支持同时监听多个,用英文逗号分隔,如果需要使用指定组身份消费消息,可通过注解中的 groupId 属性指定。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-14 22:23:12  更:2021-10-14 22:23:22 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 8:02:59-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码