IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> 【Spring Boot 集成应用】RocketMQ的集成用法(上) -> 正文阅读

[Java知识库]【Spring Boot 集成应用】RocketMQ的集成用法(上)

1. RocketMQ集成介绍

在金融互联网领域广泛应用,在阿里双11活动经历过多次考验, 经过严苛的生产验证,有比较高的可靠性,在数据处理上有比较高的稳定性, 能从最大程度上保证消息不易丢失,如果业务上有一定的规模, 且对数据的一致性,稳定性要求严苛, 那么可以采用RocketMQ, 比如金融互联网领域, 支付场景、交易场景等。如果有借助消息队列实现分布式事务, RocketMQ可以作为首选。

Spring Boot 官方提供了spring-boot-starter-activemq 对ActiveMQ的支持, 但并没有提供对RocketMQ的支持, 这不代表Spring Boot 本身不支持, RocketMQ 官方给我们提供了RocketMQ-Spring 框架, 整合了RocketMQ与Spring Boot, 主要提供3个特性:

  1. 使用 RocketMQTemplate 用来统一发送消息,包括同步、异步发送消息和事务消息
  2. @RocketMQTransactionListener 注解用来处理事务消息的监听和回查
  3. @RocketMQMessageListener 注解用来消费消息
2. RocketMQ安装说明

简要安装说明, 可参考RocketMQ的官方文档。

  1. 下载RocketMQ 安装文件

    如果不能连接, 采用其他镜像下载: https://mirrors.cloud.tencent.com/apache/rocketmq/4.3.2/rocketmq-all-4.3.2-bin-release.zip

  2. 启动 NameServer

    nohup sh bin/mqnamesrv &
    tail -f ~/logs/rocketmqlogs/namesrv.log
    
  3. 启动Broker

    nohup sh bin/mqbroker -n localhost:9876 &
    tail -f ~/logs/rocketmqlogs/broker.log
    
3. RocketMQ集成配置

采用RocketMQ官方提供得rocketmq-spring-boot-starter作为集成组件。

  1. 创建spring-boot-mq-rocket父级工程
    在这里插入图片描述

    MAVEN依赖:

    <properties>
            <rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>
    </properties>
    
    <dependencies>
        <!-- RocketMq与Spring Boot 集成组件依赖-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>${rocketmq-spring-boot-starter-version}</version>
        </dependency>
    </dependencies>
    
  2. 创建rocketmq-basic工程
    在这里插入图片描述

    工程依赖直接继承父级依赖, 无须添加其他依赖组件。

    工程配置:

    application.yml文件:

    server:
      port: 12613
    spring:
      application:
        name: rocketmq-basic
    
    # RocketMQ配置
    rocketmq:
      name-server: 10.10.20.15:9876
      producer:
        group: basic-group
    
    

    配置填写RocketMQ地址信息, 如果是集群,多个以逗号分割。

  3. 创建启动类

    com.mirson.spring.boot.mq.rocket.basic.startup.RocketMqBasicApplication

    @SpringBootApplication
    @ComponentScan(basePackages = {"com.mirson"})
    public class RocketMqBasicApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(RocketMqBasicApplication.class, args);
        }
    }
    

    扫描包含com.mirson包下所有路径。

4. RocketMQ集成之普通消息处理
  1. 定义监听器

    com.mirson.spring.boot.mq.rocket.basic.consume.StringConsumer:

    @Service
    @RocketMQMessageListener(topic = RabbitMqConfig.TOPIC, consumerGroup = RabbitMqConfig.CONSUME_GROUP_STRING)
    @Log4j2
    public class StringConsumer implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String message) {
            log.info("StringConsumer => receive: " + message);
        }
    }
    
    
    • 订阅的主题为RabbitMqConfig.TOPIC, 订阅的分组为RabbitMqConfig.CONSUME_GROUP_STRING。
    • 实现RocketMQListener接口, 将接收的消息通过日志打印。
  2. 提供接口

    com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller

    @RestController
    @Log4j2
    public class RocketMqProviderContorller {
    
        @Resource
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 生产者发送字符类型消息
         * @return
         */
        @GetMapping("/sendString")
        public String sendString() {
            String msg = "random number: " + RandomUtils.nextInt(0, 100);
            // Send string
            SendResult sendResult = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC, msg);
            log.info("send result: " + sendResult.getSendStatus());
            return msg;
        }
        ...
    }
    
    

    提供sendString接口, 每次请求发送一个随机数, 通过RocketMQTemplate的syncSend同步方法发送数据。

    如果发送成功, 会返回状态: SEND_OK。

  3. 调用验证

    • 访问接口地址:http://127.0.0.1:12613/sendString
      在这里插入图片描述

    • 查看监听器日志
      在这里插入图片描述
      可以看到, String类型的普通消息监听器, 正常接收到消息。

5. RocketMQ集成之原生消息处理

