IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> 如何剖析一个项目之Redis(一) -> 正文阅读

[大数据]如何剖析一个项目之Redis(一)

拆解的项目源码地址:
https://github.com/wiqer/ef-redis.git

感谢开源!!!


已知

这是一款阉割的Java版的redis,通信基于Netty编写。

已知的未知(该篇我们能学到什么)

一个命令被该系统接收到后,是如何处理然后又返回的。

解决已知的未知

我们先准备一个redis-client(我这里用的Windows)。

windows版redis下载地址

下载完成后,我们开始调试这个java版的redis。

在这里插入图片描述
用这个main方法来启动该redis服务!


redis已启动!!!

在这里插入图片描述
可以看出启动时候实际上是启动了一个Netty的服务端,redis-cli客户端首先会建立网络连接,然后通过该连接发送命令,这里首先会使用CommandDecoder来解码。

public class CommandDecoder extends LengthFieldBasedFrameDecoder
{
    private static final Logger LOGGER = Logger.getLogger(CommandDecoder.class);
    private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE;
    private  Aof aof=null;
//    static {
//        if(PropertiesUtil.getAppendOnly()) {
//            aof=new Aof();
//        }
//    }
    public CommandDecoder(Aof aof){
        this();
        this.aof=aof;
    }
    public CommandDecoder() {
        super(MAX_FRAME_LENGTH, 0, 4);
    }
    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        TRACEID.newTraceId();
        while (in.readableBytes() != 0)
        {
            int mark = in.readerIndex();
            try
            {
                Resp resp = Resp.decode(in);
                if (!(resp instanceof RespArray||resp instanceof SimpleString))
                {
                    throw new IllegalStateException("客户端发送的命令应该只能是Resp Array 和 单行命令 类型");
                }
                Command command=null;
                if(resp instanceof RespArray) {
                    command = CommandFactory.from((RespArray) resp);
                }else if(resp instanceof SimpleString){
                    command  = CommandFactory.from((SimpleString) resp);

                }
                if (command == null)
                {
                    //取出命令
                    ctx.writeAndFlush(new Errors("unsupport command:" + ((BulkString) ((RespArray) resp).getArray()[0]).getContent().toUtf8String()));
                }
                else
                {
                    if (aof!=null&&command instanceof WriteCommand) {
                        aof.put(resp);
                    }
                    return command;
                }
            }
            catch (Exception e)
            {
                in.readerIndex(mark);
                LOGGER.error("解码命令", e);
                break;
            }
        }
        return null;
    }

LengthFieldBasedFrameDecoder快速了解

使用redis-cli发送命令:set hello world。
在这里插入图片描述
redis通信协议详细指南

这一条命令按照通信协议会呈现出这种样子。

*3
$3
SET
$5
hello
$7
world

redis-cli发出命令后CommandDecoder的decode方法会被调用,该方法中会调用Resp的decode静态方法解析命令。

 static Resp decode(ByteBuf buffer) {
        if (buffer.readableBytes() <= 0) {
            new IllegalStateException("没有读取到完整的命令");
        }
        char c = (char) buffer.readByte();
        if (c == RespType.STATUS.getCode()) {
            return new SimpleString(getString(buffer));
        } else if (c == RespType.ERROR.getCode()) {
            return new Errors(getString(buffer));
        } else if (c == RespType.INTEGER.getCode()) {
            int value = getNumber(buffer);
            return new RespInt(value);
        } else if (c == RespType.BULK.getCode()) {
            int length = getNumber(buffer);
            if (buffer.readableBytes() < length + 2) {
                throw new IllegalStateException("没有读取到完整的命令");
            }
            byte[] content;
            if (length == -1) {
                content = null;
            } else {
                content = new byte[length];
                buffer.readBytes(content);
            }
            if (buffer.readByte() != RespType.R.getCode() || buffer.readByte() != RespType.N.getCode()) {
                throw new IllegalStateException("没有读取到完整的命令");
            }
            return new BulkString(new BytesWrapper(content));
            //set hello world会走到MULTYBULK分支
        } else if (c == RespType.MULTYBULK.getCode()) {
            //获取*3后面的3
            int numOfElement = getNumber(buffer);
            Resp[] array = new Resp[numOfElement];
             //逐一解析出set、hello、world
            for (int i = 0; i < numOfElement; i++) {
                array[i] = decode(buffer);
            }
            return new RespArray(array);
        } else {
            /**
             * A~Z
             */
            if (c > 64 && c < 91) {
                return new SimpleString(c + getString(buffer));
            } else {
                return decode(buffer);
            }

            //throw new IllegalArgumentException("意外地命令");
        }
    }

