IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> SpringBoot集成RabbitMQ---保证消息可靠性 -> 正文阅读

[Java知识库]SpringBoot集成RabbitMQ---保证消息可靠性

SpringBoot集成RabbitMQ—保证消息可靠性

在我们使用RabbitMQ消息队列时,使用生产者发送消息,可能出现发送失败,rabbitMQ宕机,一个消息重复发送等问题,一旦出现这种问题,如果不进行相应处理,就可能导致消息丢失,消费者重复消费一条信息等问题,消息就变得不可靠了,进而影响我们的业务逻辑。所以我们要对可能出现的问题进行相应的操作,来保证消息的可靠性。下面我们来介绍在生产端实现消息可靠性投递消费端幂等性操作两种实现方式。

本篇文章是springboot集成RabbitMQ中保证消息可靠性的一种方案,还有其他的高可用方案(大家可以去网上study一下)

一、生产端消息可靠性投递

生产端消息可靠性传递是为了解决消息投递时丢失的问题,导致这种问题的原因不单一,比如rabbitMQ宕机,投递过程中交换机发生问题等。

生产端消息可靠性投递实现方案:

数据库中核心存储消息的唯一id,消息状态,重试次数,重试时间

如果生产者判定消息发送失败,则进行重试(重新发送),并设置重试时间(也就是超过这个时间才重新发送,这样可以尽量排除因系统延迟原因导致生产者判定消息发送失败),当然不能无限重试,要设定最大重试次数,如果超过最大重试次数,就判定为发送失败,不能重试

发送消息前,存储消息状态(投递中),然后生产者发送消息给MQ,若生产者监听到MQ的接收确认,则更改数据库中消息的状态为发送成功;若监听到MQ的接收拒绝,则更改数据库中的消息状态为发送失败(可以加上其他业务)。

开启一个定时任务,获取所有投递中,并且重试时间小于当前时间的消息集合。然后遍历这个消息集合,判断重试次数是否超过最大可重试次数,如果超过最大重试次数,就判定为发送失败,不能重试;如果不超过,消息的重试次数+1,修改时间为当前时间,重试时间为当前时间加上消息超时时间,重新发送消息;

image-20220429165419752

step1:消息数据进行存储

step2:发送消息

step3:生产者监听到MQ的确认消息

step4:更改数据库中消息的状态

step5:定时任务(定时去获取数据库中消息状态为投递中状态,并且重试时间小于当前时间的消息)

step6:如果重试次数不超过3次,消息的重试次数+1,更新重试时间为当前时间+指定超时时间

step7:如果重试次数超过3次,更新状态为投递失败,不能重试

代码实现

为了不发生错乱 先写一下代码中会用到的参数

public class MailConstants {

    //消息投递中
    public static final Integer DELIVERING = 0;
    //消息投递成功
    public static final Integer SUCCESS = 1;
    //消息投递失败
    public static final Integer FAILURE = 2;
    //最大重试次数
    public static final Integer MAX_TRY_COUNT = 3;
    //消息超时时间 单位min
    public static final Integer MSG_TIMEOUT = 1;
    //队列
    public static final String MAIL_QUEUE_NAME = "mail.queue";
    //交换机
    public static final String MAIL_EXCHANGE_NAME = "mail.exchange";
    //路由键
    public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key";
}
  1. 消息落库,向rabbitMq发送消息前,把要发送的消息持久化,即保存到数据库中,核心参数:唯一的消息id,消息状态(消息投递中,投递成功,投递失败),重试次数,重试时间;

    public class MailLog implements Serializable {
    
        private static final long serialVersionUID = 1L;
    
        @ApiModelProperty(value = "消息id")
        private String msgId;
    
        @ApiModelProperty(value = "接收员工id")
        private Integer eid;
    
        @ApiModelProperty(value = "状态(0:消息投递中 1:投递成功 2:投递失败)")
        private Integer status;
    
        @ApiModelProperty(value = "路由键")
        private String routeKey;
    
        @ApiModelProperty(value = "交换机")
        private String exchange;
    
        @ApiModelProperty(value = "重试次数")
        private Integer count;
    
        @ApiModelProperty(value = "重试时间")
        private LocalDateTime tryTime;
    
        @ApiModelProperty(value = "创建时间")
        private LocalDateTime createTime;
    
        @ApiModelProperty(value = "更新时间")
        private LocalDateTime updateTime;
    
    
    }
    
  2. 发送消息时带上唯一的消息id

