IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> RPC 实战与核心原理分析 -> 正文阅读

[网络协议]RPC 实战与核心原理分析

RPC 实战与核心原理分析

  • RPCX是一个分布式的Go语言的 RPC 框架,支持Zookepper、etcd、consul多种服务发现方式,多种服务路由方式,

例子

服务端

package main

import (
"flag"

example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)

var addr = flag.String("addr", "localhost:8972", "server address")

func main() {
flag.Parse()

s := server.NewServer()
// s.RegisterName("Arith", new(example.Arith), "")
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}

客户端

package main

import (
	"context"
	"flag"
	"log"
	"time"

	example "github.com/rpcxio/rpcx-examples"
	"github.com/smallnest/rpcx/client"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
	flag.Parse()

	d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	args := &example.Args{
		A: 10,
		B: 20,
	}

	for {
		reply := &example.Reply{}
		err := xclient.Call(context.Background(), "Mul", args, reply)
		if err != nil {
			log.Fatalf("failed to call: %v", err)
		}

		log.Printf("%d * %d = %d", args.A, args.B, reply.C)
		time.Sleep(1e9)
	}

}

这是一个简单的例子,使用RPCX实现了服务端和客户端。

RPCX实例分析

如何发布自己的服务

在上面的例子中,我们只需要用service.NewServer()→ RegisterName → Serve 就可以开始一个rpc服务端。我们也可以使用rpcx的插件功能,将服务端的地址注册到持久化存储当中,这样client 就可以使用服务发现的功能来对服务端进行rpc调用。

使用RegistryPlugin的方式进行服务注册时,rpcx 会在服务启动前将服务注册到注册中心中。

// RegisterPlugin is .
// 注册中心接口定义,如果你要实现自己的注册中心,就需要实现这个RegisterPlugin接口
RegisterPlugin interface {
	Register(name string, rcvr interface{}, metadata string) error
	Unregister(name string) error
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {
	_, err := s.register(rcvr, name, true)
	if err != nil {
		return err
	}
	if s.Plugins == nil {
		s.Plugins = &pluginContainer{}
	}
    // RegistryPlugin 执行,进行服务注册
	return s.Plugins.DoRegister(name, rcvr, metadata)
}
  • rpcx 支持 ZooKeeper,Etcd,mDNS,Consul,Redis,DNS等方式进行服务注册。例子可以参考:https://github.com/rpcxio/rpcx-examples/tree/master/registry。

如何调用他人的远程服务

我们知道,一个RPC的调用会涉及到两端,一个是Client,一个是Server端。两端的基本工作如下

Server 端工作

  • 监听端口
  • 响应连接请求
  • 接收数据包
  • 解析数据包
  • 调用相应方法
  • 组装请求处理结果数据包
  • 发送结果数据包
  • 序列化协议

服务端如何接收客户端请求

serveListener
  • 服务端在监听端口之后,会有一个serveListener 函数,
  • 这个函数一个for循环 Accept 客户端过来的连接,
  • 最后会调用 serveConn 来处理这个请求
// serveListener accepts incoming connections on the Listener ln,
// creating a new service goroutine for each.
// The service goroutines read requests and then call services to reply to them.
func (s *Server) serveListener(ln net.Listener) error {
	var tempDelay time.Duration

	s.mu.Lock()
	s.ln = ln
	s.mu.Unlock()

	for {
        // 接受客户端过来的连接
		conn, e := ln.Accept()         
        // 省略代码........   
        if tc, ok := conn.(*net.TCPConn); ok {
			period := s.options["TCPKeepAlivePeriod"]
			if period != nil {
				tc.SetKeepAlive(true)
				tc.SetKeepAlivePeriod(period.(time.Duration))
				tc.SetLinger(10)
			}
		}
        // 处理请求之前的操作
		conn, ok := s.Plugins.DoPostConnAccept(conn)
		if !ok {
			conn.Close()
			continue
		}
        // 保存到这个请求
		s.mu.Lock()
		s.activeConn[conn] = struct{}{}
		s.mu.Unlock()

		if share.Trace {
			log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
		}
        // 处理连接请求
		go s.serveConn(conn)
	}
}
serveConn

serveConn 是读取和处理用户过来的请求,如果开启了异步写,服务端会异步response给client,
其中 readRequest 从网络连接中读取到客户端过来的数据,这里会包含解码解压缩操作
processOneRequest 就是真正处理客户端请求的函数了,这里会通过反射调用服务的方法

func (s *Server) serveConn(conn net.Conn) {

    // 省略代码........

	r := bufio.NewReaderSize(conn, ReaderBuffsize)
    // 开启异步response给client
	var writeCh chan *[]byte
	if s.AsyncWrite {
		writeCh = make(chan *[]byte, 1)
		defer close(writeCh)
		go s.serveAsyncWrite(conn, writeCh)
	}

	// read requests and handle it
    // 读取和处理请求
	for {
        // 省略代码......
        // 读取请求
		req, err := s.readRequest(ctx, r)
	    // 处理请求
		if s.pool != nil {
			s.pool.Submit(func() {
				s.processOneRequest(ctx, req, conn, writeCh)
			})
		} else {
			go s.processOneRequest(ctx, req, conn, writeCh)
		}
	}
}

服务端解析客户端请求

  • 协议详解见:RPCX协议详解
  • 在Decode中会进行协议解析
func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
    // 处理request之前的操作
	err = s.Plugins.DoPreReadRequest(ctx)
	if err != nil {
		return nil, err
	}
	// pool req?
	req = protocol.GetPooledMsg()
    // 解码解压缩
	err = req.Decode(r)
	if err == io.EOF {
		return req, err
	}
    // 处理request之前的操作
	perr := s.Plugins.DoPostReadRequest(ctx, req, err)
	if err == nil {
		err = perr
	}
	return req, err
}