getNumber方法在该示例中用于获取*3的3、$3的3、$5的5、以及$7的7。

    static int getNumber(ByteBuf buffer) {
        char t;
        t = (char) buffer.readByte();
        boolean positive = true;
        int value = 0;
        // 错误(Errors): 响应的首字节是 "-"
        if (t == RespType.ERROR.getCode()) {
            positive = false;
        } else {
            value = t - RespType.ZERO.getCode();
        }
        while (buffer.readableBytes() > 0 && (t = (char) buffer.readByte()) != RespType.R.getCode()) {
            value = value * 10 + (t - RespType.ZERO.getCode());
        }
        if (buffer.readableBytes() == 0 || buffer.readByte() != RespType.N.getCode()) {
            throw new IllegalStateException("没有读取到完整的命令");
        }
        if (!positive) {
            value = -value;
        }
        return value;
    }

回到Command的decode方法,上面我们已经知道resp属于RespArray类型。

  public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        TRACEID.newTraceId();
        while (in.readableBytes() != 0)
        {
            int mark = in.readerIndex();
            try
            {
                Resp resp = Resp.decode(in);
                if (!(resp instanceof RespArray||resp instanceof SimpleString))
                {
                    throw new IllegalStateException("客户端发送的命令应该只能是Resp Array 和 单行命令 类型");
                }
                Command command=null;
                if(resp instanceof RespArray) {
                  //走到这个分支
                    command = CommandFactory.from((RespArray) resp);
                }else if(resp instanceof SimpleString){
                    command  = CommandFactory.from((SimpleString) resp);

                }
                if (command == null)
                {
                    ctx.writeAndFlush(new Errors("unsupport command:" + ((BulkString) ((RespArray) resp).getArray()[0]).getContent().toUtf8String()));
                }
                else
                {
                    if (aof!=null&&command instanceof WriteCommand) {
                        aof.put(resp);
                    }
                    return command;
                }
            }
            catch (Exception e)
            {
                in.readerIndex(mark);
                LOGGER.error("解码命令", e);
                break;
            }
        }
        return null;
    }

我们看下CommandFactory类。

public class CommandFactory {

    private static final Logger LOGGER = Logger.getLogger(CommandFactory.class);

    static Map<String, Supplier<Command>> map = new HashMap<>();

    static {
        for (CommandType each : CommandType.values()) {
            map.put(each.name(), each.getSupplier());
        }
    }


    public static Command from(RespArray arrays) {
        Resp[] array = arrays.getArray();
        String commandName = ((BulkString) array[0]).getContent().toUtf8String().toLowerCase();
        //根据命令名,该例中命令名为set,获取Command提供者
        Supplier<Command> supplier = map.get(commandName);
        if (supplier == null) {
            LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName);
            return null;
        } else {
            try {
                //获取Command实例
                Command command = supplier.get();
                command.setContent(array);
                return command;
            } catch (Throwable t) {
                LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName + ",数据读取异常", t);
                return null;
            }
        }
    }

    public static Command from(SimpleString string) {
        String commandName = string.getContent().toLowerCase();
        Supplier<Command> supplier = map.get(commandName);
        if (supplier == null) {
            LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName);
            return null;
        } else {
            try {
                return supplier.get();
            } catch (Throwable t) {
                LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName + ",数据读取异常", t);
                return null;
            }
        }
    }
}

我们进入Command#setContent方法看一下。

public interface Command {

    Charset CHARSET = StandardCharsets.UTF_8;

    org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(Command.class);

