IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringCloud (十一) --------- Stream 消息驱动框架 -> 正文阅读

[Java知识库]SpringCloud (十一) --------- Stream 消息驱动框架


一、Stream 概述

在微服务的开发过程中,可能会经常用到消息中间件,通过消息中间件在服务与服务之间传递消息,不管你使用的是哪款消息中间件,比如 RabbitMQ 还是Kafka,那么消息中间件和服务之间都有一点耦合性,这个耦合性就是指如果我原来使用的 RabbitMQ,现在要替换为 Kafka,那么我们的微服务都需要修改,变动会比较大,因为这两款消息中间件有一些区别,如果我们使用 Spring Cloud Stream 来整合我们的消息中间件,那么这样就可以降低微服务和消息中间件的耦合性,做到轻松在不同消息中间件间切换,当然目前 Spring Cloud Stream 只支持 RabbitMQ 和 Kafka。

按照官方的定义,Spring Cloud Stream 是一个构建消息驱动微服务的框架。Spring Cloud Stream 解决了开发人员无感知的使用消息中间件的问题,因为Spring Cloud Stream 对消息中间件的进一步封装,可以做到代码层面对消息中间件的无感知,甚至于动态的切换中间件(RabbitMQ切换为 Kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

二、Stream 重要概念

在这里插入图片描述
应用程序通过 input (相当于消费者 consumer )、output (相当于生产者producer)来与 Spring Cloud Stream 中 Binder 交互,而 Binder 负责与消息中间件交互,因此,我们只需关注如何与 Binder 交互即可,而无需关注与具体消息中间件的交互。

组成说明
BinderBinder是应用与消息中间件之间的封装,目前实现了 Kafka 和 RabbitMQ ,通过Binder可以很方便的连接中间件,可以动态的改变消息类型 (对应于 Kafka 的 topic,RabbitMQ 的 exchange),这些都可以通过配置文件来实现。
@Input该注解标识输入通道,通过该输入通道接收消息进入应用程序
@Output该注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding将信道 channel 和 exchange绑定在一起

三、Stream 应用

A、创建项目

创建 SpringBoot 项目 springcloud-service-stream

B、添加依赖

<!-- spring-cloud-starter-stream-rabbit -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

C、配置文件

#对接具体的消息中间件
spring.cloud.stream.binders.rabbitmq.type=rabbit
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.host=192.168.160.133
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.port=5672
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.username=guest
spring.cloud.stream.binders.rabbitmq.environment.passsword=guest
spring.cloud.stream.binders.rabbitmq.environment.spring.rabbitmq.virtual-host=/


#消息生产者
#其中 output 是一个key, 这个名字是一个通道的名称, 在代码中会用到
#destination表示要使用的Exchange名称定义
spring.cloud.stream.bindings.output.destination=spring.cloud.stream.exchange
#设置要绑定的消息服务的 binder
spring.cloud.stream.bindings.output.binder=rabbitmq

D、消息发送

@EnableBinding(Source.class)
public class MessageSender {
    @Autowired
    private MessageChannel output;//消息发送管道
    
    public void publish(String msg) {
        output.send(MessageBuilder.withPayLoad(msg).build());
        system.out.println("消息发送 : <" + msg + "> 完成, 时间" + new Date());
    }
}

E、消息接收

#配置消息消费者
#指定交换机
spring.cloud.stream.bindings.input.destination=spring.cloud.stream.exchange
#设置要绑定的消息服务的binder
spring.cloud.stream.bindings.input.binder=rabbitmq
@EnableBinding(Sink.class)
public class MessageReceiver {
    @StreamListener(Sink.INPUT)
    public void input(Message message) {
        System.out.println("消息接收: <" + message.getPayload() + "> 完成, 时间:" + new Date());
    }
}

在这里插入图片描述

四、Stream 自定义消息通道

我们已经实现了一个基础的 Spring Cloud Stream 消息传递处理操作,但在操作之中使用的是系统提供的 Source (output) 、Sink (input) ,接下来我们来看一下自定义通道名称。

public interface MessageSource {
   // channel 名称
   String OUTPUT = "myOutPut";
   @Output(OUTPUT)
   Message output();
}
public interface MessageSink {
    String INPUT = "myInput";
    
    @Input(Sink.INPUT)
    SubscribableChannel input();
}
#配置消息消费者
#指定交换机
spring.cloud.stream.bindings.myInput.destination=spring.cloud.stream.exchange

#设置要绑定的消息服务的binder
spring.cloud.stream.bindings.myInput.binder=rabbitmq
#消息生产者
spring.cloud.stream.bindings.myOutput.destination=spring.cloud.stream.exchange
spring.cloud.stream.bindings.myOutput.binder=rabbitmq

五、Stream 分组与持久化

我们成功地实现了消息的发送与接收,但是所发送的消息在默认情况下都属于一种临时消息,也就是说如果没有消费者进行消费处理,那么该消息是不会被永久保留,可能会造成消息的丢失。如果要实现持久化消息,需要在消息的消费端配置一个分组,有分组就表示该消息可以进行持久化。

#指定分组,可以进行消息的持久化 applies to consumers only
spring.cloud.stream.bindings.myInput.group=rabbitmq-group

在 Spring Cloud Stream 中在消费者端如果将队列设置为持久化队列,则队列名称会变为为 destination.group,此时消费端的微服务宕机或重启,该队列信息依然会被保留在 RabbitMQ 中,后续依然可以进行消费。

消息分组的作用:

1、消息可以持久化
2、可以实现同一分组只有一个消费者能接收到消费

没有做分组时,一个消息可以被多个消费者接收,分组可以让一个消息只能被一个消费者接收,避免一个消息被多个消费者消费。当项目集群部署了很多份,那么就会变成多个消费者,但是业务可能需要的是一个消息只消费一次,所以此时需要加个分组,就可以实现同一个分组里面的消费者只会有一个消费者能接收到消息。

注意:

1、 不分组的话,消费者要先启动起来,然后再用生产者发送消息,这样才可以接收到消息,否则发送的消息就丢失了,生产者先发了消息,消费者后面才启动的话是接收不到消息的。

2、不分组的话,多个消费者都能接收消息,也就是一个消息可以被多个消费者接收。

六、Stream 设置路由键

默认情况下 Spring Cloud Stream 传送消息属于广播消息,默认匹配方式是 #,表示所有消费者都可以匹配上,我们也可以通过指定路由键 RoutingKey 实现按需求匹配消息消费端进行消息接收处理。

在消费端进行设置

#设置一个RoutingKey路由key,默认是#,我们可以指定
spring.cloud.stream.rabbit.bindings.myInput.consumer.bindingRoutingKey=spring.cloud.stream.#

开发的时候有两种选择:

一种就是 直接 SpringBoot + rabbitmq 整合实现消息传送。
一种就是 使用 Spring Cloud Stream 对消息中间件的包装,来实现消息传送。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-07 11:02:36  更:2022-05-07 11:04:17 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 0:34:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码