SpringCloud Stream消息驱动
1、SpringCloud Stream概述
官方地址:https://spring.io/projects/spring-cloud-stream#overview
中文指导手册地址:https://m.wang1314.com/doc/webapp/topic/20971999.html
什么是SpringCloudStream官方定义 Spring Cloud Stream 是一个构建消息驱动微服务的框架。
应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。通过我们配置binding(绑定) ,而 Spring Cloud Stream 的 binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
目前仅支持RabbitMQ、Kafka。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-27725bvD-1667253719470)(image/82、springcloudStream.png)]](https://img-blog.csdnimg.cn/fcac25afb19f4b89ab84a8ca88715af1.png)
1.1、设计思想
1、标注的MQ流程
-
生产者/消费者之间靠消息媒介传递信息内容【massage】 -
消息必须走特定的通道【消息通道MessageChannel】 -
消息通道里的消息如何被消费呢,谁负责收发处理
消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅
2、Cloud Stream的作用
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。
这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候springcloud Stream给我们提供了一种解耦合的方式。
3、什么是Binder
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。
通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-E9vTw92q-1667253719471)(image/83、stream处理架构.png)]](https://img-blog.csdnimg.cn/2a24c7e77a7e4fc2bf6724fb5089a822.png)
Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。
4、Stream中的消息通信方式遵循了发布-订阅模式
使用Topic主题进行广播
- 在RabbitMQ就是Exchange
- 在Kakfa中就是Topic
1.2、标准的流程套路
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-miZBVQsy-1667253719471)(image/84、stream流程.png)]](https://img-blog.csdnimg.cn/8ad01a545f1a4e898bb55ab5616a1ee3.png)
1、Binder:很方便的连接中间件,屏蔽不同的差异
2、Channel
通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
3、Source和Sink
简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。
1.3、编码API和常用注解
组成和注解 | 描述 |
---|
Middleware | 中间件,目前只支持RabbitM和Kafka | Binder | Binder是应用与消息中间的封装,目前实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现 | @Input | 注解标识输入通道,通过该输入通道接收到的消息进入应用程序 | @Output | 注解标识输出通道,发布的消息将通过通道离开应用程序 | @StreamListener | 监听队列,用户消费者的队列的消息接收 | @EnableBinding | 指通道channel和exchange绑定在一起 |
RabbitMQ环境必须已经OK,可正常连接使用,否则后面的步骤都实现不了
2、消息驱动之生产者(output)
2.1、新建模块cloud-stream-rabbitmq-provider8801
2.2、引入pom.xml配置文件
如果是需要Stream整合的就将依赖改为spring-cloud-starter-stream-kafka
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2.3、YAML配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
output:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
2.4、生产者启动类
package com.zcl.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
2.5、业务实现
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fRuCAr6g-1667253719472)(image/85、Stream业务流程.png)]](https://img-blog.csdnimg.cn/ff4b76bee8ea4cd0b0e7b6ead2aeba9c.png)
2.5.1、服务接口实现类
自己创建一个实现的接口以及里面的方法
注意:在这个服务实现类里面不是使用@Service 注解了,因为不是web应用,而是Stream消息驱动,是与中间件进行打交道的不是与数据库
package com.zcl.springcloud.service.Impl;
import com.zcl.springcloud.service.IMessageProvider;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.UUID;
@Slf4j
@EnableBinding(Source.class)
public class IMessageProviderImpl implements IMessageProvider {
@Resource
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
this.output.send(MessageBuilder.withPayload(serial).build());
log.info("-------------- " + serial + " ----------------");
return serial;
}
}
2.5.2、控制器实现
package com.zcl.springcloud.controller;
import com.zcl.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage()
{
return messageProvider.send();
}
}
2.6、启动测试
-
启动7001Eureka访问中心 -
启动8801消息发送者,启动成功以及观察RabbitMQ的管理界面 ![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WEiWfTC1-1667253719472)(image/86、exchanges.png)]](https://img-blog.csdnimg.cn/ff5dd3d12c97422b9357147ea3713a3a.png) -
访问接口发送消息,查看MQ的管理页面波峰情况 ![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7jCEVaRO-1667253719473)(image/87、请求波峰.png)]](https://img-blog.csdnimg.cn/5804da8263de47e89e80fce273d3c680.png)
3、消息驱动之消费者(input)
同样的参考如下流程图
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YVU2tltV-1667253719473)(image/85、Stream业务流程.png)]](https://img-blog.csdnimg.cn/9daacdc7136146d2b457968f5d8f256b.png)
3.1、新建cloud-stream-rabbitmq-consumer8802 模块
3.2、引入pom.xml依赖
与8801一样
3.3、添加YAML配置文件
配置文件与消息生产的区别在于:
output:
destination: studyExchange
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
3.4、添加启动类StreamMQMain8802
与消息生产者一样
3.5、业务实现
必须要有@Component 注解注入到Spring容器中
package com.zcl.springcloud.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListener {
@Value("${server.port}")
private String port;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
log.info("消费者1号接收到的消息 ----- " + message.getPayload() + " -----,port: " + port);
}
}
3.6、启动项目测试
-
启动7001 -
启动8801,消息发送者 -
启动8802,消息消费者 -
8801发送消息,8802消费消息,并查看具体的MQ波峰图 ![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0YxjInQ0-1667253719474)(image/88、MQ消费者绑定.png)]](https://img-blog.csdnimg.cn/095f51ea6912435d8b1636fb660661c3.png) 控制器输出 ![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-laGjKNvM-1667253719474)(image/89、发送接收输出.png)]](https://img-blog.csdnimg.cn/ca77eced8ed44a3bb5912dc20bd2015e.png)
4、分组消费与持久化
4.1、完整参考cloud-stream-rabbitmq-consumer8802,创建8803项目
除了启动的端口号不一样之外其他的配置都一样
4.2、启动项目发现问题
- 启动7001(Eureka服务中心)
- 启动8801(生产)、8802(消费)、8803(消费)
- 测试发送消失是否两个消费者都可以接收到
4.2.1、重复消费
目前是8802/8803同时都收到了,存在重复消费问题
解决方案:分组和持久化属性group
常见案例
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-g9CqfKZW-1667253719475)(image/90、重复消费案例.png)]](https://img-blog.csdnimg.cn/1a5aaca755a448f0863118efd62cecb3.png)
注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
不同组是可以全面消费的(重复消费),同一组内会发生竞争关系,只有其中一个可以消费。
4.2.2、分组
自定义配置分组,自定义分为同一个组,解决重复消费问题
配置文件分组
分别给8801、8802进行分组【orderA】
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-STpyK15M-1667253719475)(image/91、定义分组.png)]](https://img-blog.csdnimg.cn/6791e63f2caf4d93a43f107cb0fba187.png)
重启项目查看MQ管理
orderB是历史记录,上面的配置以及都分为了ordeerA 组,进入orderA组可以查看实际的消费者数量
同一组内会发生竞争关系,只有其中一个可以消费,启动项目测试是否为真
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EDKAGuQK-1667253719475)(image/91、分组之后.png)]](https://img-blog.csdnimg.cn/2b701b99dc5c4e9c8273554bd115b0a1.png)
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RN87seHu-1667253719475)(image/92、分组消费者.png)]](https://img-blog.csdnimg.cn/01705e630e9141e393a82178f61034d9.png)
4.2.3、持久化
通过上述,解决了重复消费问题,再看看持久化
-
停止8802/8803并去除掉8802的分组group: atguiguA,8803保留 -
8801先发送7条消息到rabbitmq ![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iZ2THJsA-1667253719476)(image/101、待消费信息.png)]](https://img-blog.csdnimg.cn/4a2c6e33456a4677894ed5a18a90001f.png) -
先启动8802,无分组属性配置,后台没有打出来消息
8802因为取消了groupA 的分组所以获取不到持久化的数据(如果重启mq也会消失)
-
再启动8803,有分组属性配置,后台打出来了MQ上的消息
8803保存groupA 的分组所以在启动的时候就会将持久化的数据消费
![[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Oh0yGS29-1667253719476)(image/102、启动消费.png)]](https://img-blog.csdnimg.cn/11658935269344bb810bfc0706ab787f.png)
|