IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> Java知识库 -> RabbitMq+死信队列+邮箱通知的SpringBoot实战 -> 正文阅读

[Java知识库]RabbitMq+死信队列+邮箱通知的SpringBoot实战

项目场景:

公司项目中有一段业务并发访问量大,lz思考耗时逻辑代码交予消息中间件RabbitMq异步处理,但是如果这段耗时逻辑报异常了怎么办,这时可以想到使用死性队列,那死性队列中的异常消息怎么处理呢,lz想到了使用邮箱通知来通知管理人员,管理人员处理好异常后对异常消息进行手动转移至主业务队列达到解决问题的目的。废话不多说,思路有了开始实战:


RabbitMq的安装与环境配置

工具的安装配置就不多讲了,可以网上自行搜索


引入依赖

?<dependency>
????????<groupId>org.springframework.boot</groupId>
????????<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

项目中的yml的RabbitMq配置,邮箱配置(包含我自己对参数的简单理解和介绍):

spring:?

??rabbitmq:
? ? addresses: localhost:5672
? ? username: guest
? ? password: guest
? ? listener:
? ? ? simple:
? ? ? ? concurrency: 5 #并发设置
? ? ? ? max-concurrency: 10 #最大并发
? ? ? ? acknowledge-mode: manual #设置的手动确认
? ? ? ? prefetch: 1 #消费端限流
? ? ? ? retry:
? ? ? ? ? enabled: true # 开启消费者失败重试
? ? ? ? ? initial-interval: 1000ms # 初识的失败等待时长为1秒
? ? ? ? ? multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
? ? ? ? ? max-attempts: 3 # 最大重试次数
? ? ? ? ? stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
? ? ? direct:
? ? ? ? acknowledge-mode: manual?#设置的手动确认
? ? connection-timeout: 15000 #连接超时
? ? virtual-host: / #将用户分配给mq的虚拟主机
? ? publisher-confirm-type: correlated #发布消息成功到交换器后会触发回调方法,默认是? ? ? ?????????????????????#node禁用触发,simple较为复杂没用过,含有逻辑判断在里面有以上两种效果
? ? publisher-returns: true #确保消息在未被队列接收时返回
? ? template:
? ? ? mandatory: true #AMQP的标识位:true表示不达目的的消息返回生产者,false则扔掉消息

?mail: ? #发送邮箱的账号
? ? host: smtp.126.com
? ? username: yourEmail@126.com
? ? password: yourPassword
? ? port: 465
? ? receive: defineEmailRecever #定义短信接收人
? ? properties:
? ? ? mail:
? ? ? ? smtp:
? ? ? ? ? auth: true #SMTP 服务器是否需要用户认证,默认为 false
? ? ? ? ? starttls:
? ? ? ? ? ? enable: true #启用STARTTLS命令,建立连接发送
? ? ? ? ? ? required: true
? ? ? ? ? socketFactory:
? ? ? ? ? ? port: 465
? ? ? ? ? ? class: javax.net.ssl.SSLSocketFactory #通过设置该属性可以覆盖提供者默认的实? ? ? ? ? ? ? ? ? ? ? ? #现,必须实现javax.NET.SocketFactory接口
? ? ? ? ? ? fallback: false #默认为 true,当使用指定的socket 类创建 socket 失败后,将使用? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???????#Java.net.Socket 创建socket,


RabbitMq配置:

1、配置mq的消息回调问题,正常业务交换机和队列以及死信交换机、死信队列,和配置正常队列未确认消息转发死信队列关系

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

import java.util.HashMap;
import java.util.Map;


@Configuration
public class DeadConfig {

    /**
     * rabbitmq对象,多例模式,用来解决confirm机制时的一个消费一个回调的问题(消息回调)
     */
    @Bean
    @Scope("prototype")
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMandatory(true);
        template.setMessageConverter(new SerializerMessageConverter());
        return template;
    }

    /**
     * 业务交换机,开启持久化
     */
    @Bean
    TopicExchange examinationExchange() {
        return new TopicExchange("examinationExchange", true, false);
    }

    @Bean
    public Queue examinationQueue() {
        // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然            
        //存在,暂存队列:当前连接有效
        // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。 
        //此参考优先级高于durable
        // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        Map<String, Object> args = deadQueueArgs();
        return new Queue("examinationQueue", true, false, false, args);
    }

    /**
     *交换机和队列绑定关系
     */
    @Bean
    Binding examinationRouteBinding() {
        return BindingBuilder.bind(examinationQueue()).to(examinationExchange()).with("examination.*");
    }
    
    /* 死信配置 **********************************************************************************************************/

    /**
     * 异常交接死信交换机
     */
    @Bean
    DirectExchange deadExchange() {
        return new DirectExchange("deadExchange", true, false);
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue("deadQueue", true, false, false);
    }

    @Bean
    Binding deadRouteBinding() {
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");
    }

    /**
     * 转发到 死信队列,配置参数
     */
    private Map<String, Object> deadQueueArgs() {
        Map<String, Object> map = new HashMap<>();
        // 绑定该队列到私信交换机
        map.put("x-dead-letter-exchange", "deadExchange");
        map.put("x-dead-letter-routing-key", "deadRouting");
        return map;
    }


}

