IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> springboot配置rabbitmq队列 -> 正文阅读

[大数据]springboot配置rabbitmq队列

作者:>

springboot配置rabbitmq

此文章主要是提供springboot整合rabbitmq时,对于延时队列、死信队列、Fanout交换机、Topic交换机等,进行举例说明。

示例代码需要的静态常量如下:

package com.feng.rabbit.com.utils;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-13 15:11
 **/
public interface Constants {

    String QUEUE_NAME = "mail.queue1";

    String EXCHANGE_NAME = "mail.exchange1";

    String MAIL_ROUTING_KEY = "mail.routing1";

    // --------------------------------------- 死信队列配置 --------------------------------------

    String DELAY_QUEUE_TTL_NAME = "delay_queue_ttl";

    String DELAY_PROCESS_QUEUE_ROUTING_KEY = "delay_process_queue_routing_key";

    String DELAY_PROCESS_QUEUE = "delay_process_queue";

    String DELAY_PROCESS_EXCHANGE = "delay_process_exchange";

    // 过期时间
    int QUEUE_EXPIRATION = 5000;

    // --------------------------------------- Fanout Exchange 配置 --------------------------------------

    String FANOUT_SMS_QUEUE = "fanout_sms_queue";

    String FANOUT_MAIL_QUEUE = "fanout_mail_queue";

    String FANOUT_EXCHANGE = "fanout_exchange";

    // --------------------------------------- Topic Exchange 配置 --------------------------------------

    String TOPIC_SMS_QUEUE = "topic_sms_queue";

    String TOPIC_MAIL_QUEUE = "topic_mail_queue";

    String TOPIC_ALL_QUEUE = "topic_all_queue";

    String TOPIC_SMS_QUEUE_ROUTING_KEY = "topic.sms.queue";

    String TOPIC_MAIL_QUEUE_ROUTING_KEY = "topic.mail.queue";

    String TOPIC_ALL_QUEUE_ROUTING_KEY = "topic.#";

    String TOPIC_QQ_QUEUE_ROUTING_KEY = "topic.qq.queue";

    String TOPIC_ROUTING_KEY = "topic.*.queue";

    String TOPIC_EXCHANGE = "topic_exchange";

}

延时队列&死信队列

延时队列是指定队列过期时间,当队列过期时,触发死信队列执行,以此来达到延时执行的效果。

死信队列其实和普通的队列没啥大的区别,都需要创建自己的QueueExchange,然后通过RoutingKey绑定到Exchange上去,只不过死信队列的RoutingKeyExchange要作为参数,绑定到正常的队列上去。

配置信息

延时队列和死信队列绑定配置类:

package com.feng.rabbit.pro.config;

import com.feng.rabbit.com.utils.Constants;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;
import java.util.Map;

/**
 * @description: 延时队列和死信队列配置
 * @author: fenglin
 * @create: 2021-08-16 09:48
 **/
@SpringBootConfiguration
public class DelayQueueConfig {

    /**
     * 配置延时队列,通过mq发送消息到该队列 rabbitTemplate.convertAndSend(Constants.DELAY_QUEUE_TTL_NAME, "");
     * @return
     */
    @Bean
    Queue delayQueueTTL() {
        Map<String, Object> params = new HashMap<>(8);
        params.put("x-dead-letter-exchange", Constants.DELAY_PROCESS_EXCHANGE);// 设置队列的死信路由,那么出现dead letter之后将dead letter重新发送到指定exchange;
        params.put("x-dead-letter-routing-key", Constants.DELAY_PROCESS_QUEUE_ROUTING_KEY);// 设置路由键,出现dead letter之后将dead letter重新按照指定的routing-key发送;
        params.put("x-message-ttl", Constants.QUEUE_EXPIRATION);// 设置队列过期时间
        return QueueBuilder.durable(Constants.DELAY_QUEUE_TTL_NAME)
            .withArguments(params).build();
    }

