IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 用rabbitmq实现消息重发的功能 -> 正文阅读

[大数据]用rabbitmq实现消息重发的功能

前言:

在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被throws或被catch,没有往下走。再比如对方接收代码出现问题,或者网络问题,接口没通,同步失败。

遇到上面同步失败的情况,就会影响到业务的正常使用了,本文只讨论第二种调用失败的情况(第一种情况可以把同步代码封装起来,提供一个接口出来用于手动调用hhhh,很笨但是很救命的办法),所以必须要加入重发机制,来让程序更加的健壮。

因为项目中也用到了rabbitmq,所以第一时间就想到了死信队列,之前的一种实现方式是使用死信队列的超时时间特性,第一次失败后,把参数放入死信队列,但是参数的bean中要有一个记录次数的值,第一次放的时候set为1。在消息超时后放入死信队列,被监听到时,再去调用接口,如果失败了,就按照次数去计算下一次执行的时间,然后重新放入到参数的bean中,再把bean重新放入到正常消费队列中,直到下一次消息超时被死信队列接收。不过这样的是要在代码中设置一个最大循环次数的,否则调用不通的情况下,会一直循环。如果成功了,那么我们就手动调用下 channel.basicAck 去手动签收一下(这里是要在配置中把自动签收改成手动签收)。

spring.rabbitmq.listener.simple.acknowledge-mode=manual 手动签收

这种方式虽然实现了功能,但是确增大了代码量,尤其是需要增添2个队列(一个正常消费队列,一个消费队列绑定的死信队列),不太方便扩展,而且也增加了复杂度,所以不太推荐这样的写法(代码就不贴了,如果有人感兴趣,可以给我留言,到时候我再更新)。

为了优化掉现有代码,于是我就又重新研究了一下,发现不就是重发么,rabbitmq早就给我们准备好了,-_-|| 自己这又是死信又是手动签收的,,,一顿操作,确实浪费了不少功夫。


实现过程:

那下面我们来看一下是如何实现的。

首先写贴下配置

##rabbit地址
spring.rabbitmq.addresses=amqp://guest:guest@localhost:5672
# 开启重发
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重发次数
spring.rabbitmq.listener.simple.retry.max-attempts=10
# 重试间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.initial-interval=3000ms
# 重试最大间隔时间 单位毫秒
spring.rabbitmq.listener.simple.retry.max-interval=86400000ms
# 重发间隔因子 间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
spring.rabbitmq.listener.simple.retry.multiplier=2

开启rabbitmq的重发机制,并且设置好重试间隔时间(这个间隔时间应该是第一次到第二次的间隔时间,往后的间隔时间是通过间隔因子算的),以及最大间隔时间(避免出现无限重试的问题),还有重要的间隔因子,这样保证了每次的间隔时间是成比例增长的。

配置好后,接下来就要声明我们所用到的队列

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * rabbitmq重发配置
 */
@Configuration
public class RepeatSendRabbitmqConfig {
    /**
     * 正常队列
     */
    public final static String REPEAT_QUEUE = "repeat_queue";
    /**
     * 交换机
     */
    public final static String REPEAT_EXCHANGE = "repeat_exchange";
    /**
     * 路由键
     */
    public final static String REPEAT_ROUTING_KEY = "repeat_routing_key";


    /**
     * 声明队列
     */
    @Bean
    Queue repeatQueue() {
        return QueueBuilder.durable(REPEAT_QUEUE).build();
    }

    /**
     * 声明交换机
     * @return
     */
    @Bean
    DirectExchange repeatExchange() {
        return new DirectExchange(REPEAT_EXCHANGE);
    }

    /**
     * 将队列和交换机进行绑定
     * @param repeatQueue
     * @param repeatExchange
     * @return
     */
    @Bean
    Binding dlxBinding(Queue repeatQueue, DirectExchange repeatExchange) {
        return BindingBuilder.bind(repeatQueue).to(repeatExchange).with(REPEAT_ROUTING_KEY);
    }

}

我们把队列,交换机,路由键都声明好后,下一步就要写接收代码了。

首先我们要写好生产者,也就是把消息放入到消息队列中的那步。


import com.google.gson.Gson;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

/**
 * 生产者,将消息放入队列
 */
@Component
public class MqSender {
    Gson gson = new Gson();

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 将消息放入队列
     * @param request
     */
    public void repeatSend(RequestBean request){
        log.err("repeat request " + gson.toJson(request),"接收时间:" + LocalDateTime.now());
        rabbitTemplate.convertAndSend(RepeatSendRabbitmqConfig.REPEAT_EXCHANGE,
                RepeatSendRabbitmqConfig.REPEAT_ROUTING_KEY,request);
    }
}

注意了,repeatSend这个方法,只运行一次,这里并没有指定超时时间,仅仅是传入了exchange和routing key,通过之前的绑定,就能定位到是哪个队列,然后把参数放到队列中。

(这里有一个小坑,之前是使用的 AmqpTemplate amqpTemplate 这个接口来做数据存放,但是用了其中的方法,在测试的时候偶尔不会消费,,不知道什么原因,为了赶工,就改成使用 RabbitTemplate rabbitTemplate 了,有知道的小伙伴可以留言讨论一下)。

前期准备工作好后,最后一步就是拿来消费了。


import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import java.time.LocalDateTime;


/**
 * 接收者,消费者
 */
@Component
public class MqReceiver {

    Gson gson = new Gson();

    @Autowired
    private RemoteService remoteService;


    /**
     * 同步接口重发队列实现
     * @param request
     */
    @RabbitListener(queues = RepeatSendRabbitmqConfig.REPEAT_QUEUE)
    public void ListenRepeatSend(RequestBean request){
        log.err(" ListenRepeatSend request " + gson.toJson(request),
                "同步时间 " + LocalDateTime.now());
        BaseResp response = remoteService.send(request);
        // 重新同步,失败后抛异常重试
        if(!"SUCCESS".equals(response.getCode())){
            throw new RuntimeException();
        }
    }

}

使用 @RabbitListener 指定监听队列,那么这个队列就会被这个消费者所监听了。把参数传入我们自己的同步方法中,如果失败了,我们就抛异常出去,不用做其他的任何操作,只需要抛出去,rabbitmq就会按照配置的时间,以及间隔,来重新执行方法了,直到不抛异常,或者超过了配置中的最大时间,就停止重复执行了。

最后,使用生产者时,直接在代码中直接调用一下就好。

至此,我们的功能就整改完成了。


  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-07-23 10:51:49  更:2021-07-23 10:54:23 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/15 13:42:20-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码