IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 网络协议 -> 读猿码系列——1. gRPC+Etcd3的服务发现&负载均衡 -> 正文阅读

[网络协议]读猿码系列——1. gRPC+Etcd3的服务发现&负载均衡

项目源码:https://github.com/wwcd/grpc-lb

项目文档:https://segmentfault.com/a/1190000008672912
?

我们先把项目down下来,它的目录结构如下:

我们先去掉其他组件,单来看下gRPC的调用流程,下图是官方文档中的调用流程图:

  • 首先客户端(gRPC stub)调用A方法,发起RPC调用;

  • 对请求信息使用Protobuf进行对象序列化压缩;

  • 然后在服务端(gRPC Sever)接收到请求后,解码请求体,进行业务处理逻辑并返回;

  • 对响应结果使用Protobuf进行对象序列化压缩;

  • 客户端接收到服务端响应,解码请求体。回调被调用的A方法,唤醒正在等待响应(阻塞)的客户端调用并响应结果。

gRPC使用流程如图:

我们先定义一个helloworld.proto,然后自动用protoc-gen-go生成go代码。以下是proto:

service?Greeter?{
????rpc?SayHello?(HelloRequest)?returns?(HelloReply)?{
????????option?(google.api.http)?=?{
????????????post:?"/hello"
????????????body:?"*"
????????};
????}
}

message?HelloRequest?{
????string?name?=?1;
}

message?HelloReply?{
????string?message?=?1;
}

首先在服务启动后gRPC客户端向命名服务器发出名称解析请求,名称将解析为一个或多个IP地址,对应cli.go。

package?main

import?(
????"context"
????"flag"
????"strconv"
????"time"

????"google.golang.org/grpc"
????"google.golang.org/grpc/balancer/roundrobin"
????"google.golang.org/grpc/resolver"

????"github.com/sirupsen/logrus"
????pb?"github.com/wwcd/grpc-lb/cmd/helloworld"
????grpclb?"github.com/wwcd/grpc-lb/etcdv3"
)

var?(
????svc?=?flag.String("service",?"hello_service",?"service?name")
????reg?=?flag.String("reg",?"http://localhost:2379",?"register?etcd?address")
)

func?main()?{
????flag.Parse()
????r?:=?grpclb.NewResolver(*reg,?*svc)
????resolver.Register(r)

????ctx,?cancel?:=?context.WithTimeout(context.Background(),?10*time.Second)
????conn,?err?:=?grpc.DialContext(ctx,?r.Scheme()+"://authority/"+*svc,?grpc.WithInsecure(),?grpc.WithBalancerName(roundrobin.Name),?grpc.WithBlock())
????cancel()
????if?err?!=?nil?{
????????panic(err)
????}

????ticker?:=?time.NewTicker(1000?*?time.Millisecond)
????for?t?:=?range?ticker.C?{
????????client?:=?pb.NewGreeterClient(conn)
????????resp,?err?:=?client.SayHello(context.Background(),?&pb.HelloRequest{Name:?"world?"?+?strconv.Itoa(t.Second())})
????????if?err?==?nil?{
????????????logrus.Infof("%v:?Reply?is?%s\n",?t,?resp.Message)
????????}
????}
}

DialContext为给定目标创建客户端连接,方法默认是非阻塞的,即该功能不会等待建立连接,连接在后台进行,可以使用WithBlock()改为阻塞。非阻塞情况下,ctx不会对连接起作用只用作设置;阻塞情况可以使用ctx取消或终止挂起的连接。

NewTicker设置一个滴答时钟,用来调整时间间隔和发送速度,返回一个包含时间channel的结构体。声明一个变量接收已建立连接的客户端GreeterClient对象,客户端对象开始调用SayHello方法。

客户端调用方式我们大概有数了,对应服务端svr.go代码如下。

package?main

import?(
????"context"
????"flag"
????"net"
????"os"
????"os/signal"
????"syscall"
????"time"

????"google.golang.org/grpc"

????"github.com/sirupsen/logrus"
????pb?"github.com/wwcd/grpc-lb/cmd/helloworld"
????grpclb?"github.com/wwcd/grpc-lb/etcdv3"
)

var?(
????serv?=?flag.String("service",?"hello_service",?"service?name")
????host?=?flag.String("host",?"localhost",?"listening?host")
????port?=?flag.String("port",?"50001",?"listening?port")
????reg??=?flag.String("reg",?"http://localhost:2379",?"register?etcd?address")
)