如果配置了压缩方式,也会对服务的数据进行解压缩

func (m *Message) Decode(r io.Reader) error {

    // 前面都是协议的解析

    // 如果配置了压缩方式,对服务的数据进行解压缩到 Payload 中
	if m.CompressType() != None {
		compressor := Compressors[m.CompressType()]
		if compressor == nil {
			return ErrUnsupportedCompressor
		}
		m.Payload, err = compressor.Unzip(m.Payload)
		if err != nil {
			return err
		}
	}

	return err
}

服务端调用相应方法和返回结果给客户端端

接受到用户的请求之后,就是调用服务端对应的方法了,在用户消息Decode 中,服务端已经把ServicePath、ServiceMethod 解析出来了,剩下的就是只要找到对应的方法就可以执行

	m.ServicePath = util.SliceByteToString(data[n:nEnd])
	m.ServiceMethod = util.SliceByteToString(data[n:nEnd])

前面在 servConn 中的函数 processOneRequest 会处理Client的请求,我们来看下,这里是怎么处理的,

  • 先解析出 Payload 中的 请求参数,
  • 然后用参数去调用 service 这个函数service := s.serviceMap[serviceName] 就是 服务注册过去的业务函数。
  • 然后用sendResponse 发送responseclient
func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
	serviceName := req.ServicePath
	methodName := req.ServiceMethod

    // 代码省略....

	// get a argv object from object pool
	argv := reflectTypePools.Get(mtype.ArgType)

	codec := share.Codecs[req.SerializeType()]
	if codec == nil {
		err = fmt.Errorf("can not find codec for %d", req.SerializeType())
		return s.handleError(res, err)
	}
    // 解析用户的请求参数
	err = codec.Decode(req.Payload, argv)
	if err != nil {
		return s.handleError(res, err)
	}

	argv, err = s.Plugins.DoPreCall(ctx, serviceName, methodName, argv)
	if err != nil {
		// return reply to object pool
		reflectTypePools.Put(mtype.ReplyType, replyv)
		return s.handleError(res, err)
	}
    // 调用service中注册的业务函数,处理业务方式
	if mtype.ArgType.Kind() != reflect.Ptr {
		err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
	} else {
		err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
	}

   // 代码省略....
	return res, nil
}

Client 端工作

  • 建立与Server的连接
  • 组装数据
  • 发送数据包
  • 接收处理结果数据包
  • 解析返回数据包

客户端和服务端的连接是怎样建立的

client 在调用 server的时候,只需要,NewXClient,然后调用Call 就可以请求对应服务的方法。

