1.概述
生产者到MQ
首先:需要解决生产者到MQ数据丢失的问题如:MQ端服务器挂了导致数据丢失 1.MQ开启持久化:持久化就是写到硬盘,这样就不会出现MQ服务器挂了数据丢失的问题
@Configuration
public class RabbitMQConfig {
public static final String PRODUCER_QUEUE="producer";
@Bean
public Queue queue(){
return new Queue(PRODUCER_QUEUE,true);
}
@Bean
public Exchange exchange(){
return ExchangeBuilder.directExchange("demo")
.durable(true)
.build();
}
}
2.MQ开启confirm MQ开启持久化后,还会出现一个问题:如果在持久化的过程中服务器挂了怎么办,所以我们需要开启MQ的confirm功能,默认为关闭,confirm功能就是在MQ接收到消息并持久化完成后会执行的操作,需要自定义实现RabbitTemplate.ConfirmCallback。操作如下 1)配置文件中开启confirm
spring:
rabbitmq:
publisher-confirms: true
2)实现RabbitTemplate.ConfirmCallback,重写confirm方法,添加发送数据方法
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@Component
public class CustomMessageSender implements RabbitTemplate.ConfirmCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private RedisTemplate redisTemplate;
private static final String MESSAGE_CONFIRM_KEY="message_confirm_";
public CustomMessageSender(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
redisTemplate.delete(correlationData.getId());
redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId());
}else{
Map<String,String> entries = redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY + correlationData.getId());
String exchange = entries.get("exchange");
String routingKey = entries.get("routingKey");
String message = entries.get("message");
rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
}
}
public void sendMessage(String exchange,String routingKey,String message){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
redisTemplate.boundValueOps(correlationData.getId()).set(message);
Map<String,String> map = new HashMap<>();
map.put("exchange",exchange);
map.put("routingKey",routingKey);
map.put("message",message);
redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map);
rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
}
}
MQ到消费者
消费者同样有数据在接收到但还没处理的图中消费者服务器挂了,就会出现问题,所以我们同样需要确认,而此次确认是消费者给MQ确认,就又需要开启Ack手动确认功能,默认为自动应答,也就是消费者接收到数据MQ就不管了,你服务器挂了也不关我MQ的事,所以需要切换手动确认。操作如下 1)添加配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
2)消费端进行操作后手动Ack,MQ在接受到Ack前,数据会一直保存在MQ中,所以可能MQ端压力会大于之前,所以可以设置一下MQ最大接收数据量,避免无限接收数据导致MQ爆炸,引起所有服务爆炸。
channel.basicQos(300);
余下手动Ack实现
import com.alibaba.fastjson.JSON;
import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class ConsumeListener {
@RabbitListener(queues = RabbitMQConfig.PRODUCER_QUEUE)
public void handlerMessage(Message message, Channel channel){
try {
channel.basicQos(300);
} catch (IOException e) {
e.printStackTrace();
}
boolean result = dohandlerMessage(message);
if (result){
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
e.printStackTrace();
}
}else{
try {
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
|