IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息 -> 正文阅读

[Java知识库]SpringBoot整合RabbitMQ实现消息的发送与接收,确认消息,延时消息

安装消息中间件

Windows安装ErLang

https://github.com/erlang/otp/releases/tag/OTP-25.0
在这里插入图片描述

在这里插入图片描述

Windows安装RabbitMq

https://www.rabbitmq.com/install-windows.html
在这里插入图片描述
在这里插入图片描述

安装RabbitMq UI界面

在这里插入图片描述
打开RabbitMQ Command Prompt 进入命令行

# 查看mq服务状态
rabbitmqctl.bat status

在这里插入图片描述

# 安装ui界面
rabbitmq-plugins enable rabbitmq_management

在这里插入图片描述

访问http://localhost:15672/
默认账号密码guest/guest
在这里插入图片描述

安装延时消息插件

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
在这里插入图片描述
将 ez文件拷贝到安装目录rabbitmq_server-3.10.2\plugins下

# 安装插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在这里插入图片描述

SpringBoot整合

这里我直接用我先前建好的微服务
order-service作为消息发送者,storage-service作为消息接收者
在这里插入图片描述
在这里插入图片描述

消息发送端order-service

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
  rabbitmq:
    username: guest
    password: guest
    host: 127.0.0.1
    port: 5672
    # 消息确认(ACK)
    publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
    publisher-returns: true #确认消息已发送到队列(Queue)

RabbitMqConfig

package top.fate.config;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 11:41
 */
@Configuration
public class RabbitMqConfig {

    private static final Logger LOG = LogManager.getLogger();

    public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
    public static final String DIRECT_EXCHANGE = "direct_exchange"; //交换器名称
    public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键

    public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称
    public static final String DELAY_EXCHANGE = "delay_exchange"; //交换器名称
    public static final String DELAY_ROUTING_KEY = "delay_routing_key"; //路由键

    @Autowired
    private CachingConnectionFactory connectionFactory;

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
    {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);

        //设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        //确认消息送到交换机(Exchange)回调
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
        {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause)
            {
                LOG.info("\n确认消息送到交换机(Exchange)结果:");
                LOG.info("相关数据:" + correlationData);
                LOG.info("是否成功:" + ack);
                LOG.info("错误原因:" + cause);
            }
        });

        //确认消息送到队列(Queue)回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
        {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage)
            {
                LOG.info("\n确认消息送到队列(Queue)结果:");
                LOG.info("发生消息:" + returnedMessage.getMessage());
                LOG.info("回应码:" + returnedMessage.getReplyCode());
                LOG.info("回应信息:" + returnedMessage.getReplyText());
                LOG.info("交换机:" + returnedMessage.getExchange());
                LOG.info("路由键:" + returnedMessage.getRoutingKey());
            }
        });
        return rabbitTemplate;
    }

    /**
     * Json转换器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Direct交换器
     */
    @Bean
    public DirectExchange directExchange()
    {
        /**
         * 创建交换器,参数说明:
         * String name:交换器名称
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         */
        return new DirectExchange(DIRECT_EXCHANGE, true, false);
    }

    /**
     * 队列
     */
    @Bean
    public Queue directQueue()
    {
        /**
         * 创建队列,参数说明:
         * String name:队列名称。
         * boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
         * 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
         * boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
         * boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
         * 当没有生产者或者消费者使用此队列,该队列会自动删除。
         * Map<String, Object> arguments:设置队列的其他一些参数。
         */
        return new Queue(DIRECT_QUEUE, true, false, false, null);
    }

    /**
     * 绑定
     */
    @Bean
    Binding directBinding(DirectExchange directExchange, Queue directQueue)
    {
        //将队列和交换机绑定, 并设置用于匹配键:routingKey路由键
        return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
    }

    /******************************延时队列******************************/

    @Bean
    public CustomExchange delayExchange()
    {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
    }

    @Bean
    public Queue delayQueue()
    {
        Queue queue = new Queue(DELAY_QUEUE, true);
        return queue;
    }

    @Bean
    public Binding delaybinding(Queue delayQueue, CustomExchange delayExchange)
    {
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
    }

}

实体对象

package top.fate.entity;

import lombok.Data;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:26
 */
@Data
public class TestEntity {

    private String username;
    private String password;

    public TestEntity(String username, String password) {
        this.username = username;
        this.password = password;
    }

    public TestEntity() {
    }
}

生产者服务接口

package top.fate.service;

import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:29
 */
public interface ProducerService {

    /**
     * 发送json格式数据
     *
     * @param o
     */
    void sendTestJson(Object o);

    /**
     * 延时发送map格式数据
     *
     * @param map
     */
    void sendDelayTestMap(Map map);
}

生产者服务实现类

package top.fate.service.impl;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.fate.config.RabbitMqConfig;
import top.fate.service.ProducerService;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:30
 */
@Service
public class ProducerServiceImpl implements ProducerService {

    private static final Logger LOG = LogManager.getLogger();

    SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送json格式数据
     *
     * @param o
     */
    @Override
    public void sendTestJson(Object o) {

        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY, o);
        LOG.info("json格式的数据发送成功 发送时间为" + formatter.format(new Date()));
    }

    /**
     * 延时发送map格式数据
     *
     * @param map
     */
    @Override
    public void sendDelayTestMap(Map map) {
        rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.DELAY_ROUTING_KEY, map, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {

                message.getMessageProperties().setHeader("x-delay", 5000);

                return message;
            }
        });
        LOG.info("map格式的数据发送成功 发送时间为" + formatter.format(new Date()));
    }
}