//emp为实体类(传递的内容),msgId为唯一消息ID(
String msgId = UUID.randomUUID().toString();
rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,emp,new CorrelationData(msgId));
  1. 生产端配置RabbitTemplate,配置MQ的消息接收确认回调和消息接收拒绝回调

    @Bean
        public RabbitTemplate rabbitTemplate(){
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            /**
             * 消息接收确认回调,确认消息是都到达broker
             * data:消息唯一标识
             * ack:确认结果
             * cause:失败原因
             */
            rabbitTemplate.setConfirmCallback((data,ack,cause)->{
                String msgId = data.getId();
                if (ack){
                    LOGGER.info("{}=====>消息发送成功",msgId);
                    mailLogService.update(new UpdateWrapper<MailLog>().set("status",1).eq("msgId",msgId));
                }else{
                    LOGGER.error("{}=====>消息发送失败",msgId);
                }
            });
    
            /**
             * 消息接收拒绝回调,比如router不到queue时回调
             * msg:消息主体
             * repCode:响应码
             * repTest:相应描述
             * exchange:交换机
             * routingkey:路由键
             */
            rabbitTemplate.setReturnCallback((msg,repCode,repText,exchange,routingkey)->{
                LOGGER.error("{}=====>消息发送queue时失败",msg.getBody());
            });
            return rabbitTemplate;
        }
    
  2. 开启定时任务,获取所有投递中,并且重试时间小于当前时间的消息集合。然后遍历这个消息集合,判断重试次数是否超过最大可重试次数

    	/**
         * 10s执行1次
         */
        @Scheduled(cron = "0/10 * * * * ?")
        public void mailTask(){
            //获取所有投递中的消息 并且重试时间小于当前时间
            List<MailLog> list = mailLogService.list(new QueryWrapper<MailLog>()
                    .eq("status", 0).lt("tryTime", LocalDateTime.now()));
    
            list.forEach(mailLog -> {
                //如果重试次数超过3次,更新状态为投递失败,不能重试
                if (mailLog.getCount()>= MailConstants.MAX_TRY_COUNT){
                    mailLogService.update(new UpdateWrapper<MailLog>().set("status",2).eq("msgId",mailLog.getMsgId()));
                }
                //消息的重试次数+1,修改时间为当前时间,重试时间为当前时间加上消息超时时间
                mailLogService.update(new UpdateWrapper<MailLog>().set("count",mailLog.getCount()+1).set("updateTime",
                        LocalDateTime.now()).set("tryTime",LocalDateTime.now().plusMinutes(MailConstants.MSG_TIMEOUT))
                        .eq("msgId",mailLog.getMsgId()));
                //获取员工的id
                Employee emp = employeeService.getEmployee(mailLog.getEid()).get(0);
                //发送消息
                rabbitTemplate.convertAndSend(MailConstants.MAIL_EXCHANGE_NAME,MailConstants.MAIL_ROUTING_KEY_NAME,emp,
                        new CorrelationData(mailLog.getMsgId()));
            });
    
        }
    

二、消费端幂等性操作

幂等性:通俗的说就是一个接口,多次发起同一个请求,必须保证操作只能执行一次

有这么一种情况:当刚刚执行完发送消息给MQ,MQ接收到了,但是数据库中的消息状态还未来得及更改为已接收,此时定时任务刚好执行,那么就会判定这条消息发送失败,需要重新发送。这种情况下,这一条消息就给MQ发送了两次,很显然,会对系统的业务逻辑造成影响。我们需要在消费端进行幂等性操作,具体步骤如下:

消费端幂等性操作方案

当消费端监听到一条消息时,去redis中查找该消息的唯一id,若为查找到,则证明第一次接收这条消息,并在redis中保存这条消息的唯一id,回调接收确认;若查找到,则证明不是第一次接收这条消息,回调接收失败;

代码实现

  1. rabbitmq必须开启手动确认

    rabbitmq:
        listener:
          simple:
            # 开启手动确认
            acknowledge-mode: manual
    
  2. 消费端监听时 若接收到一条消息,去redis中查找该消息的唯一id,若为查找到,则证明第一次接收这条消息,并在redis中保存这条消息的唯一id,回调接收确认;若查找到,则证明不是第一次接收这条消息,回调接收失败;

    @RabbitListener(queues = MailConstants.MAIL_QUEUE_NAME)
        public void handler(Message message, Channel channel){
            Employee employee = (Employee) message.getPayload();
            MessageHeaders headers = message.getHeaders();
            //消息序号
            long tag = (long) headers.get(AmqpHeaders.DELIVERY_TAG);
            //headers.get("spring_returned_message_correlation")获取的是发送消息时 加上的new CorrelationData(msgId)里的msgId
            String msgId = (String) headers.get("spring_returned_message_correlation");
            HashOperations hashOperations = redisTemplate.opsForHash();
    
            try {
                if (hashOperations.entries("mail_log").containsKey(msgId)){
                    LOGGER.error("消息已经被消费=======>{}",msgId);
                    /**
                     * 手动确认消息
                     * tag:消息序号
                     * multiple:是否确认多条
                     */
                    channel.basicAck(tag,false);
                    return;
                }
    
    
                MimeMessage mimeMessage = javaMailSender.createMimeMessage();
                MimeMessageHelper helper = new MimeMessageHelper(mimeMessage);
                //发件人
                helper.setFrom(mailProperties.getUsername());
                //收件人
                helper.setTo(employee.getEmail());
                //主题
                helper.setSubject("入职欢迎邮件");
                //发送日期
                helper.setSentDate(new Date());
                //邮件内容
                Context context = new Context();
                context.setVariable("name",employee.getName());
                context.setVariable("posName",employee.getPosition().getName());
                context.setVariable("joblevelName",employee.getJoblevel().getName());
                context.setVariable("departmentName",employee.getDepartment().getName());
                String mail = templateEngine.process("mail",context);
                helper.setText(mail,true);
                //发送邮件
                javaMailSender.send(mimeMessage);
                LOGGER.info("邮件发送成功");
                //将消息id存入redis
                hashOperations.put("mail_log",msgId,"OK");
                //手动确认消息
                channel.basicAck(tag,false);
    
            } catch (Exception e) {
                /**
                 * 手动确认消息 拒绝
                 * tag:消息序号
                 * multiple:是否确认多条
                 * requeue:是否退回队列
                 */
                try {
                    channel.basicNack(tag,false,true);
                } catch (IOException ioException) {
                    LOGGER.error("邮件发送失败=======》{}",e.getMessage());
                }
                LOGGER.error("邮件发送失败=======》{}",e.getMessage());
            }
        }
    
  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-05-01 15:33:33  更:2022-05-01 15:34:39 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/24 2:06:00-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码