IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> RPC服务调用方核心功能设计 -> 正文阅读

[网络协议]RPC服务调用方核心功能设计

上一篇我们介绍了RPC基础服务的实现,现在我们接着将如何将一个RPC服务做成一个标准的产品还有哪些需要实现的。

我们的服务消费方仅仅实现远程调用是不够的,离产品化还有很长一段距离。

Consumer的核心功能包括:

  • 连接管理
  • 负载均衡
  • 请求路由
  • 超时处理
  • 健康检查

Provider的核心功能包括:

  • 队列/线程池
  • 超时丢弃
  • 优雅关闭
  • 过载保护

一、RPC服务消费方核心功能设计实现

1、连接管理

首先我们来分析下consumer的连接管理功能设计。
在这里插入图片描述

业务逻辑发起一个远程调用服务,通过proxy进行调用,首先进行序列化,然后找到连接发送请求,接收结果。

consumer需要保持与服务提供方长连接,用于传输请求数据也返回结果。

连接管理包括以下几个功能点:

  • 初始化时机:什么时候创建连接,不同的模块不同的要求。
    • gateway(网关):网关需要对接很多模块,有的模块可能请求很少,如果才能预先创建的方式,可能会存在资源浪费的情况。所以网关最好采用懒加载的方式,服务调用的时候再与对应的模块创建连接
    • 业务服务之间可以采用预先创建的方式
  • 连接数维护
  • 心跳/重连

2、负载均衡

在这里插入图片描述

负载均衡需要确保多个服务提供方节点流量均匀/合理,支持节点扩容与灰度发布。

负载均衡的算法有多种

  • 轮询
  • 随机
  • 取模
  • 带权重
  • 一致性Hash

轮询、随机、取模大家都比较了解,这里就不展开讲了,主要给大家讲下带权重的算法。

(1)权重负载均衡设计

  • 权重0~10范围内取值
  • 值越大表示权重越高
  • 权重越高代表分配流量比例越大
    在这里插入图片描述

算法设计:

数据结构

  • 数组,根据权重值填充
  • 0,1的位置随机打乱

算法描述

  • 负载均衡选出一个节点
  • 生成0~9之间的随机数,还是选用一个数组?
  • 对应的数值:0使用该节点,1不使用该节点

在这里插入图片描述

权重数组的创建:

//  num 表示生成的数组中1的个数 在数组中1表示抛弃请求 0表示接受请求
public static byte[] randomGenerator(int limit, int num) {

    byte[] tempArray = new byte[limit];

    if (num <= 0) {
        for (int i = 0; i < limit; i++) {
            tempArray[i] = 0;
        }
        return tempArray;
    }
    if (num >= limit) {
        for (int i = 0; i < limit; i++) {
            tempArray[i] = 1;
        }
        return tempArray;
    }

    //在数组中随机填充num个1
    Random random = new Random();
    for (int i = 0; i < num; i++) {
        int temp = Math.abs(random.nextInt()) % limit;
        while (tempArray[temp] == 1) {
            temp = Math.abs(random.nextInt()) % limit;
        }
        tempArray[temp] = 1;
    }
    return tempArray;
}

(2)轮询+权重负载均衡实现

  • 轮询到某一个server节点
  • 然后根据权重再进行一次过滤
  • 轮询到下一个节点
for (int i = start; i < start + count; i++) {
  int index = i % count;
  Server server = servers.get(index);
  if (needChooseAnotherOne.test(server)) {
    requestCount.getAndIncrement();
    continue;
  }
  
  int requestTime = this.getRequestTimeCountAndSet(server, count);
  if (server.getWeights() <10 && server.getWeights() > -1) {
    byte[] abandonArray = server.getAbandonArray();
    // abandonTimes[i] == 1 表示server不接受请求
    if (abandonArray[requestTime % abandonArray.length] == 1) {
      requestCount.getAndIncrement();
    	continue;
    }
  }
  
  if (serverState.NORMAL == server.getState()) {
    result = server;
    break;
  }
  
  requestCount.getAndIncrement();
}

(3)请求路由

请求路由的作用是通过一系列规则过滤出可以选择的服务提供节点列表,在应用隔离、读写分离、灰度发布中有重要作用。

