IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ集成SpringBoot -> 正文阅读

[大数据]RabbitMQ集成SpringBoot

基本概念

了解rabbitmq之前先要了解3个基本概念:生产者消费者代理(队列)。 rabbitmq在生产者和代理中间做了一层抽象。这样消息生产者和队列就没有直接联系,在中间加入了一层交换器(Exchange)。这样消息生产者把消息交给交换器,交换器根据路由策略再把消息转发给对应队列。
在这里插入图片描述
黄色的圈圈就是我们的消息推送服务,将消息推送到 中间方框里面也就是 rabbitMq的服务器,然后经过服务器里面的交换机、队列等各种关系(后面会详细讲)将数据处理入列后,最终右边的蓝色圈圈消费者获取对应监听的消息。

常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种

  • Direct Exchange 直连型交换机
    根据消息携带的路由键将消息投递给对应队列
    大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
    然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列

  • Fanout Exchange 扇型交换机
    这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列

  • Topic Exchange 主题交换机
    这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
    简单地介绍下规则:
    * (星号) 用来表示一个单词 (必须出现的)
    # (井号) 用来表示任意数量(零个或多个)单词
    通配的绑定键是跟队列进行绑定的,举个小例子
    队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
    如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
    如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;

集成SpringBoot

  1. 添加依赖
<!--rabbitmq-->
 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
  1. 编写配置文件
 spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest  # rabbitmq 默认账号
    password: guest  # rabbitmq 默认密码
    virtual-host: test # 设置虚拟主机,不设置则使用默认host
    # 消息确认配置项
    # 确认消息已发送到交换机: Exchange
    publisher-confirm-type: correlated
    # 确认消息已发送到队列: Queue
    publisher-returns: true
  1. 声明交换机,队列,以及绑定关系

Direct 直流交换机

/**
 * Direct
 */
@Configuration
public class DirectRabbitConfig {
    private String routing = "test-direct-routing";

    /**
     * Direct 队列
     * @return
     */
    @Bean
    public Queue createDirectQueue(){
        // 参数说明: public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete){}
        // name:     队列名称
        // durable:  是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:默认为false,是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除
        return new Queue("test-direct-queue",true);
    }

    /**
     * Direct 交换机
     * @return
     */
    @Bean
    public DirectExchange createDirectExchange(){
        // 参数说明: public DirectExchange(String name, boolean durable, boolean autoDelete){}
        // name:    交换机名称
        // durable: 是否持久化
        // autoDelete: 是否自动删除
        return new DirectExchange("test-direct-exchange",true,false);
    }

    /**
     * 绑定  将队列和交换机绑定,并设置用于匹配键 routing
     * @return
     */
    @Bean
    public Binding createDirectBinding(){

        return BindingBuilder.bind(createDirectQueue()).to(createDirectExchange()).with(routing);
    }

}

注意:因为direct是直连交换机,直连交换机是一对一,如果配置多个监听器到同一个队列,会发现他们是轮询消费,并会重复消费

Fanout: 扇形交换机

@Configuration
public class FanoutRabbitConfig {
    /**
     *  创建三个队列 :fanout.A   fanout.B  fanout.C
     *  将三个队列都绑定在交换机 fanoutExchange 上
     *  因为是扇型交换机, 路由键无需配置,配置也不起作用
     */

    @Bean
    public Queue createFanoutAQueue(){
        return new Queue("fanout.A",true);
    }

    @Bean
    public Queue createFanoutBQueue(){
        return new Queue("fanout.B",true);
    }
    @Bean
    public Queue createFanoutCQueue(){
        return new Queue("fanout.C",true);
    }

    /**
     * fanout 扇形交换机
     * @return
     */
    @Bean
    public FanoutExchange createFanoutExchange(){
        return new FanoutExchange("test-fanout-exchange",true,false);
    }

