IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> spring cloud stream 使用rocket笔记 -> 正文阅读

[Java知识库]spring cloud stream 使用rocket笔记

趁着打算重新找工作,刚好把之前的笔记整理整理,这篇笔记是当初在使用stream 连接rocketmq时的一些笔记

当时我们用的spring cloud stream 版本是3.1,当我去向往常一样去用注解绑定通道时@EnableBinding(),我的编译器亲切的告诉我这个注解已经被废弃了?way?

Enables the binding of targets annotated with Input and Output to a broker, according to the list of interfaces passed as value to the annotation.
Deprecated
as of 3.1 in favor of functional programming model
Author:
Dave Syer, Marius Bogoevici, David Turanski, Soby Chacko

根据作为值传递给注释的接口列表,启用使用 Input 和 Output 注释的目标绑定到代理。
已弃用
从 3.1 开始支持函数式编程模型
作者:
戴夫·赛尔,马里乌斯·博戈耶维奇,大卫·图兰斯基,索比·查科

哦,这该死的编译器,不过函数式编程,我还是蛮喜欢了,本着面向搜索引擎编程思想,我熟练的打开网页,在看了半个小时以后,我想,这都是什么鬼?
我又想起了被谭浩强《C语言程序设计》以及陈昊鹏译的《JAVA编程思想》支配的恐惧,简直是扼杀编程的兴趣。网上你都搜到的东西都都像是一个地方复制的,
并且说着你不懂的中国话,优美的中国话,就跟当初学reactor 响应式编程一样,满屏都充斥着无阻塞,数据流,背压,异步执行,哦这都是什么鬼,
所以我有时候真的怀疑,知识那么难学,应该是因为我们的教程人为的给增加了难度吧。

好了,言归正传,既然网上的东西都靠不住,那就只能靠自己了,幸亏不用梯子还能打开stream的官方文档

!stream官方文档

不过这对于我这个英语渣来说,嘿嘿 真是优美的英语。好在我从来都相信计算机是一门先试科学,既然别人都能开发出来,为什么我只是学会用一下不可以呢,瞬间感觉难度成指数下降。
我通常都愿意这么安慰自己,哎呦,好像有扯远了。

首先把rocketmq环境启动好后,创建一个生产者吧

项目版本

spring boot 2.5.3
com.alibaba.cloud 2.2.5.RELEASE

生产者

新建一个项目作为生产者

引入依赖


        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        </dependency>

使用Supplier函数定义一个简单的生产者,并将这个Supplier对象通过@Bean注入


    private static AtomicInteger counter =  new AtomicInteger(0);
    /**
     * 生产吃的
     * @return
     */
    @Bean
    public Supplier<String> order() {
        return () -> {
            System.out.println("=======》》厨师开始做饭!");
            int i = counter.incrementAndGet();
            if(i%5==0)
            {
                System.out.println("=======》》厨师心情不好!");
                return "食屎吧您!";
            }
            return "满汉全席奉上!";
        };
    }

配置


server:
  port: 8081
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 192.168.1.5:9876 # 配置你的rocketmq的服务地址
      bindings:
        order-out-0: # 命名规则 函数名(就是要绑定的函数名字)+in(输入)/out(输出)+index
          destination: fond-topic # 消息要发送到哪个主题

bindings下,配置了一个输入通道,这个通道绑定到 fond-topic 这个主题,需要注意的是order-out-0的命名规则,
order就是我们定义的生产者函数名字,out 代表是一个输出通道,通过这种命名规则,这个输入通道的信息,我们定义的order()进行了绑定

消费者

再新建一个项目作为消费者,依赖和生产者的依赖一样。用户@Bean 注入一个Function作为消息的消费者,函数名fond


    @Bean
    public Function<Flux<Message<String>>, Mono<Void>> fond() {
        return flux -> flux.map(message -> {
            System.out.println("消息:"+message.getPayload());
            System.out.println("我要开动了!");
            return message;
        }).then();
    }

配置文件


server:
  port: 8082
spring:
  cloud:
    function:
      definition: fond # 只有一个通道时可以不配,官方建议都使用显示的指定,多个用;隔开
    stream:
      rocketmq:
        binder:
          name-server: 192.168.1.5:9876
      bindings:
        fond-in-0:
          destination: fond-topic
          group: fond-topic-01-group

上面的配置和生产者差不多,不同点是bindings里配置的是输出通道,fond 就是上面定义的消费者函数,group一定要写,rocketmq的配置是一定要有分组的,
分组用来防止消息的重复消费。同一个分组内消费者处于竞争状态,一个消息只能由一个消费者消费。这里随便给一个,分别启动生产者和消费项目。

在这里插入图片描述
在这里插入图片描述

如我所愿,生产者和消费打印出了我想要的信息,消费者可以接收到生产者发送到的消息,可以看到每隔一秒生产者就创造了一个食物发送给消费者,因为框架提供了一个默认的轮询机制,它将触发生产者的调用,即默认没1秒调用一次order.get(),什么你还没吃够?

spring:
  cloud:
    stream:
      poller:
        fixedDelay: 100 #轮询间隔 单位毫秒

修改以上配置,1秒10吃安排上。

可以看到这种生产者是系统内部的,那如果我们的消息产生是基于事件驱动的,比如,通过外部web接口调用?这时可以用StreamBridge ,它就像一个桥梁,将我们的输入和消息中间件连接起来。
在原生产者项目上增加如下代码,创建一个controller

@RestController
@Slf4j
public class SendController{
	@Autowired
	private StreamBridge streamBridge;
	
 	@RequestMapping("/send")
    public String  send3 (String  msg)
    {
		//代表将消息发送到fond-topic这个主题
        streamBridge.send("fond-topic", msg);
        return "发送消息"+msg;

    }
  }

将之前的order()方法的@Bean先注掉,不然一直发影响人食欲。配置文件以及消费者什么都不用改,就是这么easy。

再重新启动生产者项目。通过浏览器调用
http://localhost:8081/send?msg=美团外卖来了。消费者控制台如下:

在这里插入图片描述
可以看到成功的用StreamBridge 往指定的主题内发送了消息。什么@EnableBinding,@StreamListener,@Output,@Input都去见鬼吧,就是这么easy,不是吗?

这种发送官方是称之为动态发送,因为在生产者内,我们并没有配置fond-topic的绑定关系,还有一种是相对于动态发送的,修改发送方法,替换为order-out-0的绑定配置,启动测试。


 	@RequestMapping("/send")
    public String  send3 (String  msg)
    {
		//代表将消息发送到order-out-0 的绑定关系
        streamBridge.send("order-out-0", msg);
        return "发送消息"+msg;

    }

今天先整理到这里 太晚了。

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 01:24:15  更:2021-08-17 01:25:15 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 8:51:18-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码