在这里插入图片描述

路由的功能设计包括

  • 匹配规则
  • 行为
  • 链表

匹配规则设计

  • 待比较属性
  • 运算符
  • 属性匹配值
  • 匹配节点

数据结构设计

  • 链表

在这里插入图片描述

IP分流规则:attribute=IP,operator=IN,value={IP1, IP2},servers={Node1, Node2}

在这里插入图片描述

上面是Dubbo一个经典的路由策略。我们有4个服务提供方,现在进行灰度发布,将ProviderA-1进行灰度,这里的规则链有2个规则:第一个规则是所有的流量都不打给ProviderA-1,这样的话不会有流量走到ProviderA-1;第二个规则是灰度的流量打到ProviderA-1,其他的流量都正常的进行访问。

(4)超时处理

调用方超时处理:

  • 工作线程阻塞
    • 等待回包通知
  • 超时逻辑
    • 工作线程等待通知
    • 数据返回终止等待
    • 超时抛出异常
  1. 调用方超时处理
/**代理类请求发送和接收调用逻辑
 * 1、注册WindowData
 * 2、异步发送
 * 3、等待数据到达
 * 4、注销WindowData
 */
public Protocol request(Protocol requestProtocol) throws Exception {
  // Server 状态判断
  if (ServerState.Reboot == state || ServerState.Dead == state) {
    throw new RebootException();
  }
  
  increaseCU();
  
  CSocket socket = null;
  try {
    try {
      socket = socketPool.getSocket();
      byte[] data = requestProtocol.toBytes(socket.isRights(), socket.getDESKey()); // 序列化
      // 注册WondowData,放入Session-WindowData的Map
      socket.registerRec(requestProtocol.getSessionId());
      // 异步发送,将数据放入发送队列
      socket.send(data);
    } catch (TimeoutException e) {
      timeout();
      throw e;
    } catch (IOException e) {
      if (socket == null || !socket.connecting()) {
        if (testServerSocket() != ServerState.Normal) {
          this.asDeath();
          logger.info("this server : {}  is dead , will choose another one !", address);
//        logger.error(String.format("server %s is dead", new Object[]{this.address}), e);
          throw new RebootException();
        }
      }
      throw e;
    } catch (Exception e) {
      throw e;
    } finally {
      if (socket != null) {
        socket.dispose();
      }
    }
    
    // 接收数据
    byte[] buffer = socket.receive(requestProtocol.getSessionId(), currUserCount);
    Protocol receiveProtocol = Protocol.fromBytes(buffer, socket.isRights(), socket.getDESKey());
    
    return receiveProtocol;
  } finally {
    if (socket != null) {
      if (sockett != null) {
        // 注销windowData
        socket.unregisterRec(requestProtocol.getSessionId());
      }
    }
  }
}

// 数据发送实现逻辑
// 1、注册发送事件SessionID-WindowData的Map
public void registerRec(int sessionId) {
  AutoResetEvent event = new AutoResetEvent();
  WindowData wd = new WindowData(event);
  WaitWindows.put(sessionId, wd);
}

// 2、异步发送
public void send(byte[] data) {
  try {
    if (null != transmitter) {
      TiresiasClientHelper.getInstance().setEndPoint(channel);
      TransmitterTask task = new TransmitterTask(this, data);
      transmitter.invoke(task);
    }
  } catch (NotYetConnectedException ex) {
    _connecting = false;
    throw ex;
  }
}

public void invoke(TransmitterTask task) {
  int size = wqueue.size();
  if (size > 1024 * 64) {
    logger.warn(Version.ID + " send queue is to max size is:" + size);
  }
  //放入队列,异步发送
  wqueue.offer(task);
}