NewXClient
  • 通过服务发现的方式获取到服务器列表
  • 选择一种负载均衡的方式
  • 监听服务列表的变化
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
	client := &xClient{
		failMode:     failMode,
		selectMode:   selectMode,
		discovery:    discovery,
		servicePath:  servicePath,
		cachedClient: make(map[string]RPCClient),
		option:       option,
	}
    // 通过服务发现的方式获取到服务器列表
	pairs := discovery.GetServices()
	sort.Slice(pairs, func(i, j int) bool {
		return strings.Compare(pairs[i].Key, pairs[j].Key) <= 0
	})
	servers := make(map[string]string, len(pairs))
	for _, p := range pairs {
		servers[p.Key] = p.Value
	}
	filterByStateAndGroup(client.option.Group, servers)

    // 选择一种负载均衡的方式
	client.servers = servers
	if selectMode != Closest && selectMode != SelectByUser {
		client.selector = newSelector(selectMode, servers)
	}

	client.Plugins = &pluginContainer{}

   // 监听服务列表的变化
	ch := client.discovery.WatchService()
	if ch != nil {
		client.ch = ch
		go client.watch(ch)
	}

	return client
}
Call
call 方法做了两件事
  • selectClient:根据负载均衡的算法选出一个服务器
  • 与服务端建立连接。
func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, args interface{}) (RPCClient, error) {

    // 先通过缓存拿到RPCClient,如果能拿到,就不需要再新建连接了
	c.mu.Lock()
	client = c.findCachedClient(k, servicePath, serviceMethod)
	if client != nil {
		if !client.IsClosing() && !client.IsShutdown() {
			c.mu.Unlock()
			return client, nil
		}
		c.deleteCachedClient(client, k, servicePath, serviceMethod)
	}
    // 为什么这里会再查一次缓存,有可能在第一次findCachedClient的时候还没有缓存,
    // 但是之前后有连接建立了,新建了缓存,这里做一个Double Check
	client = c.findCachedClient(k, servicePath, serviceMethod)
	c.mu.Unlock()

	if client == nil || client.IsShutdown() {
		c.mu.Lock()
		generatedClient, err, _ := c.slGroup.Do(k, func() (interface{}, error) {
            // 去建立连接
			return c.generateClient(k, servicePath, serviceMethod)
		})
		c.mu.Unlock()

		c.slGroup.Forget(k)
		if err != nil {
			return nil, err
		}

		client = generatedClient.(RPCClient)
		if c.Plugins != nil {
			needCallPlugin = true
		}

		client.RegisterServerMessageChan(c.serverMessageChan)

		c.mu.Lock()
		c.setCachedClient(client, k, servicePath, serviceMethod)
		c.mu.Unlock()
	}

	return client, nil
}

generateClient 函数最底层又调用了 RPCClient.Connect,Connect 实现了各种网络类型的连接方式,http,ws,wss,或者自定义,也在这里会有一个client.input() 读取返回的数据, client.heartbeat 发送心跳请求

// Connect connects the server via specified network.
func (client *Client) Connect(network, address string) error {
	var conn net.Conn
	var err error
    // 实现了各种网络类型的连接方式,http,ws,wss,或者自定义
    // 默认是tcp
	switch network {
	case "http":
		conn, err = newDirectHTTPConn(client, network, address)
	case "ws", "wss":
		conn, err = newDirectWSConn(client, network, address)
	default:
		fn := ConnFactories[network]
		if fn != nil {
            // 自定义的连接方式
			conn, err = fn(client, network, address)
		} else {
            // tcp
			conn, err = newDirectConn(client, network, address)
		}
	}

	if err == nil && conn != nil {
		if tc, ok := conn.(*net.TCPConn); ok && client.option.TCPKeepAlivePeriod > 0 {
			_ = tc.SetKeepAlive(true)
			_ = tc.SetKeepAlivePeriod(client.option.TCPKeepAlivePeriod)
		}

		if client.option.IdleTimeout != 0 {
			_ = conn.SetDeadline(time.Now().Add(client.option.IdleTimeout))
		}

		if client.Plugins != nil {
			conn, err = client.Plugins.DoConnCreated(conn)
			if err != nil {
				return err
			}
		}

		client.Conn = conn
		client.r = bufio.NewReaderSize(conn, ReaderBuffsize)
		// c.w = bufio.NewWriterSize(conn, WriterBuffsize)

		// start reading and writing since connected
        // 读取返回的数据
		go client.input()

		if client.option.Heartbeat && client.option.HeartbeatInterval > 0 {
			go client.heartbeat()
		}

	}

	if err != nil && client.Plugins != nil {
		client.Plugins.DoConnCreateFailed(network, address)
	}

	return err
}