RocketMQ原生消息,除了发送的数据, 还可以获取RocketMQ内置的系统信息, 比如消息ID, 主机名称,时间戳, 队列信息等。

  1. 定义监听器

    com.mirson.spring.boot.mq.rocket.basic.consume.MessageExtConsumer

    @Service
    @RocketMQMessageListener(topic = RabbitMqConfig.TOPIC_EXT,  selectorExpression = "tag1", consumerGroup = RabbitMqConfig.CONSUME_GROUP_EXT)
    @Log4j2
    public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener{
        @Override
        public void onMessage(MessageExt message) {
            log.info("MessageExtConsumer => receive msgId:{}, msgData:{} ", message.getMsgId(), new String(message.getBody()));
        }
    
        /**
         * 自定义消费者的开始位置,这里设置的是当前时间
         * @param consumer
         */
        @Override
        public void prepareStart(DefaultMQPushConsumer consumer) {
            // set consumer consume message from now
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
        }
    }
    
    
    • 订阅的主题为RabbitMqConfig.TOPIC_EXT, 订阅的Group为RabbitMqConfig.CONSUME_GROUP_EXT。打印接收到的消息ID与数据。

    • 实现RocketMQPushConsumerLifecycleListener接口, 可以自定义消费者的开始消息位置, 这里设置的是当前时间。

  2. 提供发送接口

    com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller, 增加接口:

    /**
         * 发送RocketMQ 原生消息
         * @return
         */
    @GetMapping("/sendStringExt")
    public String sendStringExt() {
        String msg = "random number: " + RandomUtils.nextInt(0, 100);
        try {
            SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_EXT + ":tag1", msg);
            log.info("result:  " + result.getSendStatus());
        }catch(Exception e) {
            log.error(e.getMessage(), e);
        }
        // Send String Ext Message
        return msg;
    }
    
    • 发送一个随机数, 增加了一个tag1标记, 与上面RocketMQMessageListener注解中的selectorExpression需保持一致, 如不匹配, 不能收到对应消息。
    • 与正常发送方式没有差异, 不需做额外处理, 仍采用同步方式发送。
  3. 测试验证

    • 访问接口:

      http://127.0.0.1:12613/sendStringExt
      在这里插入图片描述

    • 查看监听器日志
      在这里插入图片描述

      能够正常接收到消息, 并打印出了Rocketmq封装的消息ID。

6. RocketMQ集成之Spring Message消息

Spring Message 是一种消息传输规范, RocketMQ可以支持, 在Spring Cloud Stream 中采用的就是Spring Message作为消息传输规范, 这是一个用于构建基于消息的微服务应用框架。

  1. 定义传输对象

    在实际消息交互当中, 不会传输简单的数据结构, 一般传递的是业务对象,这里定义一个订单对象:

    com.mirson.spring.boot.mq.rocket.basic.bo.Order

    @Data
    public class Order implements Serializable {
    
        private static final long serialVersionUID = -1L;
    
        /**
         * 订单ID
         */
        private String orderId;
    
        /**
         * 创建时间
         */
        private Date createDate;
    
    }
    

    消息交互当中, 默认会通过序列化传递, 需要实现序列化接口。

  2. 定义监听器

    com.mirson.spring.boot.mq.rocket.basic.consume.OrderSpringMessageConsumer

    @Service
    @RocketMQMessageListener(topic = RabbitMqConfig.TOPIC_SPRING_MESSAGE, consumerGroup = RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE)
    @Log4j2
    public class OrderSpringMessageConsumer implements RocketMQListener<Order> {
    
        @Override
        public void onMessage(Order order) {
            log.info("OrderSpringMessageConsumer => receive order: " + order);
        }
    
    }
    
    
    • 定义不同的主题以区分, 这里订阅的主题为RabbitMqConfig.TOPIC_SPRING_MESSAGE, 组别为RabbitMqConfig.CONSUME_GROUP_SPRING_MESSAGE。
    • 实现RocketMQListener接口, 泛型为Order; 打印接收到的订单数据。
  3. 定义发送接口

    com.mirson.spring.boot.mq.rocket.basic.provider.RocketMqProviderContorller

    /**
         * 发送RocketMQ Spring Message封装消息
         * @return
         */
    @GetMapping("/sendSpringMessage")
    public String sendSpringMessage() {
        String msg = "random number: " + RandomUtils.nextInt(0, 100);
        Order order = new Order();
        order.setOrderId(UUID.randomUUID().toString());
        order.setCreateDate(new Date());
    
        // Send Spring Message With Order
        SendResult result = rocketMQTemplate.syncSend(RabbitMqConfig.TOPIC_SPRING_MESSAGE, MessageBuilder.withPayload(order).build());
        log.info("send result: " + result.getSendStatus());
        return msg;
    }
    
    
    • 创建一个订单对象, 生成UUID作为订单ID, 设置订单创建时间。
    • 采用同步方式发送, 指定主题RabbitMqConfig.TOPIC_SPRING_MESSAGE, 注意, Spring Message封装采用MessageBuilder, 将订单放入playload包体里面,调用build方法进行序列化。
  4. 测试验证

    • 调用发送接口

      http://127.0.0.1:12613/sendSpringMessage
      在这里插入图片描述

    • 查看监听器日志
      在这里插入图片描述

      能够正常接收并打印出完整的订单数据。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-08-06 10:29:56  更:2022-08-06 10:32:13 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:15:40-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码