IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 如何剖析一个项目之Redis(二) -> 正文阅读

[大数据]如何剖析一个项目之Redis(二)

拆解的项目源码地址:
https://github.com/wiqer/ef-redis.git

感谢开源!!!

如何剖析一个项目之Redis(一)
如何剖析一个项目之Redis(二)


已知

  • 这是一款阉割的Java版的redis,通信基于Netty编写。
  • 我已经知到一个redis-cli发送一个命令过来后,是如何解析处理返回报文的。

已知的未知(该篇我们能学到什么)

Java版得redis实现了aof机制,aof机制是如何实现得呢。

解决已知的未知

我们先来看Aof类,aof主要执行逻辑:先从磁盘读取aof文件还原到内存,每一秒刷一次内存到aof文件。

public class Aof {

    private static final Logger LOGGER = Logger.getLogger(Aof.class);

    private static final String suffix = ".aof";

    //控制单个文件不超过64M
    public static final int shiftBit = 26;

    //全局的aof写入进度
    private Long aofPutIndex = 0L;

    //aof文件目录
    private final String dir = PropertiesUtil.getAofPath();

    //缓冲队列
    private final BlockingQueue<Resp> runtimeRespQueue = new LinkedBlockingQueue<>();

    //单线程调度器
    private final ScheduledThreadPoolExecutor persistenceExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "Aof_Single_Thread"));

    private final RedisCore redisCore;

    final ReadWriteLock reentrantLock = new ReentrantReadWriteLock();


    public Aof(RedisCore redisCore) {
        this.redisCore = redisCore;
        //新建aof目录
        createAofFileDir();
        //aof启动
        start();
    }

    private void createAofFileDir() {
        File file = new File(this.dir + suffix);
        if (!file.isDirectory()) {
            File parentFile = file.getParentFile();
            if (null != parentFile && !parentFile.exists()) {
                boolean ok = parentFile.mkdirs();
                if (ok) {
                    LOGGER.info("create aof file dir : " + dir);
                }
            }
        }
    }

    //写命令进入系统时会调用该方法将Resp对象放入缓冲队列一份
    public void put(Resp resp) {
        runtimeRespQueue.offer(resp);
    }

    public void start() {
        //先从磁盘读取aof文件还原到内存
        persistenceExecutor.execute(this::pickupDiskDataAllSegment);
        //每一秒刷一次内存到aof文件
        persistenceExecutor.scheduleAtFixedRate(this::downDiskAllSegment, 10, 1, TimeUnit.SECONDS);
    }

    public void close() {
        try {
            persistenceExecutor.shutdown();
        } catch (Throwable t) {
            LOGGER.error("error: ", t);
        }
    }

我们先看如何从磁盘读取aof文件还原到内存的。

public void pickupDiskDataAllSegment() {
        //获取锁
        reentrantLock.writeLock().lock();
        try {
            long segmentId = -1;

            Segment:
            /* 初始化时segmentId为-1
             * aofPutIndex的低26位代表单个分段aof文件的putIndex
             * aofPutIndex的高38位代表对应的分段segmentId
             * segmentId != (aofPutIndex >> shiftBit) 当aofPutIndex跳到下一个分段时成立
             */
            while (segmentId != (aofPutIndex >> shiftBit)) {
                //获取分段Id
                segmentId = (aofPutIndex >> shiftBit);
                RandomAccessFile randomAccessFile = new RandomAccessFile(dir + "_" + segmentId + suffix, "r");
                FileChannel channel = randomAccessFile.getChannel();
                long len = channel.size();
                //相对当前segment的写入位置
                int putIndex = Format.uintNBit(aofPutIndex, shiftBit);
                //当前segment的基址位置
                long baseOffset = aofPutIndex - putIndex;

                MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, len);
                ByteBuf bufferPolled = PooledByteBufAllocator.DEFAULT.buffer((int) len);
                
                //文件中的命令写入ByteBuf,用于Resp.decode
                bufferPolled.writeBytes(mappedByteBuffer);

                //处理ByteBuf的命令写入内存中
                do {
                    Resp resp;
                    try {
                        resp = Resp.decode(bufferPolled);
                        if (resp == null){
                            bufferPolled.release();
                            clean(mappedByteBuffer);
                            //设置aofPutIndex到下一个分段的开始位置
                            aofPutIndex = baseOffset + (1 << shiftBit);
                            randomAccessFile.close();
                            //跳到Segment,到下一个分段继续处理文件命令
                            break;
                        }
                    } catch (Throwable t) {
                        clean(mappedByteBuffer);
                        randomAccessFile.close();
                        bufferPolled.release();
                        break Segment;
                    }
                    assert resp instanceof RespArray;
                    Command command = CommandFactory.from((RespArray) resp);
                    WriteCommand writeCommand = (WriteCommand) command;
                    assert writeCommand != null;
                    //写入内存中
                    writeCommand.handle(this.redisCore);
                    putIndex = bufferPolled.readerIndex();
                    aofPutIndex = putIndex + baseOffset;
                    if (putIndex >= (1 << shiftBit)) {
                        bufferPolled.release();
                        clean(mappedByteBuffer);
                        //设置aofPutIndex到下一个分段的开始位置
                        aofPutIndex = baseOffset + (1 << shiftBit);
                        //跳到Segment,到下一个分段继续处理文件命令
                        randomAccessFile.close();
                        break;
                    }
                } while (true);

            }
            LOGGER.info("read aof end");
        } catch (Throwable t) {
            if (t instanceof FileNotFoundException) {
                LOGGER.info("read aof end");
            } else {
                LOGGER.error("read aof error: ", t);
            }
        } finally {
            reentrantLock.writeLock().unlock();
        }

    }