客户端是怎样给服务端发送数据的

  • wrapCall:建立连接之后,发送请求给服务端,最底层其实是调用了 client.send(ctx, call)

  • 首先会利用pending 一个map 结构 记录当前未结束的请求

  • 然后将当前的序列号+1

  • 对请求进行编码

  • 对服务方请求,把请求给发送到服务端

func (client *Client) send(ctx context.Context, call *Call) {

    // .......

    // 代码省略......
    // 记录当前未结束的请求
	if client.pending == nil {
		client.pending = make(map[uint64]*Call)
	}

	seq := client.seq
    // 序列号+1
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	if cseq, ok := ctx.Value(seqKey{}).(*uint64); ok {
		*cseq = seq
	}

    // 对请求进行编码 .....
	data, err := codec.Encode(call.Args)
    // 代码省略......
    // 对服务方请求,这个时候就是把请求给发送过去了
	_, err = client.Conn.Write(*allData)

    // 代码省略......

}

客户端是怎样获取服务端的返回数据的

前面我们在阅读代码的时候可以看到,在 generateClient 的时候,会起一个协程执行 client.input(),这个client.input() 其实就是读取服务端返回数据的协程。

  • input 对不同的几类请求都有处理:

    • 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
      是一个request 返回的错误请求。则需要处理错误
    • 默认是一个reqeust 的 response
func (client *Client) input() {
	var err error

	for err == nil {
        // 代码省略......

        // 解码
		err = res.Decode(client.r)
		if err != nil {
			break
		}
		if client.Plugins != nil {
			_ = client.Plugins.DoClientAfterDecode(res)
		}

		 // 代码省略......

        seq := res.Seq()
		var call *Call
		isServerMessage := (res.MessageType() == protocol.Request && !res.IsHeartbeat() && res.IsOneway())
		if !isServerMessage {
			client.mutex.Lock()
			call = client.pending[seq]
			delete(client.pending, seq)
			client.mutex.Unlock()
		}

         // 针对不同的请求进行处理
		switch {
		case call == nil:
         // 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
			if isServerMessage {
				if client.ServerMessageChan != nil {
					client.handleServerRequest(res)
				}
				continue
			}
		case res.MessageStatusType() == protocol.Error:
            // 是一个request 返回的错误请求。则需要处理错误
			// We've got an error response. Give this to the request
			if len(res.Metadata) > 0 {
				call.ResMetadata = res.Metadata

				// convert server error to a customized error, which implements ServerError interface
				if ClientErrorFunc != nil {
					call.Error = ClientErrorFunc(res.Metadata[protocol.ServiceError])
				} else {
					call.Error = strErr(res.Metadata[protocol.ServiceError])
				}

			}

			if call.Raw {
				call.Metadata, call.Reply, _ = convertRes2Raw(res)
				call.Metadata[XErrorMessage] = call.Error.Error()
			} else if len(res.Payload) > 0 {
				data := res.Payload
				codec := share.Codecs[res.SerializeType()]
				if codec != nil {
					_ = codec.Decode(data, call.Reply)
				}
			}
			call.done()
		default:
            // 默认就是一个request 的 response 请求
			if call.Raw {
				call.Metadata, call.Reply, _ = convertRes2Raw(res)
			} else {
				data := res.Payload
				if len(data) > 0 {
					codec := share.Codecs[res.SerializeType()]
					if codec == nil {
						call.Error = strErr(ErrUnsupportedCodec.Error())
					} else {
						err = codec.Decode(data, call.Reply)
						if err != nil {
							call.Error = strErr(err.Error())
						}
					}
				}
				if len(res.Metadata) > 0 {
					call.ResMetadata = res.Metadata
				}

			}

			call.done()
		}
	}
    // 代码省略......
}

其他

消息里为什么要有SeqID