定义生产者:

1、发送指定交换机,定义路由key

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

@Slf4j
@Component
public class SendExamAnswerId implements RabbitTemplate.ConfirmCallback {
//自动注入发送消息的模板

    private RabbitTemplate rabbitTemplate;

    @Autowired
    public SendExamAnswerId(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    public void send(Map<String, String> map) {
        log.info("***********************RabbitMQ开始发送消息*************************");
        CorrelationData correlationData = new CorrelationData();//correlationData消息唯一id
        correlationData.setId(UUID.randomUUID().toString());
        this.rabbitTemplate.convertAndSend("examinationExchange",//exchange
                "examination.exam",//routingKey 路由key 在rabbitmq中交换机Rount Key对应的值
                map, //消息体内容
                correlationData);//correlationData消息唯一id
        log.info("***********************RabbitMQ发送消息完成*************************");
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm: " + correlationData.getId());
    }

}

定义业务消费者和死信消费者:

1、业务消费者,这里解释一下我定义了一个ConcurrentHashMap,为了处理业务发生异常时,将该消息重新归队,在指定次数下重新消费(lz定义了三次),消费三次还是异常则拒绝确认进入死信队列

import xxxx.commons.enums.CommonExEnum;
import xxxx.commons.exception.BusinessException;
import xxxx.commons.util.Constants;
import xxxx.service.ExamAnswerService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;


@Slf4j
@Component
public class ReceiverExamAnswer {
    @Autowired
    private ExamAnswerService examAnswerService;

    private final Map<Long, Integer> counterMap=new ConcurrentHashMap<>();

    /**
     * 如果抛异常(basicNack拒绝消息确认,参数requeue=true表示异常的生产消息重    
     * 新投递)
     * 拒绝消息确认三次之后,通过死信交换机投递给死信队列,然后邮件通知管理员进行异常消息处理 
     *(调整代码后
     * ,移动异常信息投递回examinationQueue进行重新消费)
     * @param map
     * @param heads
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = "examinationQueue")
    @RabbitHandler//@RabbitHandler 只是标识方法如果有消息过来消费者要消费的时候调用这个方法
    public void onOrderMessage(@Payload Map<String, String> map,
                               @Headers Map<String, Object> heads,
                               Channel channel) throws Exception {
        Long deliverTag = (Long) heads.get(AmqpHeaders.DELIVERY_TAG);//唯一标识ID
        try {
            //消费者操作
            log.info("*****************收到消息,开始消费***************");
            if (map != null) {
                if (map.size() == 0) {
                    throw new BusinessException(CommonExEnum.EXAM_NOT_EXIST);
                }
                String examNumber = map.get("examNumber");
                String candidateNumber = map.get("candidateNumber");
                String uniqueId = map.get("uniqueId");
                try {
                    //examAnswerService.correctExam(examNumber, candidateNumber);
                    //制作异常测试
                    int a = 1/0;
                    channel.basicAck(deliverTag, false);
                } catch (Exception e) {
                    log.error("***发生异常,异常信息:{}***", 
                    e.getMessage());
                    Long errorTagId = Long.valueOf(uniqueId);
                    Integer count = counterMap.get(errorTagId);
                    if (Objects.isNull(count)){
                        counterMap.put(errorTagId, 1);
                    }else{
                        counterMap.put(errorTagId, count + 1);
                    }
                    log.info("重试次数:{}",counterMap.get(errorTagId));
                    //尝试三次发生异常则进入死信队列
                    if( counterMap.get(errorTagId) >= 3){
                        channel.basicNack(deliverTag,false,false);
                    }else{
                        channel.basicNack(deliverTag,false,true);
                    }
                }
                log.info("*****完成*****");
            }else{
                channel.basicNack(deliverTag,false,false);
            }
        } catch (Throwable e) {
            channel.basicNack(deliverTag,false,false);
        }
    }
}

2、死信队列消费者

import xxxxxx.service.DailyService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Slf4j
@Component
public class DeadConsumer {

    @Autowired
    private DailyService dailyService;

    @Value("${spring.mail.receive}")
    private String receive;

    @RabbitListener(queues = "deadQueue")
    @RabbitHandler
    public void dlxListener(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException, TimeoutException {
        System.out.println("xxxx死信队列收到消息 : " + message.toString() + ",退回待确认消息的唯一标识是 : " + mqMsg.getMessageProperties().getCorrelationId());
        //邮箱通知
        dailyService.sendSimpleMail("xxxx异常","请联系管理员,消息异常信息为:"+message.toString(),receive,null);
        channel.close();
    }

}

邮箱业务实现

import xxxxx.service.DailyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;

import javax.annotation.Resource;


@Slf4j
@Service
public class DailyServiceImpl implements DailyService {
    @Resource
    protected JavaMailSender javaMailSender;

    @Value("${spring.mail.username}")
    protected String from;

    /**
     * 发送纯文本格式的邮件
     * @param sub 主题
     * @param content 内容
     * @param to_list 发送人
     * @param cc_list 抄送人
     */
    @Override
    public void sendSimpleMail(String sub, String content, String to_list, String cc_list) {
        //创建邮件内容
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom(from); //这里指的是发送者的账号
        message.setTo(to_list.split(","));
        message.setCc(cc_list==null?null:cc_list.split(","));
        message.setSubject(sub);
        message.setText(content);
        //发送邮件
        javaMailSender.send(message);
        log.info(sub+"发送邮件成功");
    }


}

业务代码中使用:

1、这段代码是自己业务中,传递参数给异步方法类对象,异步方法类中注入生产者对象将参数作为消息发送给消费者,myAsync是自定义异步配置注解,异步配置使用的是spring的自带的线程池接口类ThreadPoolTaskExecutor,用它的原因是因为Spring提供了xml给我们配置ThreadPoolTaskExecutor线程池,但在SpringBoot开发项目,所以直接上yaml或者properties配置即可,达到了可以实现线程的复用,而且还能控制好线程数,写出更好的多线程并发程序的目的。

/**
 * 这段代码是自己业务中,传递参数给异步方法类对象,
 * 异步方法类中注入生产者对象将参数作为消息发送给消费者
 **/
@Autowired
private AsyncMethod asyncMethod;

public ResponseResult yourMethod(String examNumber, String candidateNumber) {
    .................//中间是自己的业务逻辑
    Map<String, String> map = new HashMap<>();
    map.put("examNumber", examNumber);
    map.put("candidateNumber", candidateNumber);
    map.put("uniqueId", UUID.randomUUID().getMostSignificantBits()+"");
    asyncMethod.sendExamAnswer(map);//将前端传过来的参数map封装后交给异步方法
}
/**
 * 异步方法
 **/
@Slf4j
@Component
@Lazy
public class AsyncMethod {