    /**
     * 创建延时队列延时后,兜底的接收队列
     *
     * @return
     */
    @Bean
    Queue processQueue() {
        return QueueBuilder.durable(Constants.DELAY_PROCESS_QUEUE).build();
    }

    /**
     * 延时队列延时后,兜底的交换机
     *
     * @return
     */
    @Bean
    DirectExchange processExchange() {
        return new DirectExchange(Constants.DELAY_PROCESS_EXCHANGE);
    }

    /**
     * 将延时队列的queue和exchange进行绑定
     *
     * @return
     */
    @Bean
    Binding processBinding() {
        return BindingBuilder.bind(processQueue()).to(processExchange())
            .with(Constants.DELAY_PROCESS_QUEUE_ROUTING_KEY);
    }

}

通过x-dead-letter-exchange设置队列的死信路由,那么出现dead letter之后将dead letter重新发送到指定exchange;

通过x-dead-letter-routing-key设置路由键:出现dead letter之后将dead letter重新按照指定的routing-key发送;

通过x-message-ttl设置队列的过期时间;

消费者监听配置

消费者配置监听队列为死信队列名

package com.feng.rabbit.cus.receiver;

import com.alibaba.fastjson.JSON;
import com.feng.rabbit.com.domain.MsgObj;
import com.feng.rabbit.com.utils.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-13 14:58
 **/
@Component
public class MailListener {

    private final static Logger log = LoggerFactory.getLogger(MailListener.class);

    @RabbitListener(queues = Constants.DELAY_PROCESS_QUEUE)
    public void deadQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("接收到队列({})的消息为:{}", Constants.DELAY_PROCESS_QUEUE, msg);
        try {
            channel.basicAck(tag, false); // 手动确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

测试接口

@RequestMapping(value = "/deadQueue")
public String ttlQueue() {
    log.info(" ------------死信队列开始发送消息咯------------");
    
    rabbitTemplate.convertAndSend(Constants.DELAY_QUEUE_TTL_NAME, "我是死信队列发过来的消息:" + Constants.DELAY_QUEUE_TTL_NAME);
    
    return "success";
}

访问http://localhost:8090/mail/deadQueue得到下列结果:

生产者:

在这里插入图片描述

消费者:

在这里插入图片描述

刚好是经过五秒后,消费者接收到消息。

Fanout交换机

只要队列绑定到了Fanout exchange上,就会接收到消息,与routing_key没有关系!

配置信息

创建两个队列,一个SMS队列,一个Mail队列,两队列都绑定到Fanout交换机。

package com.feng.rabbit.pro.config;

import com.feng.rabbit.com.utils.Constants;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-16 10:47
 **/
@SpringBootConfiguration
public class FanoutExchangeConfig {

    @Bean
    Queue fanOutEmailQueue() {
        return QueueBuilder.durable(Constants.FANOUT_MAIL_QUEUE).build();
    }

    @Bean
    Queue fanoutSmsQueue() {
        return QueueBuilder.durable(Constants.FANOUT_SMS_QUEUE).build();
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(Constants.FANOUT_EXCHANGE, true, false, null);
    }

    @Bean
    Binding fanoutMailBinding() {
        return BindingBuilder.bind(fanOutEmailQueue()).to(fanoutExchange());
    }

    @Bean
    Binding fanoutSmsBinding() {
        return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutExchange());
    }

}

值得注意的点:BindingBuilder.bind(fanOutEmailQueue()).to(fanoutExchange())后面没有with(“routing_key”)

是因为fanout交换机和routing_key没有关系

消费者监听配置

package com.feng.rabbit.cus.receiver;

import com.alibaba.fastjson.JSON;
import com.feng.rabbit.com.domain.MsgObj;
import com.feng.rabbit.com.utils.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-13 14:58
 **/
@Component
public class MailListener {

    private final static Logger log = LoggerFactory.getLogger(MailListener.class);

