一、Stream 概述
在微服务的开发过程中,可能会经常用到消息中间件,通过消息中间件在服务与服务之间传递消息,不管你使用的是哪款消息中间件,比如 RabbitMQ 还是Kafka,那么消息中间件和服务之间都有一点耦合性,这个耦合性就是指如果我原来使用的 RabbitMQ,现在要替换为 Kafka,那么我们的微服务都需要修改,变动会比较大,因为这两款消息中间件有一些区别,如果我们使用 Spring Cloud Stream 来整合我们的消息中间件,那么这样就可以降低微服务和消息中间件的耦合性,做到轻松在不同消息中间件间切换,当然目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka。
按照官方的定义,Spring Cloud Stream 是一个构建消息驱动微服务的框架。Spring Cloud Stream 解决了开发人员无感知的使用消息中间件的问题,因为Spring Cloud Stream 对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为 Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。
二、Stream 重要概念
应用程序通过 input (相当于消费者 consumer )、output (相当于生产者producer)来与 Spring Cloud Stream 中 Binder 交互,而 Binder 负责与消息中间件交互,因此,我们只需关注如何与 Binder 交互即可,而无需关注与具体消息中间件的交互。
组成 | 说明 |
---|
Binder | Binder是应用与消息中间件之间的封装,目前实现了 Kafka 和 RabbitMQ ,通过Binder可以很方便的连接中间件,可以动态的改变消息类型 (对应于 Kafka 的 topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现。 | @Input | 该注解标识输入通道,通过该输入通道接收消息进入应用程序 | @Output | 该注解标识输出通道,发布的消息将通过该通道离开应用程序 | @StreamListener | 监听队列,用于消费者的队列的消息接收 | @EnableBinding | 将信道 channel 和 exchange绑定在一起 |
三、Stream 应用
A、创建项目
创建 SpringBoot 项目 springcloud-service-stream
B、添加依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
C、配置文件
spring.cloud.stream.binders.rabbitmq.type=rabbit
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.host=192.168.160.133
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbitmq.environment.passsword=guest
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.virtual-host=/
spring.cloud.stream.bindings.output.destination=spring.cloud.stream.exchange
spring.cloud.stream.bindings.output.binder=rabbitmq
D、消息发送
@EnableBinding(Source.class)
public class MessageSender {
@Autowired
private MessageChannel output;
public void publish(String msg) {
output.send(MessageBuilder.withPayLoad(msg).build());
system.out.println("消息发送 : <" + msg + "> 完成, 时间" + new Date());
}
}
E、消息接收
spring.cloud.stream.bindings.input.destination=spring.cloud.stream.exchange
spring.cloud.stream.bindings.input.binder=rabbitmq
@EnableBinding(Sink.class)
public class MessageReceiver {
@StreamListener(Sink.INPUT)
public void input(Message message) {
System.out.println("消息接收: <" + message.getPayload() + "> 完成, 时间:" + new Date());
}
}
四、Stream 自定义消息通道
我们已经实现了一个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使用的是系统提供的 Source (output) 、Sink (input) ,接下来我们来看一下自定义通道名称。
public interface MessageSource {
String OUTPUT = "myOutPut";
@Output(OUTPUT)
Message output();
}
public interface MessageSink {
String INPUT = "myInput";
@Input(Sink.INPUT)
SubscribableChannel input();
}
spring.cloud.stream.bindings.myInput.destination=spring.cloud.stream.exchange
spring.cloud.stream.bindings.myInput.binder=rabbitmq
spring.cloud.stream.bindings.myOutput.destination=spring.cloud.stream.exchange
spring.cloud.stream.bindings.myOutput.binder=rabbitmq
五、Stream 分组与持久化
我们成功地实现了消息的发送与接收,但是所发送的消息在默认情况下都属于一种临时消息,也就是说如果没有消费者进行消费处理,那么该消息是不会被永久保留,可能会造成消息的丢失。如果要实现持久化消息,需要在消息的消费端配置一个分组,有分组就表示该消息可以进行持久化。
spring.cloud.stream.bindings.myInput.group=rabbitmq-group
在 Spring Cloud Stream 中在消费者端如果将队列设置为持久化队列,则队列名称会变为为 destination.group,此时消费端的微服务宕机或重启,该队列信息依然会被保留在 RabbitMQ 中,后续依然可以进行消费。
消息分组的作用:
1、消息可以持久化 2、可以实现同一分组只有一个消费者能接收到消费
没有做分组时,一个消息可以被多个消费者接收,分组可以让一个消息只能被一个消费者接收,避免一个消息被多个消费者消费。当项目集群部署了很多份,那么就会变成多个消费者,但是业务可能需要的是一个消息只消费一次,所以此时需要加个分组,就可以实现同一个分组里面的消费者只会有一个消费者能接收到消息。
注意:
1、 不分组的话,消费者要先启动起来,然后再用生产者发送消息,这样才可以接收到消息,否则发送的消息就丢失了,生产者先发了消息,消费者后面才启动的话是接收不到消息的。
2、不分组的话,多个消费者都能接收消息,也就是一个消息可以被多个消费者接收。
六、Stream 设置路由键
默认情况下 Spring Cloud Stream 传送消息属于广播消息,默认匹配方式是 #,表示所有消费者都可以匹配上,我们也可以通过指定路由键 RoutingKey 实现按需求匹配消息消费端进行消息接收处理。
在消费端进行设置
spring.cloud.stream.rabbit.bindings.myInput.consumer.bindingRoutingKey=spring.cloud.stream.
开发的时候有两种选择:
一种就是 直接 SpringBoot + rabbitmq 整合实现消息传送。 一种就是 使用 Spring Cloud Stream 对消息中间件的包装,来实现消息传送。
|