项目场景:
公司项目中有一段业务并发访问量大,lz思考耗时逻辑代码交予消息中间件RabbitMq异步处理,但是如果这段耗时逻辑报异常了怎么办,这时可以想到使用死性队列,那死性队列中的异常消息怎么处理呢,lz想到了使用邮箱通知来通知管理人员,管理人员处理好异常后对异常消息进行手动转移至主业务队列达到解决问题的目的。废话不多说,思路有了开始实战:
RabbitMq的安装与环境配置
工具的安装配置就不多讲了,可以网上自行搜索
引入依赖
?<dependency> ????????<groupId>org.springframework.boot</groupId> ????????<artifactId>spring-boot-starter-amqp</artifactId> </dependency>
项目中的yml的RabbitMq配置,邮箱配置(包含我自己对参数的简单理解和介绍):
spring:?
??rabbitmq: ? ? addresses: localhost:5672 ? ? username: guest ? ? password: guest ? ? listener: ? ? ? simple: ? ? ? ? concurrency: 5 #并发设置 ? ? ? ? max-concurrency: 10 #最大并发 ? ? ? ? acknowledge-mode: manual #设置的手动确认 ? ? ? ? prefetch: 1 #消费端限流 ? ? ? ? retry: ? ? ? ? ? enabled: true # 开启消费者失败重试 ? ? ? ? ? initial-interval: 1000ms # 初识的失败等待时长为1秒 ? ? ? ? ? multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval ? ? ? ? ? max-attempts: 3 # 最大重试次数 ? ? ? ? ? stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false ? ? ? direct: ? ? ? ? acknowledge-mode: manual?#设置的手动确认 ? ? connection-timeout: 15000 #连接超时 ? ? virtual-host: / #将用户分配给mq的虚拟主机 ? ? publisher-confirm-type: correlated #发布消息成功到交换器后会触发回调方法,默认是? ? ? ?????????????????????#node禁用触发,simple较为复杂没用过,含有逻辑判断在里面有以上两种效果 ? ? publisher-returns: true #确保消息在未被队列接收时返回 ? ? template: ? ? ? mandatory: true #AMQP的标识位:true表示不达目的的消息返回生产者,false则扔掉消息
?mail: ? #发送邮箱的账号 ? ? host: smtp.126.com ? ? username: yourEmail@126.com ? ? password: yourPassword ? ? port: 465 ? ? receive: defineEmailRecever #定义短信接收人 ? ? properties: ? ? ? mail: ? ? ? ? smtp: ? ? ? ? ? auth: true #SMTP 服务器是否需要用户认证,默认为 false ? ? ? ? ? starttls: ? ? ? ? ? ? enable: true #启用STARTTLS命令,建立连接发送 ? ? ? ? ? ? required: true ? ? ? ? ? socketFactory: ? ? ? ? ? ? port: 465 ? ? ? ? ? ? class: javax.net.ssl.SSLSocketFactory #通过设置该属性可以覆盖提供者默认的实? ? ? ? ? ? ? ? ? ? ? ? #现,必须实现javax.NET.SocketFactory接口 ? ? ? ? ? ? fallback: false #默认为 true,当使用指定的socket 类创建 socket 失败后,将使用? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ???????#Java.net.Socket 创建socket,
RabbitMq配置:
1、配置mq的消息回调问题,正常业务交换机和队列以及死信交换机、死信队列,和配置正常队列未确认消息转发死信队列关系
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.SerializerMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DeadConfig {
/**
* rabbitmq对象,多例模式,用来解决confirm机制时的一个消费一个回调的问题(消息回调)
*/
@Bean
@Scope("prototype")
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMandatory(true);
template.setMessageConverter(new SerializerMessageConverter());
return template;
}
/**
* 业务交换机,开启持久化
*/
@Bean
TopicExchange examinationExchange() {
return new TopicExchange("examinationExchange", true, false);
}
@Bean
public Queue examinationQueue() {
// durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然
//存在,暂存队列:当前连接有效
// exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
//此参考优先级高于durable
// autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
Map<String, Object> args = deadQueueArgs();
return new Queue("examinationQueue", true, false, false, args);
}
/**
*交换机和队列绑定关系
*/
@Bean
Binding examinationRouteBinding() {
return BindingBuilder.bind(examinationQueue()).to(examinationExchange()).with("examination.*");
}
/* 死信配置 **********************************************************************************************************/
/**
* 异常交接死信交换机
*/
@Bean
DirectExchange deadExchange() {
return new DirectExchange("deadExchange", true, false);
}
/**
* 死信队列
*/
@Bean
public Queue deadQueue() {
return new Queue("deadQueue", true, false, false);
}
@Bean
Binding deadRouteBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("deadRouting");
}
/**
* 转发到 死信队列,配置参数
*/
private Map<String, Object> deadQueueArgs() {
Map<String, Object> map = new HashMap<>();
// 绑定该队列到私信交换机
map.put("x-dead-letter-exchange", "deadExchange");
map.put("x-dead-letter-routing-key", "deadRouting");
return map;
}
}
定义生产者:
1、发送指定交换机,定义路由key
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Component
public class SendExamAnswerId implements RabbitTemplate.ConfirmCallback {
//自动注入发送消息的模板
private RabbitTemplate rabbitTemplate;
@Autowired
public SendExamAnswerId(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
public void send(Map<String, String> map) {
log.info("***********************RabbitMQ开始发送消息*************************");
CorrelationData correlationData = new CorrelationData();//correlationData消息唯一id
correlationData.setId(UUID.randomUUID().toString());
this.rabbitTemplate.convertAndSend("examinationExchange",//exchange
"examination.exam",//routingKey 路由key 在rabbitmq中交换机Rount Key对应的值
map, //消息体内容
correlationData);//correlationData消息唯一id
log.info("***********************RabbitMQ发送消息完成*************************");
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm: " + correlationData.getId());
}
}
定义业务消费者和死信消费者:
1、业务消费者,这里解释一下我定义了一个ConcurrentHashMap,为了处理业务发生异常时,将该消息重新归队,在指定次数下重新消费(lz定义了三次),消费三次还是异常则拒绝确认进入死信队列
import xxxx.commons.enums.CommonExEnum;
import xxxx.commons.exception.BusinessException;
import xxxx.commons.util.Constants;
import xxxx.service.ExamAnswerService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class ReceiverExamAnswer {
@Autowired
private ExamAnswerService examAnswerService;
private final Map<Long, Integer> counterMap=new ConcurrentHashMap<>();
/**
* 如果抛异常(basicNack拒绝消息确认,参数requeue=true表示异常的生产消息重
* 新投递)
* 拒绝消息确认三次之后,通过死信交换机投递给死信队列,然后邮件通知管理员进行异常消息处理
*(调整代码后
* ,移动异常信息投递回examinationQueue进行重新消费)
* @param map
* @param heads
* @param channel
* @throws Exception
*/
@RabbitListener(queues = "examinationQueue")
@RabbitHandler//@RabbitHandler 只是标识方法如果有消息过来消费者要消费的时候调用这个方法
public void onOrderMessage(@Payload Map<String, String> map,
@Headers Map<String, Object> heads,
Channel channel) throws Exception {
Long deliverTag = (Long) heads.get(AmqpHeaders.DELIVERY_TAG);//唯一标识ID
try {
//消费者操作
log.info("*****************收到消息,开始消费***************");
if (map != null) {
if (map.size() == 0) {
throw new BusinessException(CommonExEnum.EXAM_NOT_EXIST);
}
String examNumber = map.get("examNumber");
String candidateNumber = map.get("candidateNumber");
String uniqueId = map.get("uniqueId");
try {
//examAnswerService.correctExam(examNumber, candidateNumber);
//制作异常测试
int a = 1/0;
channel.basicAck(deliverTag, false);
} catch (Exception e) {
log.error("***发生异常,异常信息:{}***",
e.getMessage());
Long errorTagId = Long.valueOf(uniqueId);
Integer count = counterMap.get(errorTagId);
if (Objects.isNull(count)){
counterMap.put(errorTagId, 1);
}else{
counterMap.put(errorTagId, count + 1);
}
log.info("重试次数:{}",counterMap.get(errorTagId));
//尝试三次发生异常则进入死信队列
if( counterMap.get(errorTagId) >= 3){
channel.basicNack(deliverTag,false,false);
}else{
channel.basicNack(deliverTag,false,true);
}
}
log.info("*****完成*****");
}else{
channel.basicNack(deliverTag,false,false);
}
} catch (Throwable e) {
channel.basicNack(deliverTag,false,false);
}
}
}
2、死信队列消费者
import xxxxxx.service.DailyService;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
@Slf4j
@Component
public class DeadConsumer {
@Autowired
private DailyService dailyService;
@Value("${spring.mail.receive}")
private String receive;
@RabbitListener(queues = "deadQueue")
@RabbitHandler
public void dlxListener(Map<String, Object> message, Channel channel, Message mqMsg) throws IOException, TimeoutException {
System.out.println("xxxx死信队列收到消息 : " + message.toString() + ",退回待确认消息的唯一标识是 : " + mqMsg.getMessageProperties().getCorrelationId());
//邮箱通知
dailyService.sendSimpleMail("xxxx异常","请联系管理员,消息异常信息为:"+message.toString(),receive,null);
channel.close();
}
}
邮箱业务实现
import xxxxx.service.DailyService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.stereotype.Service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;
import javax.annotation.Resource;
@Slf4j
@Service
public class DailyServiceImpl implements DailyService {
@Resource
protected JavaMailSender javaMailSender;
@Value("${spring.mail.username}")
protected String from;
/**
* 发送纯文本格式的邮件
* @param sub 主题
* @param content 内容
* @param to_list 发送人
* @param cc_list 抄送人
*/
@Override
public void sendSimpleMail(String sub, String content, String to_list, String cc_list) {
//创建邮件内容
SimpleMailMessage message = new SimpleMailMessage();
message.setFrom(from); //这里指的是发送者的账号
message.setTo(to_list.split(","));
message.setCc(cc_list==null?null:cc_list.split(","));
message.setSubject(sub);
message.setText(content);
//发送邮件
javaMailSender.send(message);
log.info(sub+"发送邮件成功");
}
}
业务代码中使用:
1、这段代码是自己业务中,传递参数给异步方法类对象,异步方法类中注入生产者对象将参数作为消息发送给消费者,myAsync是自定义异步配置注解,异步配置使用的是spring的自带的线程池接口类ThreadPoolTaskExecutor,用它的原因是因为Spring提供了xml给我们配置ThreadPoolTaskExecutor线程池,但在SpringBoot开发项目,所以直接上yaml或者properties配置即可,达到了可以实现线程的复用,而且还能控制好线程数,写出更好的多线程并发程序的目的。
/**
* 这段代码是自己业务中,传递参数给异步方法类对象,
* 异步方法类中注入生产者对象将参数作为消息发送给消费者
**/
@Autowired
private AsyncMethod asyncMethod;
public ResponseResult yourMethod(String examNumber, String candidateNumber) {
.................//中间是自己的业务逻辑
Map<String, String> map = new HashMap<>();
map.put("examNumber", examNumber);
map.put("candidateNumber", candidateNumber);
map.put("uniqueId", UUID.randomUUID().getMostSignificantBits()+"");
asyncMethod.sendExamAnswer(map);//将前端传过来的参数map封装后交给异步方法
}
/**
* 异步方法
**/
@Slf4j
@Component
@Lazy
public class AsyncMethod {
@Autowired
private SendExamAnswerId sendExamAnswer;
@Async(value = "myAsync")
public void sendExamAnswer(Map<String, String> map) {
sendExamAnswer.send(map);
}
}
@Configuration
@EnableAsync
public class AsyncConfig {
@Value("${spring.task.execution.pool.core-size}")
private int corePoolSize;
@Value("${spring.task.execution.pool.max-size}")
private int maxPoolSize;
@Value("${spring.task.execution.pool.queue-capacity}")
private int queueCapacity;
@Value("${spring.task.execution.thread-name-prefix}")
private String namePrefix;
@Value("${spring.task.execution.pool.keep-alive}")
private int keepAliveSeconds;
@Bean
public Executor myAsync() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//最大线程数
executor.setMaxPoolSize(maxPoolSize);
//核心线程数
executor.setCorePoolSize(corePoolSize);
//任务队列的大小
executor.setQueueCapacity(queueCapacity);
//线程前缀名
executor.setThreadNamePrefix(namePrefix);
//线程存活时间
executor.setKeepAliveSeconds(keepAliveSeconds);
//线程名称前缀
String threadNamePrefix = "MyExecutor-";
executor.setThreadNamePrefix(threadNamePrefix);
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
运行项目:(在定义业务消费者的时候,我制作by zero异常,检验代码运行结果)
1、控制台显示:可以发现异常消息重新投递了三次去消费,死信队列也接收到了,并且邮件发送成功了。
管理员处理死信
1、代码调整后,进入死信队列,将消息重新投递给业务队列中进行消费。
?结束:不好勿喷,希望对大家有所帮助,有所启发!!!!!!!
|