    // ----------------------------------------------- Fanout exchange -------------------------------------------------
    @RabbitListener(queues = Constants.FANOUT_MAIL_QUEUE)
    public void fanoutMailQueue(MsgObj msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("接收到队列({})的消息为:{}", Constants.FANOUT_MAIL_QUEUE, JSON.toJSONString(msg));
        try {
            channel.basicAck(tag, false); // 手动确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = Constants.FANOUT_SMS_QUEUE)
    public void fanoutSmsQueue(MsgObj msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("接收到队列({})的消息为:{}", Constants.FANOUT_SMS_QUEUE, JSON.toJSONString(msg));
        try {
            channel.basicAck(tag, false); // 手动确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

测试接口

package com.feng.rabbit.pro.controller;

import com.feng.rabbit.com.domain.MsgObj;
import com.feng.rabbit.com.utils.Constants;
import com.feng.rabbit.pro.config.DelayQueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-13 14:25
 **/
@RestController
@RequestMapping(value = "/mail")
public class MailController {

    private final static Logger log = LoggerFactory.getLogger(MailController.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/fanoutQueue")
    public String fanoutQueue() {
        log.info(" ------------Fanout 交换机开始发送消息咯------------");
        String msgId = UUID.randomUUID().toString();

        MsgObj msgObj = new MsgObj(msgId, "fanout", "四川成都",
                22, Constants.FANOUT_EXCHANGE, null);

        rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msgObj, new CorrelationData(msgId));
        return "Fanout queue";
    }
}

访问接口http://localhost:8090/mail/fanoutQueue得到下列结果:

在这里插入图片描述

同一时间,两个队列接收到消息。

Topic交换机

通过routing key匹配规则把消息分发对应的队列中。

配置信息

创建三个队列,分别为:mail、sms、all,并将三个队列分别绑定到交换机中,routing_key分别为:

QUEUERouting Key
mailtopic.mail.queue
smstopic.sms.queue
alltopic.*.queue

绑定配置类:

package com.feng.rabbit.pro.config;

import com.feng.rabbit.com.utils.Constants;
import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.context.annotation.Bean;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-16 11:26
 **/
@SpringBootConfiguration
public class TopicExchangeConfig {

    /**
     * 创建mail队列
     * @return
     */
    @Bean
    Queue topicMailQueue() {
        return QueueBuilder.durable(Constants.TOPIC_MAIL_QUEUE).build();
    }

    /**
     * 创建sms队列
     * @return
     */
    @Bean
    Queue topicSmsQueue() {
        return QueueBuilder.durable(Constants.TOPIC_SMS_QUEUE).build();
    }

    /**
     * 创建all队列
     * @return
     */
    @Bean
    Queue topicAllQueue() {
        return QueueBuilder.durable(Constants.TOPIC_ALL_QUEUE).build();
    }

    /**
     * 创建交换机
     * @return
     */
    @Bean
    TopicExchange topicExchange() {
        return new TopicExchange(Constants.TOPIC_EXCHANGE);
    }

    /**
     * 将mail队列和交换机绑定
     * @return
     */
    @Bean
    Binding topicMailBinding() {
        return BindingBuilder.bind(topicMailQueue()).to(topicExchange()).with(Constants.TOPIC_MAIL_QUEUE_ROUTING_KEY);
    }

    /**
     * 将sms队列和交换机绑定
     * @return
     */
    @Bean
    Binding topicSmsBinding() {
        return BindingBuilder.bind(topicSmsQueue()).to(topicExchange()).with(Constants.TOPIC_SMS_QUEUE_ROUTING_KEY);
    }

    /**
     * 将all队列和交换机绑定
     * @return
     */
    @Bean
    Binding topicAllBinding() {
        return BindingBuilder.bind(topicAllQueue()).to(topicExchange()).with(Constants.TOPIC_ROUTING_KEY);
    }

}

下面用例子描述#和*的区别:

1:"#"启动消费者时,接收所有的消息
2:"error.#“启动消费者时,接收所有以error开头的消息
3:”*.kern"启动消费者时,接收所有以一个单词和kern组合的消息
"error.app"发送消息时,1和2 会接收到消息

消费者监听配置

package com.feng.rabbit.cus.receiver;

import com.alibaba.fastjson.JSON;
import com.feng.rabbit.com.domain.MsgObj;
import com.feng.rabbit.com.utils.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-13 14:58
 **/
@Component
public class MailListener {

    private final static Logger log = LoggerFactory.getLogger(MailListener.class);

    // ------------------------------------------------ Topic Exchange -------------------------------------------------

    @RabbitListener(queues = Constants.TOPIC_MAIL_QUEUE)
    public void topicMailQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("接收到队列({})的消息为:{}", Constants.TOPIC_MAIL_QUEUE, msg);
        try {
            channel.basicAck(tag, false); // 手动确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = Constants.TOPIC_SMS_QUEUE)
    public void topicSmsQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("接收到队列({})的消息为:{}", Constants.TOPIC_SMS_QUEUE, msg);
        try {
            channel.basicAck(tag, false); // 手动确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = Constants.TOPIC_ALL_QUEUE)
    public void topicAllQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        log.info("接收到队列({})的消息为:{}", Constants.TOPIC_ALL_QUEUE, msg);
        try {
            channel.basicAck(tag, false); // 手动确认消息
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

测试接口

package com.feng.rabbit.pro.controller;

import com.feng.rabbit.com.domain.MsgObj;
import com.feng.rabbit.com.utils.Constants;
import com.feng.rabbit.pro.config.DelayQueueConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

/**
 * @description:
 * @author: fenglin
 * @create: 2021-08-13 14:25
 **/
@RestController
@RequestMapping(value = "/mail")
public class MailController {

    private final static Logger log = LoggerFactory.getLogger(MailController.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/topicSmsQueue")
    public String topicSmsQueue() {
        log.info(" ------------Topic 交换机开始发送消息咯------------");

        String msgId = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, Constants.TOPIC_SMS_QUEUE_ROUTING_KEY,
                "我是来自topic的消息:" + Constants.TOPIC_SMS_QUEUE_ROUTING_KEY, new CorrelationData(msgId));

        return "topic sms queue";
    }

    @RequestMapping(value = "/topicMailQueue")
    public String topicMailQueue() {
        log.info(" ------------Topic 交换机开始发送消息咯------------");
        String msgId = UUID.randomUUID().toString();

        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, Constants.TOPIC_MAIL_QUEUE_ROUTING_KEY,
                "我是来自topic的消息:" + Constants.TOPIC_MAIL_QUEUE_ROUTING_KEY, new CorrelationData(msgId));

        return "topic mail queue";
    }

    @RequestMapping(value = "/topicAllQueue")
    public String topicAllQueue() {
        log.info(" ------------Topic 交换机开始发送消息咯------------");

        String msgId = UUID.randomUUID().toString();
        rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, Constants.TOPIC_QQ_QUEUE_ROUTING_KEY,
                "我是来自topic的消息:" + Constants.TOPIC_QQ_QUEUE_ROUTING_KEY, new CorrelationData(msgId));

        return "topic all queue";
    }
}

  1. 当访问http://localhost:8090/mail/topicSmsQueue时,sms队列和all队列接收到消息
    在这里插入图片描述
  2. 当访问http://localhost:8090/mail/topicMailQueue时,mail队列和all队列接收到消息 在这里插入图片描述
  3. 当访问http://localhost:8090/mail/topicAllQueue时,all队列接收到消息
    在这里插入图片描述

最后附上gitee地址:https://gitee.com/fenglin676168/rabbitmq.git

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-17 15:27:43  更:2021-08-17 15:29:59 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 13:27:55-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码