IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> RabbitMQ保证数据不丢失 -> 正文阅读

[大数据]RabbitMQ保证数据不丢失

RabbitMQ保证数据不丢失

1.概述

生产者到MQ

首先:需要解决生产者到MQ数据丢失的问题如:MQ端服务器挂了导致数据丢失
1.MQ开启持久化:持久化就是写到硬盘,这样就不会出现MQ服务器挂了数据丢失的问题

@Configuration
public class RabbitMQConfig {

    public static final String PRODUCER_QUEUE="producer";

    //声明队列
    @Bean
    public Queue queue(){
        return new Queue(PRODUCER_QUEUE,true);//第二个属性为true代表开启持久化
    }

    //演示:开启交换机持久化
    @Bean
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("demo")
        .durable(true)//该属性为true代表开启持久化
        .build();
    }
}

2.MQ开启confirm
MQ开启持久化后,还会出现一个问题:如果在持久化的过程中服务器挂了怎么办,所以我们需要开启MQ的confirm功能,默认为关闭,confirm功能就是在MQ接收到消息并持久化完成后会执行的操作,需要自定义实现RabbitTemplate.ConfirmCallback。操作如下
1)配置文件中开启confirm

spring:
  rabbitmq:
    publisher-confirms: true #开启confirm机制

2)实现RabbitTemplate.ConfirmCallback,重写confirm方法,添加发送数据方法

import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

@Component
public class CustomMessageSender implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;

    private static final String MESSAGE_CONFIRM_KEY="message_confirm_";

	//构造方法,为当前rabbitTemplete添加ConfirmCallback处理类
    public CustomMessageSender(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     * 回调方法
     * @param correlationData 本次操作的唯一标识
     * @param ack 成功/失败标识
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack){
            //持久化成功
            //删除临时存储空间的内容
            redisTemplate.delete(correlationData.getId());
            redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId());
        }else{
            //持久化失败
            //消息重新发送
            Map<String,String> entries = redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY + correlationData.getId());
            String exchange = entries.get("exchange");
            String routingKey = entries.get("routingKey");
            String message = entries.get("message");
            //重新发送
            rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
        }
    }

    //在发送数据的同时,先向redis存一份数据,等到将来confirm方法返回持久化成功后,再将redis数据删除,否则就重新发送
    //自定义消息发送发送
    public void sendMessage(String exchange,String routingKey,String message){

        //向存储空间中存放本次消息的内容
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        redisTemplate.boundValueOps(correlationData.getId()).set(message);

        Map<String,String> map = new HashMap<>();
        map.put("exchange",exchange);
        map.put("routingKey",routingKey);
        map.put("message",message);
        redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map);

    
        //发送消息
        rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData);
    }
}

MQ到消费者

消费者同样有数据在接收到但还没处理的图中消费者服务器挂了,就会出现问题,所以我们同样需要确认,而此次确认是消费者给MQ确认,就又需要开启Ack手动确认功能,默认为自动应答,也就是消费者接收到数据MQ就不管了,你服务器挂了也不关我MQ的事,所以需要切换手动确认。操作如下
1)添加配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual #切换手动应答

2)消费端进行操作后手动Ack,MQ在接受到Ack前,数据会一直保存在MQ中,所以可能MQ端压力会大于之前,所以可以设置一下MQ最大接收数据量,避免无限接收数据导致MQ爆炸,引起所有服务爆炸。

//300表示MQ最多存300条数据
channel.basicQos(300);

余下手动Ack实现

import com.alibaba.fastjson.JSON;
import com.demo.config.RabbitMQConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class ConsumeListener {

    @RabbitListener(queues = RabbitMQConfig.PRODUCER_QUEUE)
    public void handlerMessage(Message message, Channel channel){

        //设置预抓取总数
        try {
            channel.basicQos(300);
        } catch (IOException e) {
            e.printStackTrace();
        }

        //消费数据
        //dohandlerMessage为自定义操作
        boolean result = dohandlerMessage(message);

        if (result){
            //操作成功
            //向mq发送成功通知
            /**
             * deliveryTag: 消息的唯一标识
             * multiple:是否批量操作
             */
            try {
              channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else{
            //操作失败
            //返回失败通知
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }
}
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-08-25 12:16:45  更:2021-08-25 12:17:51 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 19:04:36-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码