测试Controller

package top.fate.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import top.fate.entity.TestEntity;
import top.fate.service.ProducerService;

import java.util.HashMap;
import java.util.Map;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:43
 */
@RestController
@RequestMapping(value = "producer")
public class ProducerController {

    @Autowired
    private ProducerService producerService;

    @GetMapping("sendObject")
    public void sendObject(){
        producerService.sendTestJson(new TestEntity("user","123456"));
    }

    @GetMapping("sendMap")
    public void sendMap(){
        Map map = new HashMap();
        map.put("user1",new TestEntity("user1","123"));
        map.put("user2",new TestEntity("user2","123"));
        map.put("user3",new TestEntity("user3","123"));
        producerService.sendDelayTestMap(map);
    }
}

消息接收端storage-service

添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMqConfig

package top.fate.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import top.fate.service.impl.AckReceiver;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:46
 */
@Configuration
public class RabbitMqConfig {

    public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
    public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称

    /**
     * 消息接收确认处理类
     */
    @Autowired
    private AckReceiver ackReceiver;

    @Autowired
    private CachingConnectionFactory connectionFactory;

    /**
     * 客户端配置
     * 配置手动确认消息、消息接收确认
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer()
    {
        //消费者数量,默认10
        int DEFAULT_CONCURRENT = 10;

        //每个消费者获取最大投递数量 默认50
        int DEFAULT_PREFETCH_COUNT = 50;

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setConcurrentConsumers(DEFAULT_CONCURRENT);
        container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);

        // RabbitMQ默认是自动确认,这里改为手动确认消息
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        //添加队列,可添加多个队列
        container.addQueues(new Queue(DIRECT_QUEUE,true));
        container.addQueues(new Queue(DELAY_QUEUE,true));

        //设置消息处理类
        container.setMessageListener(ackReceiver);

        return container;
    }
}

消息接收接口

package top.fate.service;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;

import java.io.IOException;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:52
 */
public interface ConsumerReceiver {

    void receiverJson(Message message, Channel channel) throws IOException;

    void receiverMap(Message message, Channel channel) throws IOException;
}

消息接收实现类

package top.fate.service.impl;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Service;
import top.fate.entity.TestEntity;
import top.fate.service.ConsumerReceiver;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:54
 */
@Service
public class ConsumerReceiverImpl implements ConsumerReceiver {


    private static final Logger LOG = LogManager.getLogger();

    @Override
    public void receiverJson(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            //将JSON格式数据转换为实体对象
            TestEntity testEntity = JSON.parseObject(message.getBody(), TestEntity.class);

            LOG.info("接收者收到JSON格式消息:");
            System.out.println("账号:" + testEntity.getUsername());
            System.out.println("密码:" + testEntity.getPassword());

            /**
             * 确认消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             */
            channel.basicAck(deliveryTag, true);

            /**
             * 否定消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean multiple:是否批处理,当该参数为 true 时,
             * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            //channel.basicNack(deliveryTag, true, false);
        }
        catch (Exception e)
        {
            /**
             * 拒绝消息,参数说明:
             * long deliveryTag:唯一标识 ID。
             * boolean requeue:如果 requeue 参数设置为 true,
             * 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
             * 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
             * 而不会把它发送给新的消费者。
             */
            channel.basicReject(deliveryTag, false);

            e.printStackTrace();
        }
    }

    @Override
    public void receiverMap(Message message, Channel channel) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try
        {
            //将JSON格式数据转换为Map对象
            Map map = JSON.parseObject(message.getBody(), Map.class);

            LOG.info("接收者收到Map格式消息:");

            LOG.info(map.get("user1"));
            LOG.info(map.get("user2"));
            LOG.info(map.get("user3"));

            //确认消息
            channel.basicAck(deliveryTag, true);

            //否定消息
            //channel.basicNack(deliveryTag, true, false);
        }
        catch (Exception e)
        {
            //拒绝消息
            channel.basicReject(deliveryTag, false);

            e.printStackTrace();
        }
    }
}

消息分发处理类

package top.fate.service.impl;

import com.rabbitmq.client.Channel;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import top.fate.config.RabbitMqConfig;
import top.fate.service.ConsumerReceiver;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @auther:Wangxl
 * @Emile:18335844494@163.com
 * @Time:2022/5/26 14:50
 */
@Service
public class AckReceiver implements ChannelAwareMessageListener {

    private static final Logger LOG = LogManager.getLogger();

    /**
     * 用户消息接收类
     */
    @Autowired
    private ConsumerReceiver consumerReceiver;

    @Override
    public void onMessage(Message message, Channel channel) throws Exception
    {
        //时间格式
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        LOG.info("消息接收成功,接收时间:" + dateFormat.format(new Date()) + "\n");

        //获取队列名称
        String queueName = message.getMessageProperties().getConsumerQueue();

        //接收用户信息Json格式数据
        if (queueName.equals(RabbitMqConfig.DIRECT_QUEUE))
        {
            consumerReceiver.receiverJson(message, channel);
        }

        //延时接收用户信息Map格式数据
        if (queueName.equals(RabbitMqConfig.DELAY_QUEUE))
        {
            consumerReceiver.receiverMap(message, channel);
        }
        //多个队列的处理,则如上述代码,继续添加方法....
    }
}

启动测试

在这里插入图片描述
项目启动的时候创建交换机绑定路由key,及创建队列
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-06-01 15:02:20  更:2022-06-01 15:05:56 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 20:35:30-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码