    CommandType type();

    void setContent(Resp[] array);

    void handle(ChannelHandlerContext ctx, RedisCore redisCore);
}

找到我们Set命令实现类,这里的WriteCommand是实现Command的接口,之所以多加一层WriteCommand是为实现AOF。

public class Set implements WriteCommand {

    private BytesWrapper key;

    private BytesWrapper value;

    private long timeout = -1;

    private boolean notExistSet = false;

    private boolean existSet = false;

    @Override
    public CommandType type() {
        return CommandType.set;
    }

    @Override
    public void setContent(Resp[] array) {
        //这里是获取参数的key、value放入成员变量的key、value处
        key = ((BulkString) array[1]).getContent();
        value = ((BulkString) array[2]).getContent();
        //下面是解析set命令的一些附加参数,附加参数的含义见下图
        int index = 3;
        while (index < array.length) {
            String string = ((BulkString) array[index]).getContent().toUtf8String();
            index++;
            if (string.startsWith("EX")) {
                String seconds = ((BulkString) array[index]).getContent().toUtf8String();
                timeout = Integer.parseInt(seconds) * 1000;
            } else if (string.startsWith("PX")) {
                String seconds = ((BulkString) array[index]).getContent().toUtf8String();
                timeout = Integer.parseInt(seconds);
            } else if (string.equals("NX")) {
                notExistSet = true;
            } else if (string.equals("XX")) {
                existSet = true;
            }
        }
    }

在这里插入图片描述

构造出具体的Command后,就轮到CommandHandler的channelRead0,又Netty基础的应该对SimpleChannelInboundHandler不陌生的。

SimpleChannelInboundHandler快速来了解

package com.wiqer.redis;

import com.wiqer.redis.command.Command;
import com.wiqer.redis.util.TRACEID;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class CommandHandler extends SimpleChannelInboundHandler<Command> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommandHandler.class);

    private final RedisCore redisCore;

    public CommandHandler(RedisCore redisCore) {
        this.redisCore = redisCore;
    }


    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Command command) throws Exception {
        String traceId = TRACEID.currentTraceId();
        LOGGER.debug("traceId:" + traceId + " 本次处理的命令:" + command.type().name());
        try {
            //这里是核心处理逻辑
            command.handle(ctx, redisCore);

        } catch (Exception e) {
            LOGGER.error("处理数据时", e);
        }

        LOGGER.debug("traceId:" + traceId + " 命令处理完毕");
    }

}

进入Set的handle方法。

 @Override
    public void handle(ChannelHandlerContext ctx, RedisCore redisCore) {
        if (notExistSet && redisCore.exist(key)) {
            ctx.writeAndFlush(BulkString.NullBulkString);
        } else if (existSet && !redisCore.exist(key)) {
            ctx.writeAndFlush(BulkString.NullBulkString);
        } else {
            if (timeout != -1) {
                timeout += System.currentTimeMillis();
            }
            //这里实例化了一个RedisString继承自RedisData
            RedisString stringData = new RedisString();
            stringData.setValue(value);
            stringData.setTimeout(timeout);
            //这里调用的redisCoreImpl的put方法
            redisCore.put(key, stringData);
            ctx.writeAndFlush(new SimpleString("OK"));
        }
    }

进入redisCoreImpl的put方法,ConcurrentHashMap容器在数据量比较大的时候,链表会转换为红黑树。红黑树在并发情况下,删除和插入过程中有个平衡的过程,会牵涉到大量节点,因此竞争锁资源的代价相对比较高。而跳跃表的操作针对局部,需要锁住的节点少,因此在并发场景下的性能会更好一些。因为这个map需要装载redis所有的key、value所以我们这里使用ConcurrentHashMap。


public class RedisCoreImpl implements RedisCore {
    private final ConcurrentNavigableMap<BytesWrapper, RedisData> map = new ConcurrentSkipListMap<BytesWrapper, RedisData>();

    private final ConcurrentHashMap<BytesWrapper, Channel> clients = new ConcurrentHashMap<>();
    private final Map<Channel, BytesWrapper> clientNames = new ConcurrentHashMap<>();

