IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringCloud Stream消息驱动 -> 正文阅读

[Java知识库]SpringCloud Stream消息驱动

SpringCloud Stream消息驱动

1、SpringCloud Stream概述

官方地址:https://spring.io/projects/spring-cloud-stream#overview

中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html

什么是SpringCloudStream官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架

应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念

目前仅支持RabbitMQ、Kafka。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-27725bvD-1667253719470)(image/82、springcloudStream.png)]

1.1、设计思想

1、标注的MQ流程

  1. 生产者/消费者之间靠消息媒介传递信息内容【massage】

  2. 消息必须走特定的通道【消息通道MessageChannel】

  3. 消息通道里的消息如何被消费呢,谁负责收发处理

    消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅

2、Cloud Stream的作用

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。

3、什么是Binder

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离

通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E9vTw92q-1667253719471)(image/83、stream处理架构.png)]

Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

4、Stream中的消息通信方式遵循了发布-订阅模式

使用Topic主题进行广播

  1. 在RabbitMQ就是Exchange
  2. 在Kakfa中就是Topic

1.2、标准的流程套路

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-miZBVQsy-1667253719471)(image/84、stream流程.png)]

1、Binder:很方便的连接中间件,屏蔽不同的差异

2、Channel

通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置

3、Source和Sink

简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

1.3、编码API和常用注解

组成和注解描述
Middleware中间件,目前只支持RabbitM和Kafka
BinderBinder是应用与消息中间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过通道离开应用程序
@StreamListener监听队列,用户消费者的队列的消息接收
@EnableBinding指通道channel和exchange绑定在一起

RabbitMQ环境必须已经OK,可正常连接使用,否则后面的步骤都实现不了

2、消息驱动之生产者(output)

2.1、新建模块cloud-stream-rabbitmq-provider8801

2.2、引入pom.xml配置文件

如果是需要Stream整合的就将依赖改为spring-cloud-starter-stream-kafka

<dependencies>
    <!--stream整合rabbit依赖-->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!--基础配置-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-devtools</artifactId>
        <scope>runtime</scope>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

2.3、YAML配置文件

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称,消息生产者
          destination: studyExchange # 表示要使用的Exchange名称定义【自定义】
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置【上面的配置】

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

2.4、生产者启动类

 package com.zcl.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * 描述:消息生产者启动类
 *
 * @author zhong
 * @date 2022-09-22 12:19
 */
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}

2.5、业务实现

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fRuCAr6g-1667253719472)(image/85、Stream业务流程.png)]

2.5.1、服务接口实现类

自己创建一个实现的接口以及里面的方法

注意:在这个服务实现类里面不是使用@Service注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库

package com.zcl.springcloud.service.Impl;

import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

import javax.annotation.Resource;
import java.util.UUID;

/**
 * 描述:发送接口实现类
 * 必须使用@EnableBinding(Source.class)注解开启消息推送管道
 *
 * @author zhong
 * @date 2022-09-22 12:24
 */
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {

    /**
     * 消息发送管道
     */
    @Resource
    private MessageChannel output;

    /**
     * 发送消息
     * @return
     */
    @Override
    public String send() {
        // 定义消息
        String serial = UUID.randomUUID().toString();
        // 构建并发送消息
        this.output.send(MessageBuilder.withPayload(serial).build());
        log.info("-------------- " + serial + " ----------------");
        return serial;
    }
}

2.5.2、控制器实现

package com.zcl.springcloud.controller;

import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * 描述:消息发送控制器
 *
 * @author zhong
 * @date 2022-09-22 12:37
 */
@RestController
public class SendMessageController {

    /**
     * 注入消息发送管道接口
     */
    @Resource
    private IMessageProvider messageProvider;

    /**
     * 每调用一次接口发送一次消息
     * @return
     */
    @GetMapping(value = "/sendMessage")
    public String sendMessage()
    {
        return messageProvider.send();
    }
}

2.6、启动测试

  1. 启动7001Eureka访问中心

  2. 启动8801消息发送者,启动成功以及观察RabbitMQ的管理界面

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WEiWfTC1-1667253719472)(image/86、exchanges.png)]

  3. 访问接口发送消息,查看MQ的管理页面波峰情况

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7jCEVaRO-1667253719473)(image/87、请求波峰.png)]

3、消息驱动之消费者(input)

同样的参考如下流程图

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YVU2tltV-1667253719473)(image/85、Stream业务流程.png)]

3.1、新建cloud-stream-rabbitmq-consumer8802模块

3.2、引入pom.xml依赖

与8801一样

3.3、添加YAML配置文件

配置文件与消息生产的区别在于:

output: # 这个名字是一个通道的名称
	destination: studyExchange # 表示要使用的Exchange名称定义
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka

3.4、添加启动类StreamMQMain8802

与消息生产者一样

3.5、业务实现

必须要有@Component注解注入到Spring容器中

package com.zcl.springcloud.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

/**
 * 描述:消息消费者控制器
 *
 * @author zhong
 * @date 2022-09-22 13:18
 */
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {

    /**
     * 注入消费者的端口号
     */
    @Value("${server.port}")
    private String port;

    /**
     * 监听消息
     * @param message
     * @return
     */
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
    }
}

3.6、启动项目测试

  1. 启动7001

  2. 启动8801,消息发送者

  3. 启动8802,消息消费者

  4. 8801发送消息,8802消费消息,并查看具体的MQ波峰图

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0YxjInQ0-1667253719474)(image/88、MQ消费者绑定.png)]

    控制器输出

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-laGjKNvM-1667253719474)(image/89、发送接收输出.png)]

4、分组消费与持久化

4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目

除了启动的端口号不一样之外其他的配置都一样

4.2、启动项目发现问题

  1. 启动7001(Eureka服务中心)
  2. 启动8801(生产)、8802(消费)、8803(消费)
  3. 测试发送消失是否两个消费者都可以接收到

4.2.1、重复消费

目前是8802/8803同时都收到了,存在重复消费问题

解决方案:分组和持久化属性group

常见案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g9CqfKZW-1667253719475)(image/90、重复消费案例.png)]

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。

不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。

4.2.2、分组

自定义配置分组,自定义分为同一个组,解决重复消费问题

配置文件分组

分别给8801、8802进行分组【orderA】

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-STpyK15M-1667253719475)(image/91、定义分组.png)]

重启项目查看MQ管理

orderB是历史记录,上面的配置以及都分为了ordeerA组,进入orderA组可以查看实际的消费者数量

同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EDKAGuQK-1667253719475)(image/91、分组之后.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RN87seHu-1667253719475)(image/92、分组消费者.png)]

4.2.3、持久化

通过上述,解决了重复消费问题,再看看持久化

  1. 停止8802/8803并去除掉8802的分组group: atguiguA,8803保留

  2. 8801先发送7条消息到rabbitmq

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iZ2THJsA-1667253719476)(image/101、待消费信息.png)]

  3. 先启动8802,无分组属性配置,后台没有打出来消息

    8802因为取消了groupA的分组所以获取不到持久化的数据(如果重启mq也会消失)

  4. 再启动8803,有分组属性配置,后台打出来了MQ上的消息

    8803保存groupA的分组所以在启动的时候就会将持久化的数据消费

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Oh0yGS29-1667253719476)(image/102、启动消费.png)]

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-11-05 00:12:25  更:2022-11-05 00:12:36 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年3日历 -2025/3/10 18:46:54-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码