RPC 实战与核心原理分析
RPCX 是一个分布式的Go语言的 RPC 框架,支持Zookepper、etcd、consul多种服务发现方式,多种服务路由方式,
例子
服务端
package main
import (
"flag"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var addr = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
s := server.NewServer()
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}
客户端
package main
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for {
reply := &example.Reply{}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(1e9)
}
}
这是一个简单的例子,使用RPCX实现了服务端和客户端。
RPCX实例分析
如何发布自己的服务
在上面的例子中,我们只需要用service.NewServer()→ RegisterName → Serve 就可以开始一个rpc服务端。我们也可以使用rpcx的插件功能,将服务端的地址注册到持久化存储当中,这样client 就可以使用服务发现的功能来对服务端进行rpc调用。
使用RegistryPlugin 的方式进行服务注册时,rpcx 会在服务启动前将服务注册到注册中心中。
RegisterPlugin interface {
Register(name string, rcvr interface{}, metadata string) error
Unregister(name string) error
}
func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {
_, err := s.register(rcvr, name, true)
if err != nil {
return err
}
if s.Plugins == nil {
s.Plugins = &pluginContainer{}
}
return s.Plugins.DoRegister(name, rcvr, metadata)
}
- rpcx 支持 ZooKeeper,Etcd,mDNS,Consul,Redis,DNS等方式进行服务注册。例子可以参考:https://github.com/rpcxio/rpcx-examples/tree/master/registry。
如何调用他人的远程服务
我们知道,一个RPC的调用会涉及到两端,一个是Client ,一个是Server 端。两端的基本工作如下
Server 端工作
- 监听端口
- 响应连接请求
- 接收数据包
- 解析数据包
- 调用相应方法
- 组装请求处理结果数据包
- 发送结果数据包
- 序列化协议
服务端如何接收客户端请求
serveListener
- 服务端在监听端口之后,会有一个
serveListener 函数, - 这个函数一个for循环
Accept 客户端过来的连接, - 最后会调用
serveConn 来处理这个请求
func (s *Server) serveListener(ln net.Listener) error {
var tempDelay time.Duration
s.mu.Lock()
s.ln = ln
s.mu.Unlock()
for {
conn, e := ln.Accept()
if tc, ok := conn.(*net.TCPConn); ok {
period := s.options["TCPKeepAlivePeriod"]
if period != nil {
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(period.(time.Duration))
tc.SetLinger(10)
}
}
conn, ok := s.Plugins.DoPostConnAccept(conn)
if !ok {
conn.Close()
continue
}
s.mu.Lock()
s.activeConn[conn] = struct{}{}
s.mu.Unlock()
if share.Trace {
log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
}
go s.serveConn(conn)
}
}
serveConn
serveConn 是读取和处理用户过来的请求,如果开启了异步写,服务端会异步response 给client, 其中 readRequest 从网络连接中读取到客户端过来的数据,这里会包含解码解压缩操作 processOneRequest 就是真正处理客户端请求的函数了,这里会通过反射调用服务的方法
func (s *Server) serveConn(conn net.Conn) {
r := bufio.NewReaderSize(conn, ReaderBuffsize)
var writeCh chan *[]byte
if s.AsyncWrite {
writeCh = make(chan *[]byte, 1)
defer close(writeCh)
go s.serveAsyncWrite(conn, writeCh)
}
for {
req, err := s.readRequest(ctx, r)
if s.pool != nil {
s.pool.Submit(func() {
s.processOneRequest(ctx, req, conn, writeCh)
})
} else {
go s.processOneRequest(ctx, req, conn, writeCh)
}
}
}
服务端解析客户端请求
- 协议详解见:RPCX协议详解
- 在Decode中会进行协议解析
func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
err = s.Plugins.DoPreReadRequest(ctx)
if err != nil {
return nil, err
}
req = protocol.GetPooledMsg()
err = req.Decode(r)
if err == io.EOF {
return req, err
}
perr := s.Plugins.DoPostReadRequest(ctx, req, err)
if err == nil {
err = perr
}
return req, err
}
如果配置了压缩方式,也会对服务的数据进行解压缩
func (m *Message) Decode(r io.Reader) error {
if m.CompressType() != None {
compressor := Compressors[m.CompressType()]
if compressor == nil {
return ErrUnsupportedCompressor
}
m.Payload, err = compressor.Unzip(m.Payload)
if err != nil {
return err
}
}
return err
}
服务端调用相应方法和返回结果给客户端端
接受到用户的请求之后,就是调用服务端对应的方法了,在用户消息Decode 中,服务端已经把ServicePath、ServiceMethod 解析出来了,剩下的就是只要找到对应的方法就可以执行
m.ServicePath = util.SliceByteToString(data[n:nEnd])
m.ServiceMethod = util.SliceByteToString(data[n:nEnd])
前面在 servConn 中的函数 processOneRequest 会处理Client的请求,我们来看下,这里是怎么处理的,
- 先解析出
Payload 中的 请求参数, - 然后用参数去调用
service 这个函数 ,service := s.serviceMap[serviceName] 就是 服务注册过去的业务函数。 - 然后用
sendResponse 发送response 给client
func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
serviceName := req.ServicePath
methodName := req.ServiceMethod
argv := reflectTypePools.Get(mtype.ArgType)
codec := share.Codecs[req.SerializeType()]
if codec == nil {
err = fmt.Errorf("can not find codec for %d", req.SerializeType())
return s.handleError(res, err)
}
err = codec.Decode(req.Payload, argv)
if err != nil {
return s.handleError(res, err)
}
argv, err = s.Plugins.DoPreCall(ctx, serviceName, methodName, argv)
if err != nil {
reflectTypePools.Put(mtype.ReplyType, replyv)
return s.handleError(res, err)
}
if mtype.ArgType.Kind() != reflect.Ptr {
err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
} else {
err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
}
return res, nil
}
Client 端工作
- 建立与Server的连接
- 组装数据
- 发送数据包
- 接收处理结果数据包
- 解析返回数据包
客户端和服务端的连接是怎样建立的
client 在调用 server的时候,只需要,NewXClient ,然后调用Call 就可以请求对应服务的方法。
NewXClient
- 通过服务发现的方式获取到服务器列表
- 选择一种负载均衡的方式
- 监听服务列表的变化
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
client := &xClient{
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
servicePath: servicePath,
cachedClient: make(map[string]RPCClient),
option: option,
}
pairs := discovery.GetServices()
sort.Slice(pairs, func(i, j int) bool {
return strings.Compare(pairs[i].Key, pairs[j].Key) <= 0
})
servers := make(map[string]string, len(pairs))
for _, p := range pairs {
servers[p.Key] = p.Value
}
filterByStateAndGroup(client.option.Group, servers)
client.servers = servers
if selectMode != Closest && selectMode != SelectByUser {
client.selector = newSelector(selectMode, servers)
}
client.Plugins = &pluginContainer{}
ch := client.discovery.WatchService()
if ch != nil {
client.ch = ch
go client.watch(ch)
}
return client
}
Call
call 方法做了两件事
- selectClient:根据负载均衡的算法选出一个服务器
- 与服务端建立连接。
func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, args interface{}) (RPCClient, error) {
c.mu.Lock()
client = c.findCachedClient(k, servicePath, serviceMethod)
if client != nil {
if !client.IsClosing() && !client.IsShutdown() {
c.mu.Unlock()
return client, nil
}
c.deleteCachedClient(client, k, servicePath, serviceMethod)
}
client = c.findCachedClient(k, servicePath, serviceMethod)
c.mu.Unlock()
if client == nil || client.IsShutdown() {
c.mu.Lock()
generatedClient, err, _ := c.slGroup.Do(k, func() (interface{}, error) {
return c.generateClient(k, servicePath, serviceMethod)
})
c.mu.Unlock()
c.slGroup.Forget(k)
if err != nil {
return nil, err
}
client = generatedClient.(RPCClient)
if c.Plugins != nil {
needCallPlugin = true
}
client.RegisterServerMessageChan(c.serverMessageChan)
c.mu.Lock()
c.setCachedClient(client, k, servicePath, serviceMethod)
c.mu.Unlock()
}
return client, nil
}
generateClient 函数最底层又调用了 RPCClient.Connect,Connect 实现了各种网络类型的连接方式,http,ws,wss ,或者自定义,也在这里会有一个client.input() 读取返回的数据, client.heartbeat 发送心跳请求
func (client *Client) Connect(network, address string) error {
var conn net.Conn
var err error
switch network {
case "http":
conn, err = newDirectHTTPConn(client, network, address)
case "ws", "wss":
conn, err = newDirectWSConn(client, network, address)
default:
fn := ConnFactories[network]
if fn != nil {
conn, err = fn(client, network, address)
} else {
conn, err = newDirectConn(client, network, address)
}
}
if err == nil && conn != nil {
if tc, ok := conn.(*net.TCPConn); ok && client.option.TCPKeepAlivePeriod > 0 {
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(client.option.TCPKeepAlivePeriod)
}
if client.option.IdleTimeout != 0 {
_ = conn.SetDeadline(time.Now().Add(client.option.IdleTimeout))
}
if client.Plugins != nil {
conn, err = client.Plugins.DoConnCreated(conn)
if err != nil {
return err
}
}
client.Conn = conn
client.r = bufio.NewReaderSize(conn, ReaderBuffsize)
go client.input()
if client.option.Heartbeat && client.option.HeartbeatInterval > 0 {
go client.heartbeat()
}
}
if err != nil && client.Plugins != nil {
client.Plugins.DoConnCreateFailed(network, address)
}
return err
}
客户端是怎样给服务端发送数据的
func (client *Client) send(ctx context.Context, call *Call) {
if client.pending == nil {
client.pending = make(map[uint64]*Call)
}
seq := client.seq
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
if cseq, ok := ctx.Value(seqKey{}).(*uint64); ok {
*cseq = seq
}
data, err := codec.Encode(call.Args)
_, err = client.Conn.Write(*allData)
}
客户端是怎样获取服务端的返回数据的
前面我们在阅读代码的时候可以看到,在 generateClient 的时候,会起一个协程执行 client.input() ,这个client.input() 其实就是读取服务端返回数据的协程。
-
input 对不同的几类请求都有处理:
- 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
是一个request 返回的错误请求。则需要处理错误 - 默认是一个reqeust 的 response
func (client *Client) input() {
var err error
for err == nil {
err = res.Decode(client.r)
if err != nil {
break
}
if client.Plugins != nil {
_ = client.Plugins.DoClientAfterDecode(res)
}
seq := res.Seq()
var call *Call
isServerMessage := (res.MessageType() == protocol.Request && !res.IsHeartbeat() && res.IsOneway())
if !isServerMessage {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
}
switch {
case call == nil:
if isServerMessage {
if client.ServerMessageChan != nil {
client.handleServerRequest(res)
}
continue
}
case res.MessageStatusType() == protocol.Error:
if len(res.Metadata) > 0 {
call.ResMetadata = res.Metadata
if ClientErrorFunc != nil {
call.Error = ClientErrorFunc(res.Metadata[protocol.ServiceError])
} else {
call.Error = strErr(res.Metadata[protocol.ServiceError])
}
}
if call.Raw {
call.Metadata, call.Reply, _ = convertRes2Raw(res)
call.Metadata[XErrorMessage] = call.Error.Error()
} else if len(res.Payload) > 0 {
data := res.Payload
codec := share.Codecs[res.SerializeType()]
if codec != nil {
_ = codec.Decode(data, call.Reply)
}
}
call.done()
default:
if call.Raw {
call.Metadata, call.Reply, _ = convertRes2Raw(res)
} else {
data := res.Payload
if len(data) > 0 {
codec := share.Codecs[res.SerializeType()]
if codec == nil {
call.Error = strErr(ErrUnsupportedCodec.Error())
} else {
err = codec.Decode(data, call.Reply)
if err != nil {
call.Error = strErr(err.Error())
}
}
}
if len(res.Metadata) > 0 {
call.ResMetadata = res.Metadata
}
}
call.done()
}
}
}
其他
消息里为什么要有SeqID
RPCX里面对服务端的调用其实是异步的,即对于当前线程来说,将请求发送出来后,协程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:
- 怎么让当前协程“暂停”,等结果回来后,再向后执行?
- 如果有多个协程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个协程调用的?
怎么解决呢?
- client线程每次通过socket调用一次远程接口前,生成一个唯一的ID,即SeqID(SeqID必需保证在一个Socket连接里面是唯一的),一般常常使用uint64从0开始累计数字生成唯一ID;
- 存放到client的pending里面(requestID, call);
- 当协程异步发送消息后,紧接着执行call.Done()的方法试图获取远程返回的结果。在Done()内部,会等待
chan *Call - 服务端接收到请求并处理后,将response结果(此结果中包含了前面的SeqID)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到SeqID,再从前面的pending里面get(SeqID),从而找到Call 对象,再将结果发送到chan *Call 中
- 客户端从Done()取到Call 对象,处理返回的结果
RPCX 核心功能
连接管理
- 保持与服务提供方长连接,用于传输请求数据也返回结果。
RPCX 的连接管理比较简单,其实就是用一个map将连接对存储起来。
client 端
type xClient struct {
cachedClient map[string]RPCClient
}
负载均衡
- 确保多个服务提供方节点流量均匀/合理,支持节点扩容与灰度发布。RPCX 提供了多种负载均衡的方式。轮训,随机,带权重,一致性Hash,网络质量,地理位置。你也可以通过自己实现 Select 接口来实现自己的负载均衡方式,
- 自己实现的负载均衡的方式的话,你就可以自己实现请求路由,达到应用隔离,读写分离,灰度发布中的作用
type Selector interface {
Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string
UpdateServer(servers map[string]string)
}
func newSelector(selectMode SelectMode, servers map[string]string) Selector {
switch selectMode {
case RandomSelect:
return newRandomSelector(servers)
case RoundRobin:
return newRoundRobinSelector(servers)
case WeightedRoundRobin:
return newWeightedRoundRobinSelector(servers)
case WeightedICMP:
return newWeightedICMPSelector(servers)
case ConsistentHash:
return newConsistentHashSelector(servers)
case SelectByUser:
return nil
default:
return newRandomSelector(servers)
}
}
超时处理
- 对于长时间没有返回的请求,需要作出异常处理,及时释放资源。RPCX的超市处理其实是从两个方面来做的
- 在底层网络连接上,通过SetDeadline 对读写请求设置读写超时时间
- 在RPCX框架层面通过select + ctx 实现超时处理
服务保护
- 服务提供方为保证正常运行,主动丢弃超出处理能力外的请求。常见的有限流、熔断、流量丢弃这几种方式
- 限流:对他人调用自己服务的限制,常见的限流算法有:固定窗口, 滑动窗口,令牌桶算法,漏桶限流算法。
- 熔断: 当调用他人服务出现问题的时候,主动放弃调用其他服务。Rpcx 提供了一个简单的断路器 ConsecCircuitBreaker, 它在连续失败一定次数后就会断开,再经过一段时间后打开。 你可以将你的断路器设置到 Option.Breaker中。
- 流量丢弃:框架实现,当RPC Server 端挤压的请求较多时,RPC框架直接将这个请求丢弃。在Go中,可以用一个带缓冲的channel 实现,当缓冲区满了时,直接将流量丢弃。
失败重试
Failbackup 重试模式
- 生成两个Call,用来接受可能的两次rpc请求的结果
- 第一个请求开始
- 设置第一个请求多久没有返回才开启第二个请求的时间
- select 监听 ctx是否退出,第一个请求, 是否达到设置的请求时间第一个请求还没有返回
- 如果ctx Done。则直接返回
- 如果第一个请求的结果返回,则处理结果
- 如果达到设置的请求时间第一个请求还没有返回,则开始第二个请求
- 开始发送第二个请求
- select 就需要 ctx是否退出,第一个请求,第二个请求。看哪一个数据先准备好
- 如果ctx Done。则直接返回
- 如果第一个请求的结果返回,则处理结果
- 如果第二个请求的结果返回,则处理结果
case Failbackup:
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
call1 := make(chan *Call, 10)
call2 := make(chan *Call, 10)
var reply1, reply2 interface{}
if reply != nil {
reply1 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
reply2 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
}
_, err1 := c.Go(ctx, serviceMethod, args, reply1, call1)
t := time.NewTimer(c.option.BackupLatency)
select {
case <-ctx.Done():
err = ctx.Err()
return err
case call := <-call1:
err = call.Error
if err == nil && reply != nil {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
}
return err
case <-t.C:
}
_, err2 := c.Go(ctx, serviceMethod, args, reply2, call2)
if err2 != nil {
if uncoverError(err2) {
c.removeClient(k, c.servicePath, serviceMethod, client)
}
err = err1
return err
}
select {
case <-ctx.Done():
err = ctx.Err()
case call := <-call1:
err = call.Error
if err == nil && reply != nil && reply1 != nil {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
}
case call := <-call2:
err = call.Error
if err == nil && reply != nil && reply2 != nil {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply2).Elem())
}
}
return err
总结
-
通过分析了RPCX框架的实现原理,我们大概知道了一次RPC请求到底发生了什么事情。我们也可以知道 -
RPC 主要用于公司内部的服务调用,性能消耗低,传输效率高,实现复杂。 -
HTTP 主要用于对外的异构环境,浏览器接口调用,App 接口调用,第三方接口调用等。 -
RPC 使用场景(大型的网站,内部子系统较多、接口非常多的情况下适合使用 RPC):
- 长链接。不必每次通信都要像 HTTP 一样去 3 次握手,减少了网络开销。
- 注册发布机制。RPC 框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
- 安全性,没有暴露资源操作。
- 微服务支持。就是最近流行的服务化架构、服务化治理,RPC 框架是一个强力的支撑。
|