拆解的项目源码地址: https://github.com/wiqer/ef-redis.git
感谢开源!!!
如何剖析一个项目之Redis(一) 如何剖析一个项目之Redis(二)
已知
- 这是一款阉割的Java版的redis,通信基于Netty编写。
- 我已经知到一个redis-cli发送一个命令过来后,是如何解析处理返回报文的。
已知的未知(该篇我们能学到什么)
Java版得redis实现了aof机制,aof机制是如何实现得呢。
解决已知的未知
我们先来看Aof类,aof主要执行逻辑:先从磁盘读取aof文件还原到内存,每一秒刷一次内存到aof文件。
public class Aof {
private static final Logger LOGGER = Logger.getLogger(Aof.class);
private static final String suffix = ".aof";
public static final int shiftBit = 26;
private Long aofPutIndex = 0L;
private final String dir = PropertiesUtil.getAofPath();
private final BlockingQueue<Resp> runtimeRespQueue = new LinkedBlockingQueue<>();
private final ScheduledThreadPoolExecutor persistenceExecutor = new ScheduledThreadPoolExecutor(1, r -> new Thread(r, "Aof_Single_Thread"));
private final RedisCore redisCore;
final ReadWriteLock reentrantLock = new ReentrantReadWriteLock();
public Aof(RedisCore redisCore) {
this.redisCore = redisCore;
createAofFileDir();
start();
}
private void createAofFileDir() {
File file = new File(this.dir + suffix);
if (!file.isDirectory()) {
File parentFile = file.getParentFile();
if (null != parentFile && !parentFile.exists()) {
boolean ok = parentFile.mkdirs();
if (ok) {
LOGGER.info("create aof file dir : " + dir);
}
}
}
}
public void put(Resp resp) {
runtimeRespQueue.offer(resp);
}
public void start() {
persistenceExecutor.execute(this::pickupDiskDataAllSegment);
persistenceExecutor.scheduleAtFixedRate(this::downDiskAllSegment, 10, 1, TimeUnit.SECONDS);
}
public void close() {
try {
persistenceExecutor.shutdown();
} catch (Throwable t) {
LOGGER.error("error: ", t);
}
}
我们先看如何从磁盘读取aof文件还原到内存的。
public void pickupDiskDataAllSegment() {
reentrantLock.writeLock().lock();
try {
long segmentId = -1;
Segment:
while (segmentId != (aofPutIndex >> shiftBit)) {
segmentId = (aofPutIndex >> shiftBit);
RandomAccessFile randomAccessFile = new RandomAccessFile(dir + "_" + segmentId + suffix, "r");
FileChannel channel = randomAccessFile.getChannel();
long len = channel.size();
int putIndex = Format.uintNBit(aofPutIndex, shiftBit);
long baseOffset = aofPutIndex - putIndex;
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_ONLY, 0, len);
ByteBuf bufferPolled = PooledByteBufAllocator.DEFAULT.buffer((int) len);
bufferPolled.writeBytes(mappedByteBuffer);
do {
Resp resp;
try {
resp = Resp.decode(bufferPolled);
if (resp == null){
bufferPolled.release();
clean(mappedByteBuffer);
aofPutIndex = baseOffset + (1 << shiftBit);
randomAccessFile.close();
break;
}
} catch (Throwable t) {
clean(mappedByteBuffer);
randomAccessFile.close();
bufferPolled.release();
break Segment;
}
assert resp instanceof RespArray;
Command command = CommandFactory.from((RespArray) resp);
WriteCommand writeCommand = (WriteCommand) command;
assert writeCommand != null;
writeCommand.handle(this.redisCore);
putIndex = bufferPolled.readerIndex();
aofPutIndex = putIndex + baseOffset;
if (putIndex >= (1 << shiftBit)) {
bufferPolled.release();
clean(mappedByteBuffer);
aofPutIndex = baseOffset + (1 << shiftBit);
randomAccessFile.close();
break;
}
} while (true);
}
LOGGER.info("read aof end");
} catch (Throwable t) {
if (t instanceof FileNotFoundException) {
LOGGER.info("read aof end");
} else {
LOGGER.error("read aof error: ", t);
}
} finally {
reentrantLock.writeLock().unlock();
}
}
实时将阻塞队列runtimeRespQueue中的Resp对象写入aof分段文件。
public void downDiskAllSegment() {
if (reentrantLock.writeLock().tryLock()) {
try {
long segmentId = -1;
Segment:
while (segmentId != (aofPutIndex >> shiftBit)) {
segmentId = (aofPutIndex >> shiftBit);
ByteBuf bufferPolled = PooledByteBufAllocator.DEFAULT.buffer(1024);
RandomAccessFile randomAccessFile = new RandomAccessFile(dir + "_" + segmentId + suffix, "rw");
FileChannel channel = randomAccessFile.getChannel();
long len = channel.size();
int putIndex = Format.uintNBit(aofPutIndex, shiftBit);
long baseOffset = aofPutIndex - putIndex;
if (len == 0) {
len = 1L << shiftBit;
}
MappedByteBuffer mappedByteBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, len);
do {
Resp resp = runtimeRespQueue.peek();
if (resp == null) {
bufferPolled.release();
clean(mappedByteBuffer);
randomAccessFile.close();
break Segment;
}
Resp.write(resp, bufferPolled);
int respLen = bufferPolled.readableBytes();
int capacity = mappedByteBuffer.capacity();
if ((respLen + putIndex >= capacity)) {
bufferPolled.release();
clean(mappedByteBuffer);
randomAccessFile.close();
aofPutIndex = baseOffset + (1 << shiftBit);
break;
}
while (respLen > 0) {
respLen--;
mappedByteBuffer.put(putIndex++, bufferPolled.readByte());
}
aofPutIndex = baseOffset + putIndex;
runtimeRespQueue.poll();
} while (true);
}
} catch (IOException e) {
System.err.println(e.getMessage());
LOGGER.error("aof IOException ", e);
} catch (Exception e) {
System.err.println(e.getMessage());
LOGGER.error("aof Exception", e);
} finally {
reentrantLock.writeLock().unlock();
}
}
}
runtimeRespQueue中的Resp对象中时何时被放入的呢?我们看下CommandDecoder的decode方法。
public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
TRACEID.newTraceId();
while (in.readableBytes() != 0) {
int mark = in.readerIndex();
try {
Resp resp = Resp.decode(in);
if (!(resp instanceof RespArray || resp instanceof SimpleString)) {
throw new IllegalStateException("客户端发送的命令应该只能是Resp Array 和 单行命令 类型");
}
Command command;
if (resp instanceof RespArray) {
command = CommandFactory.from((RespArray) resp);
} else {
command = CommandFactory.from((SimpleString) resp);
}
if (command == null) {
assert resp instanceof RespArray;
ctx.writeAndFlush(new Errors("unsupport command:" + ((BulkString) ((RespArray) resp).getArray()[0]).getContent().toUtf8String()));
} else {
if (aof != null && command instanceof WriteCommand) {
aof.put(resp);
}
return command;
}
} catch (Throwable t) {
in.readerIndex(mark);
LOGGER.error("解码命令", t);
break;
}
}
return null;
}
总结
本篇主要剖析了java版redis如何将写命令写入aof文件,又如何在重启时从aof文件重新读回到内存。
|