RPCX里面对服务端的调用其实是异步的,即对于当前线程来说,将请求发送出来后,协程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:

  • 怎么让当前协程“暂停”,等结果回来后,再向后执行?
  • 如果有多个协程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个协程调用的?

怎么解决呢?

  • client线程每次通过socket调用一次远程接口前,生成一个唯一的ID,即SeqID(SeqID必需保证在一个Socket连接里面是唯一的),一般常常使用uint64从0开始累计数字生成唯一ID;
  • 存放到client的pending里面(requestID, call);
  • 当协程异步发送消息后,紧接着执行call.Done()的方法试图获取远程返回的结果。在Done()内部,会等待 chan *Call
  • 服务端接收到请求并处理后,将response结果(此结果中包含了前面的SeqID)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到SeqID,再从前面的pending里面get(SeqID),从而找到Call 对象,再将结果发送到chan *Call 中
  • 客户端从Done()取到Call 对象,处理返回的结果

RPCX 核心功能

连接管理

  • 保持与服务提供方长连接,用于传输请求数据也返回结果。

RPCX 的连接管理比较简单,其实就是用一个map将连接对存储起来。

client 端

type xClient struct {
    // key: 是根据:servicePath, serviceMethod,args 由不同的负载均衡算法算出来的,
    // RPCClient 就是真正的持有连接的Client
	cachedClient map[string]RPCClient
}

负载均衡

  • 确保多个服务提供方节点流量均匀/合理,支持节点扩容与灰度发布。RPCX 提供了多种负载均衡的方式。轮训,随机,带权重,一致性Hash,网络质量,地理位置。你也可以通过自己实现 Select 接口来实现自己的负载均衡方式,
  • 自己实现的负载均衡的方式的话,你就可以自己实现请求路由,达到应用隔离,读写分离,灰度发布中的作用
type Selector interface {
	Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string // SelectFunc
	UpdateServer(servers map[string]string)
}

// 负载均衡的实现
func newSelector(selectMode SelectMode, servers map[string]string) Selector {
	switch selectMode {
    // 随机     
	case RandomSelect:
		return newRandomSelector(servers)
    // 轮训
	case RoundRobin:
		return newRoundRobinSelector(servers)
    // 带权重    
	case WeightedRoundRobin:
		return newWeightedRoundRobinSelector(servers)
    // 网络质量
	case WeightedICMP:
		return newWeightedICMPSelector(servers)
    // 一致性Hash
	case ConsistentHash:
		return newConsistentHashSelector(servers)
    // 
	case SelectByUser:
		return nil
	default:
    // 随机
		return newRandomSelector(servers)
	}
}

超时处理

  • 对于长时间没有返回的请求,需要作出异常处理,及时释放资源。RPCX的超市处理其实是从两个方面来做的
    • 在底层网络连接上,通过SetDeadline 对读写请求设置读写超时时间
    • 在RPCX框架层面通过select + ctx 实现超时处理

服务保护

  • 服务提供方为保证正常运行,主动丢弃超出处理能力外的请求。常见的有限流、熔断、流量丢弃这几种方式
    • 限流:对他人调用自己服务的限制,常见的限流算法有:固定窗口, 滑动窗口,令牌桶算法,漏桶限流算法。
    • 熔断: 当调用他人服务出现问题的时候,主动放弃调用其他服务。Rpcx 提供了一个简单的断路器 ConsecCircuitBreaker, 它在连续失败一定次数后就会断开,再经过一段时间后打开。 你可以将你的断路器设置到 Option.Breaker中。
    • 流量丢弃:框架实现,当RPC Server 端挤压的请求较多时,RPC框架直接将这个请求丢弃。在Go中,可以用一个带缓冲的channel 实现,当缓冲区满了时,直接将流量丢弃。

