springboot集成Rabbitmq 手动确认消息
pom文件:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
yml文件:
#配置rabbitMq 服务器
server:
port: 9096
#配置rabbitMq 服务器
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虚拟host 可以不设置,使用server默认host
virtual-host: /
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
template:
# 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
mandatory: true
listener:
simple:
# NONE 自动确认 RabbitMQ成功将消息发出(即将消息成功写入TCP Socket)中立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递。
# 所以这种情况如果消费端消费逻辑抛出异常,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
# 一般这种情况我们都是使用try catch捕捉异常后,打印日志用于追踪数据,这样找出对应数据再做后续处理。
# MANUAL 手动确认 需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。
# AUTO 由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。
acknowledge-mode: manual #手动ACK
redis:
host: 127.0.0.1 # Redis服务器地址
database: 1 # Redis数据库索引(默认为0)
port: 6379 # Redis服务器连接端口
password: 123456 # Redis服务器连接密码(默认为空)
timeout: 5000ms # 连接超时时间(毫秒)
config配置选其一
************************************第一种方式******************************************
package com.jy.springbootRabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Slf4j
@Configuration
public class RabbitMqConfig {
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("ConfirmCallback: "+"相关数据:"+correlationData);
log.info("ConfirmCallback: "+"确认情况:"+ack);
log.info("ConfirmCallback: "+"原因:"+cause);
});
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("ReturnCallback: "+"消息:"+message);
log.info("ReturnCallback: "+"回应码:"+replyCode);
log.info("ReturnCallback: "+"回应信息:"+replyText);
log.info("ReturnCallback: "+"交换机:"+exchange);
log.info("ReturnCallback: "+"路由键:"+routingKey);
});
return rabbitTemplate;
}
}
************************************第二种方式******************************************
package com.jy.springbootRabbitmq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
@Slf4j
@Configuration
public class RabbitInitializingBean implements InitializingBean {
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public void afterPropertiesSet() throws Exception {
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback: " + "相关数据:" + correlationData);
log.info("ConfirmCallback: " + "确认情况:" + ack);
log.info("ConfirmCallback: " + "原因:" + cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("ReturnCallback: " + "消息:" + message);
log.info("ReturnCallback: " + "回应码:" + replyCode);
log.info("ReturnCallback: " + "回应信息:" + replyText);
log.info("ReturnCallback: " + "交换机:" + exchange);
log.info("ReturnCallback: " + "路由键:" + routingKey);
}
});
}
}
主题设置TopicRabbitConfig:
package com.jy.springbootRabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
public final static String STUDENT_QUEUE = "topic.student.queue";
public final static String DELAY_QUEUE = "delay.queue";
public final static String STUDENT_EXCHANGE = "topic.student";
public final static String DELAY_EXCHANGE = "delay.exchange";
public final static String STUDENT_KEY = "topic.student.key";
public final static String DELAY_KEY = "delay.key";
public final static Long EXPIRATION = 5000L;
@Bean
TopicExchange delayEexchange() {
return new TopicExchange(DELAY_EXCHANGE);
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE)
.withArgument("x-dead-letter-exchange", TopicRabbitConfig.STUDENT_EXCHANGE)
.withArgument("x-dead-letter-routing-key", STUDENT_KEY)
.build();
}
@Bean
public Binding delayBinding() {
return BindingBuilder
.bind(delayQueue())
.to(delayEexchange())
.with(DELAY_KEY);
}
@Bean
public Queue studentQueue() {
return new Queue(STUDENT_QUEUE);
}
@Bean
TopicExchange studentExchange() {
return new TopicExchange(STUDENT_EXCHANGE);
}
@Bean
Binding bindingExchangeStudent() {
return BindingBuilder.bind(studentQueue()).to(studentExchange()).with(STUDENT_KEY);
}
}
监听类ReceiveHandler:
package com.jy.springbootRabbitmq.Handler;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.ClassUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.jy.springbootRabbitmq.config.TopicRabbitConfig;
import com.jy.springbootRabbitmq.entity.student;
import com.jy.springbootRabbitmq.entity.RetrySendmsg;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.UUID;
@Slf4j
@Component
public class ReceiveHandler {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private StringRedisTemplate stringRedisTemplate;
@RabbitListener(queues = {TopicRabbitConfig.STUDENT_QUEUE})
public void studentMessage(Channel channel, Message message, student dto) throws IOException {
try {
log.info("=============>>>监听队列数据:"+ dto);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
resendMessage(TopicRabbitConfig.DELAY_EXCHANGE, TopicRabbitConfig.DELAY_KEY, dto,message);
}
}
public void resendMessage(String exchange,String routingKey,Object obj,Message message) {
String messageId = message.getMessageProperties().getMessageId();
if(StrUtil.isNotBlank(messageId) && Convert.toInt(stringRedisTemplate.opsForValue().get(messageId)) >3) {
log.info("===========>>>重试三次失败后依然失败存库人工处理"+obj.toString());
RetrySendmsg retrySendmsg = new RetrySendmsg();
retrySendmsg.setExchange(message.getMessageProperties().getReceivedExchange());
retrySendmsg.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
retrySendmsg.setMsgBody(JSONUtil.toJsonStr(obj));
retrySendmsg.setClassName(ClassUtil.getClassName(obj, false));
return;
}
rabbitTemplate.convertAndSend(exchange, routingKey, obj, msg -> {
MessageProperties messageProperties = msg.getMessageProperties();
String new_messageId = Convert.toStr(UUID.randomUUID());
if(StrUtil.isBlank(messageId)) {
messageProperties.setMessageId(new_messageId);
messageProperties.setExpiration(Convert.toStr(TopicRabbitConfig.EXPIRATION));
stringRedisTemplate.opsForValue().set(new_messageId,"1");
}else {
messageProperties.setMessageId(messageId);
messageProperties.setExpiration(Convert.toStr(TopicRabbitConfig.EXPIRATION));
stringRedisTemplate.opsForValue().set(messageId, Convert.toInt(stringRedisTemplate.opsForValue().get(messageId)) + 1 + "");
}
return msg;
});
}
}
实体类:
package com.jy.springbootRabbitmq.entity;
import lombok.Data;
import java.io.Serializable;
@Data
public class student implements Serializable{
private static final long serialVersionUID = 1L;
private String studentName;
private String studentCode;
public student(String studentName, String studentCode) {
this.studentName = studentName;
this.studentCode = studentCode;
}
}
package com.jy.springbootRabbitmq.entity;
import lombok.Data;
import lombok.EqualsAndHashCode;
@Data
@EqualsAndHashCode(callSuper = false)
public class RetrySendmsg {
private static final long serialVersionUID = 1L;
private String exchange;
private String routingKey;
private String msgBody;
private String newMsgBody;
private String className;
private Integer status = 0;
private Integer isSend = 0;
}
参考:https://blog.csdn.net/qq330983778/article/details/99611193 https://blog.csdn.net/qq_35387940/article/details/100514134
结束END
|