func?main()?{
????flag.Parse()

????lis,?err?:=?net.Listen("tcp",?net.JoinHostPort(*host,?*port))
????if?err?!=?nil?{
????????panic(err)
????}

????err?=?grpclb.Register(*reg,?*serv,?*host,?*port,?time.Second*10,?15)
????if?err?!=?nil?{
????????panic(err)
????}

????ch?:=?make(chan?os.Signal,?1)
????signal.Notify(ch,?syscall.SIGTERM,?syscall.SIGINT,?syscall.SIGHUP,?syscall.SIGQUIT)
????go?func()?{
????????s?:=?<-ch
????????logrus.Infof("receive?signal?'%v'",?s)
????????grpclb.UnRegister()
????????os.Exit(1)
????}()

????logrus.Infof("starting?hello?service?at?%s",?*port)
????s?:=?grpc.NewServer()
????pb.RegisterGreeterServer(s,?&server{})
????s.Serve(lis)
}


type?server?struct{}


func?(s?*server)?SayHello(ctx?context.Context,?in?*pb.HelloRequest)?(*pb.HelloReply,?error)?{
????logrus.Infof("%v:?Receive?is?%s\n",?time.Now(),?in.Name)
????return?&pb.HelloReply{Message:?"Hello?"?+?in.Name?+?"?from?"?+?net.JoinHostPort(*host,?*port)},?nil
}

JoinHostPort将主机和端口组合成“host: port”形式的网络地址,用来作为监听对象;

Etcd中的Register方法将“host: port”形式的网络监听信息用KV形式存储,之后进行建立租约、存放键值、赋予租约永久有效操作。

NewServer创建一个gRPC服务器,该服务器没有注册服务,并且还没有开始接受请求,调用注册GreeterServer方法后,Server方法为每个监听的连接创建一个新的ServerTransport和service goroutine。在func SayHello中补充了业务逻辑处理。

客户端和服务端的逻辑大概了解了,我们知道gRPC是支持多路复用技术的,我们在网关处创建一个映射为空的ServeMux,然后将handlers处理程序注册到里面,对应gw.go。

package?main

import?(
????"context"
????"flag"
????"net"
????"net/http"
????"time"

????"github.com/grpc-ecosystem/grpc-gateway/runtime"
????"github.com/sirupsen/logrus"
????"google.golang.org/grpc"
????"google.golang.org/grpc/balancer/roundrobin"
????"google.golang.org/grpc/resolver"

????pb?"github.com/wwcd/grpc-lb/cmd/helloworld"
????grpclb?"github.com/wwcd/grpc-lb/etcdv3"
)

var?(
????svc??=?flag.String("service",?"hello_service",?"service?name")
????host?=?flag.String("host",?"localhost",?"listening?host")
????port?=?flag.String("port",?"60001",?"listening?port")
????reg??=?flag.String("reg",?"http://localhost:2379",?"register?etcd?address")
)

func?main()?{
????flag.Parse()
????r?:=?grpclb.NewResolver(*reg,?*svc)
????resolver.Register(r)

????ctx,?cancel?:=?context.WithTimeout(context.Background(),?10*time.Second)
????//?https://github.com/grpc/grpc/blob/master/doc/naming.md
????//?The?gRPC?client?library?will?use?the?specified?scheme?to?pick?the?right?resolver?plugin?and?pass?it?the?fully?qualified?name?string.
????conn,?err?:=?grpc.DialContext(ctx,?r.Scheme()+"://authority/"+*svc,?grpc.WithInsecure(),?grpc.WithBalancerName(roundrobin.Name),?grpc.WithBlock())
????cancel()
????if?err?!=?nil?{
????????panic(err)
????}

????mux?:=?runtime.NewServeMux()
????err?=?pb.RegisterGreeterHandler(ctx,?mux,?conn)
????if?err?!=?nil?{
????????panic(err)
????}

????//?Start?HTTP?server?(and?proxy?calls?to?gRPC?server?endpoint)
????logrus.Fatal(http.ListenAndServe(net.JoinHostPort(*host,?*port),?mux))
}

到此我们详细看了客户端及服务端的具体实现。在gRPC的设计文档中提供了服务注册及服务发现的思路,也为不同语言提供了命名解析和负载均衡接口供扩展。