    @Override
    public Set<BytesWrapper> keys() {
        return map.keySet();
    }

    @Override
    public void putClient(BytesWrapper connectionName, Channel channelContext) {
        clients.put(connectionName, channelContext);
        clientNames.put(channelContext, connectionName);
    }

    @Override
    public boolean exist(BytesWrapper key) {
        return map.containsKey(key);
    }

    @Override
    public void put(BytesWrapper key, RedisData redisData) {
        //这里put方法很简单
        map.put(key, redisData);
    }

回到Set的handle方法,正常写入的话会ctx.writeAndFlush(new SimpleString(“OK”));不能写入时(设置 了XX或NX)会ctx.writeAndFlush(BulkString.NullBulkString);。走到Netty的出站逻辑,出站会调用
ResponseEncoder的encode方法。

Netty的出站入站,感兴趣可以看下这个

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Resp resp, ByteBuf byteBuf) throws Exception {
        try {
            //主要看静态方法write
            Resp.write(resp, byteBuf);
            byteBuf.writeBytes(byteBuf);
        } catch (Throwable t) {
            LOGGER.error("response error: ", t);
        }
    }

进入Resp的write方法。

static void write(Resp resp, ByteBuf buffer) {
        if (resp instanceof SimpleString) {
        //ctx.writeAndFlush(new SimpleString("OK"));走到这个逻辑
            buffer.writeByte(RespType.STATUS.getCode());
            String content = ((SimpleString) resp).getContent();
            buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
            buffer.writeByte(RespType.R.getCode());
            buffer.writeByte(RespType.N.getCode());
        } else if (resp instanceof Errors) {
            buffer.writeByte(RespType.ERROR.getCode());
            String content = ((Errors) resp).getContent();
            buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
            buffer.writeByte(RespType.R.getCode());
            buffer.writeByte(RespType.N.getCode());
        } else if (resp instanceof RespInt) {
            buffer.writeByte(RespType.INTEGER.getCode());
            String content = String.valueOf(((RespInt) resp).getValue());
            buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
            buffer.writeByte(RespType.R.getCode());
            buffer.writeByte(RespType.N.getCode());
        } else if (resp instanceof BulkString) {
            BytesWrapper content = ((BulkString) resp).getContent();
            if (content == null || content.getByteArray().length == 0) {
             //ctx.writeAndFlush(BulkString.NullBulkString);会走这到这个逻辑
                buffer.writeByte(RespType.BULK.getCode());
                buffer.writeByte(RespType.ERROR.getCode());
                buffer.writeByte(RespType.ONE.getCode());
                buffer.writeByte(RespType.R.getCode());
                buffer.writeByte(RespType.N.getCode());
            } else {
                buffer.writeByte(RespType.BULK.getCode());
                String length = String.valueOf(content.getByteArray().length);
                buffer.writeBytes(length.getBytes(StandardCharsets.UTF_8));
                buffer.writeByte(RespType.R.getCode());
                buffer.writeByte(RespType.N.getCode());
                buffer.writeBytes(content.getByteArray());
                buffer.writeByte(RespType.R.getCode());
                buffer.writeByte(RespType.N.getCode());
            }
        } else if (resp instanceof RespArray) {
            buffer.writeByte(RespType.MULTYBULK.getCode());
            Resp[] array = ((RespArray) resp).getArray();
            String length = String.valueOf(array.length);
            buffer.writeBytes(length.getBytes(StandardCharsets.UTF_8));
            buffer.writeByte(RespType.R.getCode());
            buffer.writeByte(RespType.N.getCode());
            for (Resp each : array) {
                write(each, buffer);
            }
        } else {
            throw new IllegalArgumentException();
        }
    }

总结

本篇主要由Set命令引入解析一个命令从客户端接收到以后的处理过程,CommandDecoder作为Netty入站的解码器,ResponseEncoder作为Netty出站的编码器,CommandHandler进行具体命令的处理。

  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2022-01-28 11:59:11  更:2022-01-28 12:00:20 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/17 1:44:09-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码