失败重试

  • rpcx支持四种调用失败模式,用来处理服务调用失败后的处理逻辑, 你可以在创建XClient的时候设置它。

    • Failfast: 一旦调用一个节点失败, rpcx立即会返回错误。 注意这个错误不是业务上的 Error, 业务上服务端返回的Error应该正常返回给客户端,这里的错误可能是网络错误或者服务异常。默认策略就是.
      Failfast
    • Failtry:rpcx如果调用一个节点的服务出现错误, 它也会尝试,但是还是选择这个节点进行重试, 直到节点正常返回数据或者达到最大重试次数。
    • Failover: rpcx如果遇到错误,它会尝试调用另外一个节点, 直到服务节点能正常返回信息,或者达到最大的重试次数。 重试测试Retries在参数Option中设置, 默认设置为3。
    • Failbackup:如果服务节点在一定的时间内不返回结果, rpcx客户端会发送相同的请求到另外一个节点, 只要这两个节点有一个返回, rpcx就算调用成功。这种通过资源换取延迟的方式可以参看 Jeff Dean的文章 Achieving Rapid Response Times in Large Online Services,这种实现非常重要,我们重点来看这是怎么实现的

Failbackup 重试模式

  • 生成两个Call,用来接受可能的两次rpc请求的结果
  • 第一个请求开始
  • 设置第一个请求多久没有返回才开启第二个请求的时间
  • select 监听 ctx是否退出,第一个请求, 是否达到设置的请求时间第一个请求还没有返回
    • 如果ctx Done。则直接返回
    • 如果第一个请求的结果返回,则处理结果
    • 如果达到设置的请求时间第一个请求还没有返回,则开始第二个请求
  • 开始发送第二个请求
  • select 就需要 ctx是否退出,第一个请求,第二个请求。看哪一个数据先准备好
    • 如果ctx Done。则直接返回
    • 如果第一个请求的结果返回,则处理结果
    • 如果第二个请求的结果返回,则处理结果
// Failbackup 重试模式
case Failbackup:
		ctx, cancelFn := context.WithCancel(ctx)
		defer cancelFn()
        // 生成两个Call,用来接受可能的两次rpc请求的结果
		call1 := make(chan *Call, 10)
		call2 := make(chan *Call, 10)
		var reply1, reply2 interface{}

		if reply != nil {
			reply1 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
			reply2 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
		}
        // 第一个请求开始
		_, err1 := c.Go(ctx, serviceMethod, args, reply1, call1)

        // 第一个请求多久没有返回才开启第二个请求
		t := time.NewTimer(c.option.BackupLatency)
        // select 监听 ctx,call,t 看谁先返回数据
		select {
        // 如果ctx Done。则直接返回
		case <-ctx.Done(): // cancel by context
			err = ctx.Err()
			return err
        // 第一个请求的结果
		case call := <-call1:
			err = call.Error
            // 有请求数据返回
			if err == nil && reply != nil {
                // 设置给reply
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
			}
            // 结束
			return err
         // 如果t的时间先到,则这里什么也不做,而直接走到下面的代码   
		case <-t.C:

		}
        // 这里开始发送第二个请求
		_, err2 := c.Go(ctx, serviceMethod, args, reply2, call2)
		if err2 != nil {
			if uncoverError(err2) {
				c.removeClient(k, c.servicePath, serviceMethod, client)
			}
			err = err1
			return err
		}
        // 这里select 就需要监听 ctx,第一个请求call1,第二个请求call2。看哪一个数据先准备好
		select {
		case <-ctx.Done(): // cancel by context
			err = ctx.Err()
		case call := <-call1:
			err = call.Error
			if err == nil && reply != nil && reply1 != nil {
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
			}
		case call := <-call2:
			err = call.Error
			if err == nil && reply != nil && reply2 != nil {
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply2).Elem())
			}
		}
		return err

总结

  • 通过分析了RPCX框架的实现原理,我们大概知道了一次RPC请求到底发生了什么事情。我们也可以知道

  • RPC 主要用于公司内部的服务调用,性能消耗低,传输效率高,实现复杂。

  • HTTP 主要用于对外的异构环境,浏览器接口调用,App 接口调用,第三方接口调用等。

  • RPC 使用场景(大型的网站,内部子系统较多、接口非常多的情况下适合使用 RPC):

    • 长链接。不必每次通信都要像 HTTP 一样去 3 次握手,减少了网络开销。
    • 注册发布机制。RPC 框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
    • 安全性,没有暴露资源操作。
    • 微服务支持。就是最近流行的服务化架构、服务化治理,RPC 框架是一个强力的支撑。
  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-09-24 21:27:49  更:2022-09-24 21:27:58 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/28 5:29:31-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计