    /**
     * 绑定交换机和队列,因为时fanout交换机,没有路由的概念,所以不用绑定路由
     * @return
     */
    @Bean
    public Binding createBindingeA(){
        return BindingBuilder.bind(createFanoutAQueue()).to(createFanoutExchange());
    }
    @Bean
    public Binding createBindingeB(){
        return BindingBuilder.bind(createFanoutBQueue()).to(createFanoutExchange());
    }
    @Bean
    public Binding createBindingeC(){
        return BindingBuilder.bind(createFanoutCQueue()).to(createFanoutExchange());
    }

}

因为fanout 扇形交换机不需要routing 路由,所以当发送消息到交换机的时候,交换机会将消息发送到所有绑定到他的队列。

Topic: 主题交换机


/**
 * topic
 */
@Configuration
public class TopicRabbitConfig {
    // 绑定键
    public final static String MAN = "topic.man";
    public final static String WO_MAN = "topic.#";
    // 队列名称
    public final static String TEST_MAN_TOPIC_QUEUE = "test-man-topic-queue";
    public final static String TEST_WOMAN_TOPIC_QUEUE = "test-woman-topic-queue";
    // 交换机名称
    public final static String TEST_WOMAN_TOPIC_EXCHANGE = "test-topic-exchange";
    /**
     * MAN
     * @return
     */
    @Bean
    public Queue createManTopicQueue(){
        return new Queue(TEST_MAN_TOPIC_QUEUE,true);
    }
    /**
     * WOMAN
     * @return
     */
    @Bean
    public Queue createWomanTopicQueue(){
        return new Queue(TEST_WOMAN_TOPIC_QUEUE,true);
    }

    /**
     * topic 交换机
     * @return
     */
    @Bean
    public TopicExchange createTopicExchange(){
        return new TopicExchange(TEST_WOMAN_TOPIC_EXCHANGE,true,false);
    }

    /**
     * 绑定交换机和队列,,而且绑定的键值为topic.man
     * 这样只要是消息携带的路由键是topic.man,才会分发到该队列
     * @return
     */
    @Bean
    public Binding createTopicBinding(){
        return BindingBuilder.bind(createManTopicQueue()).to(createTopicExchange()).with(MAN);
    }

    /**
     * 绑定交换机和队列,,而且绑定的键值为topic.#
     * 这样只要是消息携带的路由键是topic开头,都会分发到该队列
     * @return
     */
    @Bean
    public Binding createTopicBinding2(){
        return BindingBuilder.bind(createWomanTopicQueue()).to(createTopicExchange()).with(WO_MAN);
    }
}

当发送的绑定键对topic.man 时,两个队列的监听器都可以收到消息

当发送的绑定键时topic.# 时,只有一个队列test-woman-topic-queue的监听器可以收到消息

发送消息

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@GetMapping("/direct/sendMsg")
	public String directSendMsg(){
		String msg = "hello World";
		/**
		参数一:  交换机名称
		参数二: 路由
		参数三: 具体消息
		*/
		rabbitTemplate.convertAndSend(DirectRabbitConfig.EXCHANGE_NAME,DirectRabbitConfig.ROUTING,msg);
		return "ok~";
	}

监听队列

@Component
public class TestDirectListener {

	// queues  指定要监听的队列
	@RabbitListener(queues = "test-direct-queue")
	public void receiver5(Message msg, Channel channel) throws IOException, InterruptedException {
		//打印数据
		String message = new String(msg.getBody(), StandardCharsets.UTF_8);
		System.out.println("队列1消费消息{}"+message);
//		channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
	}
}

convertSendAndReceive方法与convertAndSend方法的区别

convertSendAndReceive(…)可以同步消费者。使用此方法,当确认了所有的消费者都接收成功之后,才触发另一个convertSendAndReceive(…),也就是才会接收下一条消息。RPC调用方式。
convertAndSend(…):使用此方法,交换机会马上把所有的信息都交给所有的消费者,消费者再自行处理,不会因为消费者处理慢而阻塞线程。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-22 13:36:15  更:2021-08-22 13:38:26 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 18:58:41-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码