其基本实现原理:

  1. gRPC客户端向命名服务器(resolver)发出名称解析请求,名称将解析为一个或者多个IP,每个IP标识它是服务器地址还是负载均衡器地址,以及标识要使用哪个客户端服务配置或负载均衡策略。

  2. 客户端实例化负载均衡策略,如果解析返回负载均衡地址,客户端将使用grpclb策略,否则客户端使用服务配置请求的负载均衡策略。

  3. 负载均衡策略为每个服务器地址创建一个子通道(channel)。

  4. 当有rpc请求时,负载均衡策略决定哪个子通道即grpc服务器将接收请求,当可用服务器为空时客户端的请求将被阻塞。

我们接着对基于Etcd3的服务发现部分代码进行深入,对应etcdv3 resolver.go中的watch(),方法中调用了etcd提供的clientv3.Watcher。

func?(r?*Resolver)?watch(prefix?string)?{
????addrDict?:=?make(map[string]resolver.Address)

????update?:=?func()?{
????????addrList?:=?make([]resolver.Address,?0,?len(addrDict))
????????for?_,?v?:=?range?addrDict?{
????????????addrList?=?append(addrList,?v)
????????}
????????r.cc.UpdateState(resolver.State{Addresses:?addrList})
????}

????resp,?err?:=?r.cli.Get(context.Background(),?prefix,?clientv3.WithPrefix())
????if?err?==?nil?{
????????for?i?:=?range?resp.Kvs?{
????????????addrDict[string(resp.Kvs[i].Value)]?=?resolver.Address{Addr:?string(resp.Kvs[i].Value)}
????????}
????}

????update()

????rch?:=?r.cli.Watch(context.Background(),?prefix,?clientv3.WithPrefix(),?clientv3.WithPrevKV())
????for?n?:=?range?rch?{
????????for?_,?ev?:=?range?n.Events?{
????????????switch?ev.Type?{
????????????case?mvccpb.PUT:
????????????????addrDict[string(ev.Kv.Key)]?=?resolver.Address{Addr:?string(ev.Kv.Value)}
????????????case?mvccpb.DELETE:
????????????????delete(addrDict,?string(ev.PrevKv.Key))
????????????}
????????}
????????update()
????}
}

主要方法在cli.Get()以及cli.Watch()方法,先来看cli.Get(),它的作用是检索key对应的value。

再来看cli.Watch()方法

启动测试程序:

*注: golang1.11以上版本进行测试*
# 分别启动服务端go run -mod vendor cmd/svr/svr.go -port 50001go run -mod vendor cmd/svr/svr.go -port 50002go run -mod vendor cmd/svr/svr.go -port 50003
# 启动客户端go run -mod vendor cmd/cli/cli.go

# 启动grpc-gateway代理,提供HTTP-RESTful服务go run -mod vendor cmd/gw/gw.gocurl -X POST http://localhost:60001/hello -d '{"name": "fromGW"}'

运行三个服务端,一个客户端。经测试我们发现:

  • 各服务端接收的请求数相等;

  • 关闭一个服务端S1,请求会转到另外两个服务端;

  • 重启S1,请求会重新平均分到S1;

  • 关闭etcd3服务器,客户端与服务端仍正常通信,但新服务端不会注册进来,服务端掉线了也无法摘除掉;

  • 重新启动Etcd3服务器,服务端上下线可自动恢复正常;

  • 关闭所有服务端,客户端请求将被阻塞。

参考:

https://segmentfault.com/a/1190000008672912

https://www.grpc.io/doc

https://mp.weixin.qq.com/s/yWwbbBP-n1BjLwlCd_74RA

欢迎关注公众号才浅coding攻略,本人是热爱技术干货的程序媛,从事游戏及微服务后端开发,分享Go、微服务、云原生及Python、网络及算法等相关内容。日拱一卒。欢迎各位催更扯淡一条龙!

  网络协议 最新文章
使用Easyswoole 搭建简单的Websoket服务
常见的数据通信方式有哪些?
Openssl 1024bit RSA算法---公私钥获取和处
HTTPS协议的密钥交换流程
《小白WEB安全入门》03. 漏洞篇
HttpRunner4.x 安装与使用
2021-07-04
手写RPC学习笔记
K8S高可用版本部署
mySQL计算IP地址范围
上一篇文章      下一篇文章      查看所有文章
加:2022-05-08 08:27:37  更:2022-05-08 08:30:40 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2024年12日历 -2024/12/29 10:37:22-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码
数据统计