拆解的项目源码地址: https://github.com/wiqer/ef-redis.git
感谢开源!!!
已知
这是一款阉割的Java版的redis,通信基于Netty编写。
已知的未知(该篇我们能学到什么)
一个命令被该系统接收到后,是如何处理然后又返回的。
解决已知的未知
我们先准备一个redis-client(我这里用的Windows)。
windows版redis下载地址
下载完成后,我们开始调试这个java版的redis。
用这个main方法来启动该redis服务!
redis已启动!!!
可以看出启动时候实际上是启动了一个Netty的服务端,redis-cli客户端首先会建立网络连接,然后通过该连接发送命令,这里首先会使用CommandDecoder来解码。
public class CommandDecoder extends LengthFieldBasedFrameDecoder
{
private static final Logger LOGGER = Logger.getLogger(CommandDecoder.class);
private static final int MAX_FRAME_LENGTH = Integer.MAX_VALUE;
private Aof aof=null;
public CommandDecoder(Aof aof){
this();
this.aof=aof;
}
public CommandDecoder() {
super(MAX_FRAME_LENGTH, 0, 4);
}
@Override
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
TRACEID.newTraceId();
while (in.readableBytes() != 0)
{
int mark = in.readerIndex();
try
{
Resp resp = Resp.decode(in);
if (!(resp instanceof RespArray||resp instanceof SimpleString))
{
throw new IllegalStateException("客户端发送的命令应该只能是Resp Array 和 单行命令 类型");
}
Command command=null;
if(resp instanceof RespArray) {
command = CommandFactory.from((RespArray) resp);
}else if(resp instanceof SimpleString){
command = CommandFactory.from((SimpleString) resp);
}
if (command == null)
{
ctx.writeAndFlush(new Errors("unsupport command:" + ((BulkString) ((RespArray) resp).getArray()[0]).getContent().toUtf8String()));
}
else
{
if (aof!=null&&command instanceof WriteCommand) {
aof.put(resp);
}
return command;
}
}
catch (Exception e)
{
in.readerIndex(mark);
LOGGER.error("解码命令", e);
break;
}
}
return null;
}
LengthFieldBasedFrameDecoder快速了解
使用redis-cli发送命令:set hello world。 redis通信协议详细指南
这一条命令按照通信协议会呈现出这种样子。
*3 $3 SET $5 hello $7 world
redis-cli发出命令后CommandDecoder的decode方法会被调用,该方法中会调用Resp的decode静态方法解析命令。
static Resp decode(ByteBuf buffer) {
if (buffer.readableBytes() <= 0) {
new IllegalStateException("没有读取到完整的命令");
}
char c = (char) buffer.readByte();
if (c == RespType.STATUS.getCode()) {
return new SimpleString(getString(buffer));
} else if (c == RespType.ERROR.getCode()) {
return new Errors(getString(buffer));
} else if (c == RespType.INTEGER.getCode()) {
int value = getNumber(buffer);
return new RespInt(value);
} else if (c == RespType.BULK.getCode()) {
int length = getNumber(buffer);
if (buffer.readableBytes() < length + 2) {
throw new IllegalStateException("没有读取到完整的命令");
}
byte[] content;
if (length == -1) {
content = null;
} else {
content = new byte[length];
buffer.readBytes(content);
}
if (buffer.readByte() != RespType.R.getCode() || buffer.readByte() != RespType.N.getCode()) {
throw new IllegalStateException("没有读取到完整的命令");
}
return new BulkString(new BytesWrapper(content));
} else if (c == RespType.MULTYBULK.getCode()) {
int numOfElement = getNumber(buffer);
Resp[] array = new Resp[numOfElement];
for (int i = 0; i < numOfElement; i++) {
array[i] = decode(buffer);
}
return new RespArray(array);
} else {
if (c > 64 && c < 91) {
return new SimpleString(c + getString(buffer));
} else {
return decode(buffer);
}
}
}
getNumber方法在该示例中用于获取*3的3、$3的3、$5的5、以及$7的7。
static int getNumber(ByteBuf buffer) {
char t;
t = (char) buffer.readByte();
boolean positive = true;
int value = 0;
if (t == RespType.ERROR.getCode()) {
positive = false;
} else {
value = t - RespType.ZERO.getCode();
}
while (buffer.readableBytes() > 0 && (t = (char) buffer.readByte()) != RespType.R.getCode()) {
value = value * 10 + (t - RespType.ZERO.getCode());
}
if (buffer.readableBytes() == 0 || buffer.readByte() != RespType.N.getCode()) {
throw new IllegalStateException("没有读取到完整的命令");
}
if (!positive) {
value = -value;
}
return value;
}
回到Command的decode方法,上面我们已经知道resp属于RespArray类型。
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
TRACEID.newTraceId();
while (in.readableBytes() != 0)
{
int mark = in.readerIndex();
try
{
Resp resp = Resp.decode(in);
if (!(resp instanceof RespArray||resp instanceof SimpleString))
{
throw new IllegalStateException("客户端发送的命令应该只能是Resp Array 和 单行命令 类型");
}
Command command=null;
if(resp instanceof RespArray) {
command = CommandFactory.from((RespArray) resp);
}else if(resp instanceof SimpleString){
command = CommandFactory.from((SimpleString) resp);
}
if (command == null)
{
ctx.writeAndFlush(new Errors("unsupport command:" + ((BulkString) ((RespArray) resp).getArray()[0]).getContent().toUtf8String()));
}
else
{
if (aof!=null&&command instanceof WriteCommand) {
aof.put(resp);
}
return command;
}
}
catch (Exception e)
{
in.readerIndex(mark);
LOGGER.error("解码命令", e);
break;
}
}
return null;
}
我们看下CommandFactory类。
public class CommandFactory {
private static final Logger LOGGER = Logger.getLogger(CommandFactory.class);
static Map<String, Supplier<Command>> map = new HashMap<>();
static {
for (CommandType each : CommandType.values()) {
map.put(each.name(), each.getSupplier());
}
}
public static Command from(RespArray arrays) {
Resp[] array = arrays.getArray();
String commandName = ((BulkString) array[0]).getContent().toUtf8String().toLowerCase();
Supplier<Command> supplier = map.get(commandName);
if (supplier == null) {
LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName);
return null;
} else {
try {
Command command = supplier.get();
command.setContent(array);
return command;
} catch (Throwable t) {
LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName + ",数据读取异常", t);
return null;
}
}
}
public static Command from(SimpleString string) {
String commandName = string.getContent().toLowerCase();
Supplier<Command> supplier = map.get(commandName);
if (supplier == null) {
LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName);
return null;
} else {
try {
return supplier.get();
} catch (Throwable t) {
LOGGER.warn("traceId:" + TRACEID.currentTraceId() + " 不支持的命令:" + commandName + ",数据读取异常", t);
return null;
}
}
}
}
我们进入Command#setContent方法看一下。
public interface Command {
Charset CHARSET = StandardCharsets.UTF_8;
org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(Command.class);
CommandType type();
void setContent(Resp[] array);
void handle(ChannelHandlerContext ctx, RedisCore redisCore);
}
找到我们Set命令实现类,这里的WriteCommand是实现Command的接口,之所以多加一层WriteCommand是为实现AOF。
public class Set implements WriteCommand {
private BytesWrapper key;
private BytesWrapper value;
private long timeout = -1;
private boolean notExistSet = false;
private boolean existSet = false;
@Override
public CommandType type() {
return CommandType.set;
}
@Override
public void setContent(Resp[] array) {
key = ((BulkString) array[1]).getContent();
value = ((BulkString) array[2]).getContent();
int index = 3;
while (index < array.length) {
String string = ((BulkString) array[index]).getContent().toUtf8String();
index++;
if (string.startsWith("EX")) {
String seconds = ((BulkString) array[index]).getContent().toUtf8String();
timeout = Integer.parseInt(seconds) * 1000;
} else if (string.startsWith("PX")) {
String seconds = ((BulkString) array[index]).getContent().toUtf8String();
timeout = Integer.parseInt(seconds);
} else if (string.equals("NX")) {
notExistSet = true;
} else if (string.equals("XX")) {
existSet = true;
}
}
}
构造出具体的Command后,就轮到CommandHandler的channelRead0,又Netty基础的应该对SimpleChannelInboundHandler不陌生的。
SimpleChannelInboundHandler快速来了解
package com.wiqer.redis;
import com.wiqer.redis.command.Command;
import com.wiqer.redis.util.TRACEID;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CommandHandler extends SimpleChannelInboundHandler<Command> {
private static final Logger LOGGER = LoggerFactory.getLogger(CommandHandler.class);
private final RedisCore redisCore;
public CommandHandler(RedisCore redisCore) {
this.redisCore = redisCore;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Command command) throws Exception {
String traceId = TRACEID.currentTraceId();
LOGGER.debug("traceId:" + traceId + " 本次处理的命令:" + command.type().name());
try {
command.handle(ctx, redisCore);
} catch (Exception e) {
LOGGER.error("处理数据时", e);
}
LOGGER.debug("traceId:" + traceId + " 命令处理完毕");
}
}
进入Set的handle方法。
@Override
public void handle(ChannelHandlerContext ctx, RedisCore redisCore) {
if (notExistSet && redisCore.exist(key)) {
ctx.writeAndFlush(BulkString.NullBulkString);
} else if (existSet && !redisCore.exist(key)) {
ctx.writeAndFlush(BulkString.NullBulkString);
} else {
if (timeout != -1) {
timeout += System.currentTimeMillis();
}
RedisString stringData = new RedisString();
stringData.setValue(value);
stringData.setTimeout(timeout);
redisCore.put(key, stringData);
ctx.writeAndFlush(new SimpleString("OK"));
}
}
进入redisCoreImpl的put方法,ConcurrentHashMap容器在数据量比较大的时候,链表会转换为红黑树。红黑树在并发情况下,删除和插入过程中有个平衡的过程,会牵涉到大量节点,因此竞争锁资源的代价相对比较高。而跳跃表的操作针对局部,需要锁住的节点少,因此在并发场景下的性能会更好一些。因为这个map需要装载redis所有的key、value所以我们这里使用ConcurrentHashMap。
public class RedisCoreImpl implements RedisCore {
private final ConcurrentNavigableMap<BytesWrapper, RedisData> map = new ConcurrentSkipListMap<BytesWrapper, RedisData>();
private final ConcurrentHashMap<BytesWrapper, Channel> clients = new ConcurrentHashMap<>();
private final Map<Channel, BytesWrapper> clientNames = new ConcurrentHashMap<>();
@Override
public Set<BytesWrapper> keys() {
return map.keySet();
}
@Override
public void putClient(BytesWrapper connectionName, Channel channelContext) {
clients.put(connectionName, channelContext);
clientNames.put(channelContext, connectionName);
}
@Override
public boolean exist(BytesWrapper key) {
return map.containsKey(key);
}
@Override
public void put(BytesWrapper key, RedisData redisData) {
map.put(key, redisData);
}
回到Set的handle方法,正常写入的话会ctx.writeAndFlush(new SimpleString(“OK”));不能写入时(设置 了XX或NX)会ctx.writeAndFlush(BulkString.NullBulkString);。走到Netty的出站逻辑,出站会调用 ResponseEncoder的encode方法。
Netty的出站入站,感兴趣可以看下这个
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Resp resp, ByteBuf byteBuf) throws Exception {
try {
Resp.write(resp, byteBuf);
byteBuf.writeBytes(byteBuf);
} catch (Throwable t) {
LOGGER.error("response error: ", t);
}
}
进入Resp的write方法。
static void write(Resp resp, ByteBuf buffer) {
if (resp instanceof SimpleString) {
buffer.writeByte(RespType.STATUS.getCode());
String content = ((SimpleString) resp).getContent();
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
} else if (resp instanceof Errors) {
buffer.writeByte(RespType.ERROR.getCode());
String content = ((Errors) resp).getContent();
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
} else if (resp instanceof RespInt) {
buffer.writeByte(RespType.INTEGER.getCode());
String content = String.valueOf(((RespInt) resp).getValue());
buffer.writeBytes(content.getBytes(StandardCharsets.UTF_8));
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
} else if (resp instanceof BulkString) {
BytesWrapper content = ((BulkString) resp).getContent();
if (content == null || content.getByteArray().length == 0) {
buffer.writeByte(RespType.BULK.getCode());
buffer.writeByte(RespType.ERROR.getCode());
buffer.writeByte(RespType.ONE.getCode());
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
} else {
buffer.writeByte(RespType.BULK.getCode());
String length = String.valueOf(content.getByteArray().length);
buffer.writeBytes(length.getBytes(StandardCharsets.UTF_8));
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
buffer.writeBytes(content.getByteArray());
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
}
} else if (resp instanceof RespArray) {
buffer.writeByte(RespType.MULTYBULK.getCode());
Resp[] array = ((RespArray) resp).getArray();
String length = String.valueOf(array.length);
buffer.writeBytes(length.getBytes(StandardCharsets.UTF_8));
buffer.writeByte(RespType.R.getCode());
buffer.writeByte(RespType.N.getCode());
for (Resp each : array) {
write(each, buffer);
}
} else {
throw new IllegalArgumentException();
}
}
总结
本篇主要由Set命令引入解析一个命令从客户端接收到以后的处理过程,CommandDecoder作为Netty入站的解码器,ResponseEncoder作为Netty出站的编码器,CommandHandler进行具体命令的处理。
|