// 发送线程处理逻辑
class SendTask implements Runnable {
  @Override
  public void run() {
    int offset = 0;
    // 缓存数据,用于聚合发送
    TransmitterTask[] elementData = new TransmitterTask[5];
    int waitTime = 0;
    for (;;) {
      try {
        TransmitterTask task = wqueue.poll(waitTime, TimeUnit.MILLISECONDS);
        
        // 请求队列中没有请求数据了,发送数据
        if (null == task) {
          if (elementData.length > 0 && offset > 0) {
            send(elementData, offset);
            offset = 0;
            arrayClear(elementData);
          }
          waitTime = 10;
          continue;
        }
        
        // 数组满了,发送数据
        if (offset == 5) {
          // 发送
          if (null != elementData) {
            send(elementData, offset);
          }
          offset = 0;
          arrayClear(elementData);
        }
        
        // 不实时发送,暂时放入数组
        elementData[offset] = task;
        waitTime = 0;
        ++offset;
      } catch (Exception ex) {
        offset = 0;
        arrayClear(elementData);
        ex.printStackTrace();
      } catch (Throwable e) {
        e.printStackTrace();
      }
    }
  }
}

// 工作线程接收数据处理逻辑
public byte[] receive(int sessionId, int queueLen) {
  // 获取WindowData
  WindowData wd = WaitWindons.get(sessionId);
  if (wd == null) {
    throw new RuntimeException("Need invoke 'registerRec' method before invoke 'receive' method!");
  }
  
  AutoResetEvent event = wd.getEvent();
  // 等待数据到达事件
  if (!event.waitOne(socketConfig.getReceiveTimeout())) {
    throw new TimeoutException("ServiceIP:[" + this.getServiceIP() + "],Receive data timeout or error!timeout:" +	 		 socketConfig.getReceiveTimeout() + "ms,queue length:"
                + queueLen);
  }
  
  // 从WindowData中获取Data数据
  byte[] data = wd.getData();
  int offset = SFPStruct.Version;
  int len = ByteConverter.bytesToIntLitterEndian(data, offset);
  if (len != data.length) {
    throw new ProtocolException("The data length inconsistent!datalen:" + data.length + ",check len:" + len);
  }
  return data;
}

// AutoResetEvent 实现
public class AutoResetEvent {
  CountDownLatch cdl;
  
  public AutoResetEvent() {
        cdl = new CountDownLatch(1);
    }

    public AutoResetEvent(int waitCount) {
        cdl = new CountDownLatch(waitCount);
    }

    public void set() {
        cdl.countDown();
    }

    public boolean waitOne(long time) {
        try {
            return cdl.await(time, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

// 接收线程处理逻辑
public void decode(ByteBuffer receiveBuffer, byte[] receiveArray) throws Exception {
  try {
    int limit = receiveBuffer.limit();
    int num = 0;
    for (; num < limit; num++) {
      byte b = receiveArray[num];
      receiveData.write(b);
      
      if (b == ProtocolConst.P_END_TAG[index]) {
        index++;
        if (index == ProtocolConst.P_END_TAG.length) {
          byte[] pak = receiveData.toByteArray(ProtocolConst.P_START_TAG.length, receiveData.size() - ProtocolConst.P_END_TAG.length - ProtocolConst.P_START_TAG.length);
          
          // 解析返回包中的SessionId
          int pSessionId = ByteConverter.bytesToIntLittleEndian(pak, SFPStruct.Version + SFPStruct.TotalLen);
          
          // 根据sessionId获取对应的windowData
          WindowData wd = WaitWindows.get(pSessionId);
          if (wd != null) {
            if (wd.getFlag() == 0) {
              // 将返回数据放入WindowData
              wd.setData(pak);
              // 调用CountDownlatch的countDown,结束工作线程的等待
              wd.getEvent().set();
            } else if (wd.getFlag() == 1) {
              // 异步
              if (null != unregisterRec(pSessionId)) {
                wd.getReceiveHandler().notify(pak, wd.getInvokeCnxn());
              }
            } else if (wd.getFlag() == 2) {
              // 异步
              logger.info("unsupport request type");
            }
          }
          
          index = 0;
          receiveData.reset();
          continue;
        }
      } else if (index != 0) {
        if (b == ProtocolConst.P_END_TAG[0]) {
          index = 1;
        } else {
          index = 0;
        }
      }
    }
  } catch (Exception e) {
    index = 0;
    ex.printStackTrace();
    receiveData.clear()
  }
}
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2021-09-19 08:19:50  更:2021-09-19 08:20:08 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年11日历 -2024/11/26 1:44:52-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码