Sometimes we need a fixed-length queue in which, once the capacity is exceeded, new entries evict the oldest ones.
Here is an implementation backed by a ring (circular) array.
/***
* serialized record format (see RaftData.searialData):
* offset+separator+operation+separator+identify+separator+key+separator+dataType+separator+dataSerial+endMark
*/
public class RaftLeaderAppendData implements FixedLengthStoreQueue{
private Integer appendArraySize = 1024 ;
private RaftData[] appendData ;
private int headIndex = -1 ;
private long headOffset ;
public RaftLeaderAppendData(int size){
this.appendArraySize = size;
appendData = new RaftData[appendArraySize];
}
public RaftLeaderAppendData(){
appendData = new RaftData[appendArraySize];
}
@Override
public synchronized void append(RaftData raftData){
Assert.notNull(raftData,"raft data cannot be null .");
long offset = raftData.getOffset() ;
int position = computeHeadAndSetOffset(offset);
appendData[position] = raftData ;
}
@Override
public synchronized RaftData getDataByOffset(long offset){
int index = computeOffsetIndex(offset);
// index 0 is a valid slot; -1 means the offset has been evicted or was never appended
if(index>=0){
return appendData[index];
}
return null ;
}
@Override
public synchronized RaftData[] getDataByStartOffset(long startOffset){
// the ring only retains the latest appendArraySize offsets: (headOffset-appendArraySize, headOffset]
long oldestOffset = headOffset-appendArraySize+1;
if(startOffset<oldestOffset||startOffset>headOffset){
throw new IndexOutOfBoundsException("start offset is outside the retained offset range .");
}
int size = (int)(headOffset-startOffset+1);
RaftData[] res = new RaftData[size];
for(int i = 0 ; i < res.length;i++){
res[i] = getDataByOffset(startOffset+i);
}
return res ;
}
private int computeOffsetIndex(long offset){
if(offset>headOffset||offset<=(headOffset-appendArraySize)){
return -1 ;
}
int away = (int)(headOffset-offset) ;
int index = headIndex-away;
// index 0 is valid; a negative value means we wrapped past the start of the array
return index>=0?index:index+appendArraySize;
}
// assumes offsets are appended in strictly increasing, gap-free order
private synchronized int computeHeadAndSetOffset(long offset){
headOffset = offset;
if(headIndex == appendArraySize-1){
headIndex = 0 ;
return headIndex ;
}
int position = ++headIndex ;
return position ;
}
}
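To make the eviction behaviour concrete, here is a minimal usage sketch. It relies only on the API above plus the RaftData factory shown just below; "node-1" and the keys/values are illustrative, and the printed offsets assume a fresh JVM so RaftData's internal counter starts at 1.
import org.orange.persistence.raft.model.RaftData;

public class RingUsageSketch {
    public static void main(String[] args) {
        RaftLeaderAppendData ring = new RaftLeaderAppendData(4);
        for (int i = 1; i <= 6; i++) {
            // each ofAddData call assigns the next offset internally: 1, 2, ..., 6
            ring.append(RaftData.ofAddData("node-1", "key-" + i, "value-" + i));
        }
        // capacity is 4, so offsets 1 and 2 have already been overwritten by 5 and 6
        System.out.println(ring.getDataByOffset(2));          // null (evicted)
        System.out.println(ring.getDataByOffset(6).getKey()); // key-6
        RaftData[] delta = ring.getDataByStartOffset(4);      // entries for offsets 4, 5 and 6
        System.out.println(delta.length);                     // 3
    }
}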
The RaftData class:
package org.orange.persistence.raft.model;
import lombok.Getter;
import lombok.Setter;
import org.orange.persistence.raft.config.RaftLeaderDataConfig;
import org.orange.persistence.raft.enums.NodeDataOperation;
import org.orange.rpc.utils.ReflectUtils;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.security.InvalidParameterException;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;
@Getter
@Setter
public class RaftData implements RaftLeaderDataConfig {
private Long offset ;
private NodeDataOperation operation;
private String identify ;
private String key ;
private String dataType ;
private Object data ;
private static final AtomicLong offsetGenerate = new AtomicLong(0);
private RaftData(){
this.offset = offsetGenerate.incrementAndGet();
}
public RaftData(String identify,String key,Object data){
this.offset = offsetGenerate.incrementAndGet();
this.identify=identify;
this.key = key;
this.data = data ;
if(Objects.nonNull(data)){
this.dataType = data.getClass().getTypeName();
}
}
public static RaftData ofAddData(String identify,String key ,Object data){
check(identify,key,data);
RaftData raftData = new RaftData(identify,key,data);
raftData.setOperation(NodeDataOperation.ADD);
return raftData ;
}
public static RaftData ofUpdateData(String identify,String key ,Object data){
check(identify,key,data);
RaftData raftData = new RaftData(identify,key,data);
raftData.setOperation(NodeDataOperation.UPDATE);
return raftData ;
}
public static RaftData ofDeleteData(String identify,String key){
Assert.notNull(identify,"identify cannot be null .");
Assert.notNull(key,"key cannot be null .");
RaftData raftData = new RaftData(identify,key,null);
raftData.setOperation(NodeDataOperation.DELETE);
return raftData ;
}
public static RaftData ofClearData(String identify){
Assert.notNull(identify,"identify cannot be null .");
RaftData raftData = new RaftData(identify,null,null);
raftData.setOperation(NodeDataOperation.CLEAR);
return raftData ;
}
public static void check(String identify,String key,Object data){
Assert.notNull(identify,"identify cannot be null .");
Assert.notNull(key,"key cannot be null .");
Assert.notNull(data,"data cannot be null .");
}
public String searialData(){
Assert.notNull(offset,"data offset cannot be null .");
StringBuilder sb = new StringBuilder();
sb.append(offset).append(separator)
// use the enum name so that deSerialData can restore it via NodeDataOperation.valueOf
.append(operation.name()).append(separator)
.append(identify).append(separator)
.append(StringUtils.isEmpty(key)?NULL_VALUE:key).append(separator)
.append(Objects.isNull(data)?NULL_VALUE:dataType).append(separator)
.append(Objects.isNull(data)?NULL_VALUE:ReflectUtils.toJson(data)).append(END_MARK);
return sb.toString();
}
public static RaftData deSerialData(String data) {
Assert.notNull(data ,"string data cannot be null .");
// String.split treats separator as a regex, so separator must not contain regex metacharacters
String[] split = data.split(separator);
if(split.length!=6){
throw new InvalidParameterException("data is not valid .");
}
RaftData raftData = new RaftData();
raftData.setOffset(Long.valueOf(split[0]));
raftData.setOperation(NodeDataOperation.valueOf(split[1]));
raftData.setIdentify(split[2]);
String key = split[3];
raftData.setKey(NULL_VALUE.equals(key)?null:key);
String dataType = split[4];
raftData.setDataType(NULL_VALUE.equals(dataType)?null:dataType);
String dataValue = split[5];
if(!NULL_VALUE.equals(dataType)&&Objects.nonNull(raftData.getDataType())){
try {
raftData.setData(ReflectUtils.toObject(dataValue,Class.forName(raftData.getDataType())));
} catch (ClassNotFoundException e) {
throw new InvalidParameterException("raft data cannot to object by type "+dataType);
}
}
return raftData ;
}
}
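For reference, a hedged sketch of what one serialized entry looks like and how it round-trips. The identify/key/value strings are placeholders, the ";" and "#" in the comment stand in for the project's separator and END_MARK constants from RaftLeaderDataConfig (not shown), and the round trip assumes END_MARK is a String constant that the log reader strips (using it as a record delimiter) before calling deSerialData, since deSerialData itself only splits on the separator.
import org.orange.persistence.raft.config.RaftLeaderDataConfig;
import org.orange.persistence.raft.model.RaftData;

public class SerialFormatSketch {
    public static void main(String[] args) {
        // "node-1" / "user:42" / "alice" are illustrative values only
        RaftData update = RaftData.ofUpdateData("node-1", "user:42", "alice");
        String line = update.searialData();
        // with separator ";" and END_MARK "#" the line looks roughly like:
        // 1;UPDATE;node-1;user:42;java.lang.String;"alice"#
        System.out.println(line);

        // strip the trailing end mark (treated here as a record delimiter) before parsing
        String payload = line.substring(0, line.length() - RaftLeaderDataConfig.END_MARK.length());
        RaftData restored = RaftData.deSerialData(payload);
        System.out.println(restored.getOperation() + " " + restored.getKey()); // UPDATE user:42
    }
}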
Then, in NodeDataStore:
public class NodeDataStore {
/** identifier ,key**/
private Map<String,Map<String,Object>> storeData = new ConcurrentHashMap<>(2);
private RaftLeaderAppendData appendLog = new RaftLeaderAppendData();
private AsyncEventLoop eventLoop ;
public RaftData getAppendLog(long offset){
return appendLog.getDataByOffset(offset);
}
public RaftData[] getAppendLogs(long startOffset){
return appendLog.getDataByStartOffset(startOffset);
}
public void putData(String identify ,String key ,Object value){
Assert.notNull(key,"data key cannot be null .");
Map<String,Object> identifyMap = getIdentifyMap(identify);
RaftData raftData = null ;
if(identifyMap.containsKey(key)){
raftData = RaftData.ofUpdateData(identify,key,value);
}else{
raftData = RaftData.ofAddData(identify,key,value);
}
identifyMap.put(key,value);
appendAndPublishEvent(raftData);
}
public Object getData(String identify, String key){
Assert.notNull(key,"data key cannot be null .");
return getIdentifyMap(identify).get(key);
}
public Object removeData(String identify ,String key){
Assert.notNull(key,"data key cannot be null .");
appendAndPublishEvent(RaftData.ofDeleteData(identify, key));
return getIdentifyMap(identify).remove(key);
}
public void putIdentify(String identify,Map<String,Object> identifyData){
Map<String,Object> identifyMap = getIdentifyMap(identify);
identifyMap.putAll(identifyData);
for(Map.Entry<String,Object> keyValue:identifyData.entrySet()){
appendAndPublishEvent(RaftData.ofAddData(identify,keyValue.getKey(),keyValue.getValue()));
}
}
public Map<String,Object> getIdentifyData(String identify){
if(storeData.containsKey(identify)){
return getIdentifyMap(identify);
}
return null ;
}
public Map<String,Object> removeIdentifyData(String identify){
appendAndPublishEvent(RaftData.ofClearData(identify));
return storeData.remove(identify);
}
private Map<String,Object> getIdentifyMap(String identify){
Assert.notNull(identify,"data identify cannot be null .");
// computeIfAbsent registers the new map in storeData, so writes to a brand-new identify are kept
// (getOrDefault would hand back a throwaway map that is never stored)
return storeData.computeIfAbsent(identify,k -> new ConcurrentHashMap<>());
}
private AsyncEventLoop getEventLoop(){
if(Objects.isNull(eventLoop)){
eventLoop = SpringContextHolder.getBean(AsyncEventLoop.class);
}
return eventLoop;
}
private void appendAndPublishEvent(RaftData raftData){
appendLog.append(raftData);
getEventLoop().publishEvent(RaftDataEvent.of(raftData));
}
}
Each write appends the corresponding log entry and publishes an event; the event handler is responsible for pushing the changed data down to every Follower.
After a successful election, or after a Follower reconnects, the data has to be synchronized; the Follower can catch up with the Leader based on its own offset, as sketched below.
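A hedged sketch of how that catch-up could be wired on top of the classes above. RaftSyncHelper is a hypothetical name; the transport, the event-handler registration and the full-snapshot fallback are not part of the code shown here.
import org.orange.persistence.raft.model.RaftData;

public class RaftSyncHelper {

    /** Leader side: collect every entry after the Follower's last known offset. */
    public static RaftData[] delta(NodeDataStore leaderStore, long followerOffset) {
        try {
            return leaderStore.getAppendLogs(followerOffset + 1);
        } catch (IndexOutOfBoundsException outOfWindow) {
            // The Follower is either already up to date or has fallen behind the ring's
            // retention window; a real implementation would distinguish the two cases
            // and ship a full snapshot in the latter one.
            return new RaftData[0];
        }
    }

    /** Follower side: replay each received entry against the local store by operation type. */
    public static void apply(NodeDataStore followerStore, RaftData[] entries) {
        for (RaftData entry : entries) {
            if (entry == null) {
                continue;
            }
            switch (entry.getOperation()) {
                case ADD:
                case UPDATE:
                    // Note: putData re-appends to the Follower's own log and regenerates
                    // offsets; a production Follower would keep the Leader's offsets instead.
                    followerStore.putData(entry.getIdentify(), entry.getKey(), entry.getData());
                    break;
                case DELETE:
                    followerStore.removeData(entry.getIdentify(), entry.getKey());
                    break;
                case CLEAR:
                    followerStore.removeIdentifyData(entry.getIdentify());
                    break;
                default:
                    break;
            }
        }
    }
}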