    @Autowired
    private SendExamAnswerId sendExamAnswer;

    @Async(value = "myAsync")
    public void sendExamAnswer(Map<String, String> map) {
        sendExamAnswer.send(map);
    }

}
@Configuration
@EnableAsync
public class AsyncConfig {
    
    @Value("${spring.task.execution.pool.core-size}")
    private int corePoolSize;
    @Value("${spring.task.execution.pool.max-size}")
    private int maxPoolSize;
    @Value("${spring.task.execution.pool.queue-capacity}")
    private int queueCapacity;
    @Value("${spring.task.execution.thread-name-prefix}")
    private String namePrefix;
    @Value("${spring.task.execution.pool.keep-alive}")
    private int keepAliveSeconds;

    @Bean
    public Executor myAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        //核心线程数
        executor.setCorePoolSize(corePoolSize);
        //任务队列的大小
        executor.setQueueCapacity(queueCapacity);
        //线程前缀名
        executor.setThreadNamePrefix(namePrefix);
        //线程存活时间
        executor.setKeepAliveSeconds(keepAliveSeconds);
        //线程名称前缀
        String threadNamePrefix = "MyExecutor-";
        executor.setThreadNamePrefix(threadNamePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

}

运行项目:(在定义业务消费者的时候,我制作by zero异常,检验代码运行结果)

1、控制台显示:可以发现异常消息重新投递了三次去消费,死信队列也接收到了,并且邮件发送成功了。

管理员处理死信

1、代码调整后,进入死信队列,将消息重新投递给业务队列中进行消费。

?结束:不好勿喷,希望对大家有所帮助,有所启发!!!!!!!

  Java知识库 最新文章
计算距离春节还有多长时间
系统开发系列 之WebService(spring框架+ma
springBoot+Cache(自定义有效时间配置)
SpringBoot整合mybatis实现增删改查、分页查
spring教程
SpringBoot+Vue实现美食交流网站的设计与实
虚拟机内存结构以及虚拟机中销毁和新建对象
SpringMVC---原理
小李同学: Java如何按多个字段分组
打印票据--java
上一篇文章      下一篇文章      查看所有文章
加:2022-07-17 16:07:52  更:2022-07-17 16:10:27 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/23 15:52:32-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码