上一篇我们介绍了RPC基础服务的实现,现在我们接着将如何将一个RPC服务做成一个标准的产品还有哪些需要实现的。
我们的服务消费方仅仅实现远程调用是不够的,离产品化还有很长一段距离。
Consumer的核心功能包括:
Provider的核心功能包括:
一、RPC服务消费方核心功能设计实现
1、连接管理
首先我们来分析下consumer的连接管理功能设计。
业务逻辑发起一个远程调用服务,通过proxy进行调用,首先进行序列化,然后找到连接发送请求,接收结果。
consumer需要保持与服务提供方长连接,用于传输请求数据也返回结果。
连接管理包括以下几个功能点:
- 初始化时机:什么时候创建连接,不同的模块不同的要求。
- gateway(网关):网关需要对接很多模块,有的模块可能请求很少,如果才能预先创建的方式,可能会存在资源浪费的情况。所以网关最好采用懒加载的方式,服务调用的时候再与对应的模块创建连接
- 业务服务之间可以采用预先创建的方式
- 连接数维护
- 心跳/重连
2、负载均衡
负载均衡需要确保多个服务提供方节点流量均匀/合理,支持节点扩容与灰度发布。
负载均衡的算法有多种
轮询、随机、取模大家都比较了解,这里就不展开讲了,主要给大家讲下带权重的算法。
(1)权重负载均衡设计
- 权重0~10范围内取值
- 值越大表示权重越高
- 权重越高代表分配流量比例越大
算法设计:
数据结构
算法描述
- 负载均衡选出一个节点
- 生成0~9之间的随机数,还是选用一个数组?
- 对应的数值:0使用该节点,1不使用该节点
权重数组的创建:
public static byte[] randomGenerator(int limit, int num) {
byte[] tempArray = new byte[limit];
if (num <= 0) {
for (int i = 0; i < limit; i++) {
tempArray[i] = 0;
}
return tempArray;
}
if (num >= limit) {
for (int i = 0; i < limit; i++) {
tempArray[i] = 1;
}
return tempArray;
}
Random random = new Random();
for (int i = 0; i < num; i++) {
int temp = Math.abs(random.nextInt()) % limit;
while (tempArray[temp] == 1) {
temp = Math.abs(random.nextInt()) % limit;
}
tempArray[temp] = 1;
}
return tempArray;
}
(2)轮询+权重负载均衡实现
- 轮询到某一个server节点
- 然后根据权重再进行一次过滤
- 轮询到下一个节点
for (int i = start; i < start + count; i++) {
int index = i % count;
Server server = servers.get(index);
if (needChooseAnotherOne.test(server)) {
requestCount.getAndIncrement();
continue;
}
int requestTime = this.getRequestTimeCountAndSet(server, count);
if (server.getWeights() <10 && server.getWeights() > -1) {
byte[] abandonArray = server.getAbandonArray();
if (abandonArray[requestTime % abandonArray.length] == 1) {
requestCount.getAndIncrement();
continue;
}
}
if (serverState.NORMAL == server.getState()) {
result = server;
break;
}
requestCount.getAndIncrement();
}
(3)请求路由
请求路由的作用是通过一系列规则过滤出可以选择的服务提供节点列表,在应用隔离、读写分离、灰度发布中有重要作用。
路由的功能设计包括
匹配规则设计
数据结构设计
IP分流规则:attribute=IP,operator=IN,value={IP1, IP2},servers={Node1, Node2}
上面是Dubbo一个经典的路由策略。我们有4个服务提供方,现在进行灰度发布,将ProviderA-1进行灰度,这里的规则链有2个规则:第一个规则是所有的流量都不打给ProviderA-1,这样的话不会有流量走到ProviderA-1;第二个规则是灰度的流量打到ProviderA-1,其他的流量都正常的进行访问。
(4)超时处理
调用方超时处理:
- 调用方超时处理
public Protocol request(Protocol requestProtocol) throws Exception {
if (ServerState.Reboot == state || ServerState.Dead == state) {
throw new RebootException();
}
increaseCU();
CSocket socket = null;
try {
try {
socket = socketPool.getSocket();
byte[] data = requestProtocol.toBytes(socket.isRights(), socket.getDESKey());
socket.registerRec(requestProtocol.getSessionId());
socket.send(data);
} catch (TimeoutException e) {
timeout();
throw e;
} catch (IOException e) {
if (socket == null || !socket.connecting()) {
if (testServerSocket() != ServerState.Normal) {
this.asDeath();
logger.info("this server : {} is dead , will choose another one !", address);
throw new RebootException();
}
}
throw e;
} catch (Exception e) {
throw e;
} finally {
if (socket != null) {
socket.dispose();
}
}
byte[] buffer = socket.receive(requestProtocol.getSessionId(), currUserCount);
Protocol receiveProtocol = Protocol.fromBytes(buffer, socket.isRights(), socket.getDESKey());
return receiveProtocol;
} finally {
if (socket != null) {
if (sockett != null) {
socket.unregisterRec(requestProtocol.getSessionId());
}
}
}
}
public void registerRec(int sessionId) {
AutoResetEvent event = new AutoResetEvent();
WindowData wd = new WindowData(event);
WaitWindows.put(sessionId, wd);
}
public void send(byte[] data) {
try {
if (null != transmitter) {
TiresiasClientHelper.getInstance().setEndPoint(channel);
TransmitterTask task = new TransmitterTask(this, data);
transmitter.invoke(task);
}
} catch (NotYetConnectedException ex) {
_connecting = false;
throw ex;
}
}
public void invoke(TransmitterTask task) {
int size = wqueue.size();
if (size > 1024 * 64) {
logger.warn(Version.ID + " send queue is to max size is:" + size);
}
wqueue.offer(task);
}
class SendTask implements Runnable {
@Override
public void run() {
int offset = 0;
TransmitterTask[] elementData = new TransmitterTask[5];
int waitTime = 0;
for (;;) {
try {
TransmitterTask task = wqueue.poll(waitTime, TimeUnit.MILLISECONDS);
if (null == task) {
if (elementData.length > 0 && offset > 0) {
send(elementData, offset);
offset = 0;
arrayClear(elementData);
}
waitTime = 10;
continue;
}
if (offset == 5) {
if (null != elementData) {
send(elementData, offset);
}
offset = 0;
arrayClear(elementData);
}
elementData[offset] = task;
waitTime = 0;
++offset;
} catch (Exception ex) {
offset = 0;
arrayClear(elementData);
ex.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}
public byte[] receive(int sessionId, int queueLen) {
WindowData wd = WaitWindons.get(sessionId);
if (wd == null) {
throw new RuntimeException("Need invoke 'registerRec' method before invoke 'receive' method!");
}
AutoResetEvent event = wd.getEvent();
if (!event.waitOne(socketConfig.getReceiveTimeout())) {
throw new TimeoutException("ServiceIP:[" + this.getServiceIP() + "],Receive data timeout or error!timeout:" + socketConfig.getReceiveTimeout() + "ms,queue length:"
+ queueLen);
}
byte[] data = wd.getData();
int offset = SFPStruct.Version;
int len = ByteConverter.bytesToIntLitterEndian(data, offset);
if (len != data.length) {
throw new ProtocolException("The data length inconsistent!datalen:" + data.length + ",check len:" + len);
}
return data;
}
public class AutoResetEvent {
CountDownLatch cdl;
public AutoResetEvent() {
cdl = new CountDownLatch(1);
}
public AutoResetEvent(int waitCount) {
cdl = new CountDownLatch(waitCount);
}
public void set() {
cdl.countDown();
}
public boolean waitOne(long time) {
try {
return cdl.await(time, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
public void decode(ByteBuffer receiveBuffer, byte[] receiveArray) throws Exception {
try {
int limit = receiveBuffer.limit();
int num = 0;
for (; num < limit; num++) {
byte b = receiveArray[num];
receiveData.write(b);
if (b == ProtocolConst.P_END_TAG[index]) {
index++;
if (index == ProtocolConst.P_END_TAG.length) {
byte[] pak = receiveData.toByteArray(ProtocolConst.P_START_TAG.length, receiveData.size() - ProtocolConst.P_END_TAG.length - ProtocolConst.P_START_TAG.length);
int pSessionId = ByteConverter.bytesToIntLittleEndian(pak, SFPStruct.Version + SFPStruct.TotalLen);
WindowData wd = WaitWindows.get(pSessionId);
if (wd != null) {
if (wd.getFlag() == 0) {
wd.setData(pak);
wd.getEvent().set();
} else if (wd.getFlag() == 1) {
if (null != unregisterRec(pSessionId)) {
wd.getReceiveHandler().notify(pak, wd.getInvokeCnxn());
}
} else if (wd.getFlag() == 2) {
logger.info("unsupport request type");
}
}
index = 0;
receiveData.reset();
continue;
}
} else if (index != 0) {
if (b == ProtocolConst.P_END_TAG[0]) {
index = 1;
} else {
index = 0;
}
}
}
} catch (Exception e) {
index = 0;
ex.printStackTrace();
receiveData.clear()
}
}
|