实时将阻塞队列runtimeRespQueue中的Resp对象写入aof分段文件。

public void downDiskAllSegment() {
        //尝试获取锁
        if (reentrantLock.writeLock().tryLock()) {
            try {
                long segmentId = -1;

                Segment:
                /* 初始化时segmentId为-1
                 * aofPutIndex的低26位代表单个分段aof文件的putIndex
                 * aofPutIndex的高38位代表对应的分段segmentId
                 * segmentId != (aofPutIndex >> shiftBit) 当aofPutIndex跳到下一个分段时成立
                 */
                while (segmentId != (aofPutIndex >> shiftBit)) {
                    segmentId = (aofPutIndex >> shiftBit);
                    ByteBuf bufferPolled = PooledByteBufAllocator.DEFAULT.buffer(1024);
                    RandomAccessFile randomAccessFile = new RandomAccessFile(dir + "_" + segmentId + suffix, "rw");
                    FileChannel channel = randomAccessFile.getChannel();
                    long len = channel.size();
                    int putIndex = Format.uintNBit(aofPutIndex, shiftBit);
                    long baseOffset = aofPutIndex - putIndex;

                    if (len == 0) {
                        //创建一个新的aof分段文件时len=0,设置len = 1L << shiftBit
                        len = 1L << shiftBit;
                    }

                    //创建文件内存映射
                    MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, len);
                    do {
                        //获取runtimeRespQueue对头Resp对象
                        Resp resp = runtimeRespQueue.peek();
                        if (resp == null) {
                            bufferPolled.release();
                            clean(mappedByteBuffer);
                            randomAccessFile.close();
                            break Segment;
                        }
                        //解析Resp对象写入ByteBuff
                        Resp.write(resp, bufferPolled);
                        int respLen = bufferPolled.readableBytes();
                        int capacity = mappedByteBuffer.capacity();
                        //判断写入文件后是否超过设定的size
                        if ((respLen + putIndex >= capacity)) {
                            bufferPolled.release();
                            clean(mappedByteBuffer);
                            randomAccessFile.close();
                            //将aofPutIndex设置到下一个分段文件的开始位置
                            aofPutIndex = baseOffset + (1 << shiftBit);
                            //跳到Segment,到下一个分段继续处理阻塞队列中的Resp对象
                            break;
                        }

                        //写入文件
                        while (respLen > 0) {
                            respLen--;
                            mappedByteBuffer.put(putIndex++, bufferPolled.readByte());
                        }

                        aofPutIndex = baseOffset + putIndex;
                        //消费成功后删除掉对头的Resp对象
                        runtimeRespQueue.poll();

                    } while (true);

                }

            } catch (IOException e) {
                System.err.println(e.getMessage());
                LOGGER.error("aof IOException ", e);
            } catch (Exception e) {
                System.err.println(e.getMessage());
                LOGGER.error("aof Exception", e);
            } finally {
                reentrantLock.writeLock().unlock();
            }

        }
    }

runtimeRespQueue中的Resp对象中时何时被放入的呢?我们看下CommandDecoder的decode方法。

    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        TRACEID.newTraceId();
        while (in.readableBytes() != 0) {
            int mark = in.readerIndex();
            try {
                Resp resp = Resp.decode(in);
                if (!(resp instanceof RespArray || resp instanceof SimpleString)) {
                    throw new IllegalStateException("客户端发送的命令应该只能是Resp Array 和 单行命令 类型");
                }
                Command command;

                if (resp instanceof RespArray) {
                    command = CommandFactory.from((RespArray) resp);
                } else {
                    command = CommandFactory.from((SimpleString) resp);

                }
                if (command == null) {
                    assert resp instanceof RespArray;
                    ctx.writeAndFlush(new Errors("unsupport command:" + ((BulkString) ((RespArray) resp).getArray()[0]).getContent().toUtf8String()));
                } else {
                //如果解析出的命令为WriteCommand
                    if (aof != null && command instanceof WriteCommand) {
                        aof.put(resp);
                    }
                    return command;
                }
            } catch (Throwable t) {
                in.readerIndex(mark);
                LOGGER.error("解码命令", t);
                break;
            }
        }
        return null;
    }

总结

本篇主要剖析了java版redis如何将写命令写入aof文件,又如何在重启时从aof文件重新读回到内存。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-02-03 01:16:27  更:2022-02-03 01:16:48 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:25:46-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码