?
微服务架构中的服务注册与发现
在微服务架构中,多个微服务间的通信需要依赖服务注册与发现组件获取指定服务实例的地址信息,才能正确地发起 RPC 调用,保证分布式系统的高可用、高并发。服务注册与发现主要包含两部分:服务注册的功能与服务发现的功能。服务注册是指服务实例启动时将自身信息注册到服务注册与发现中心,并在运行时通过心跳等方式向其汇报自身服务状态;服务发现是指服务实例向服务注册与发现中心获取其他服务实例信息,用于远程调用。
下面我们来看服务注册与发现中心的职责和服务实例进行服务注册的基本流程,以及分布式系统中数据同步的基本原理 CAP。
服务注册与发现中心有什么功能?
在传统单体应用中,应用部署在固定的物理机器或云平台上,一般通过固定在代码内部或者配置文件的服务地址和端口直接发起调用。由于应用数量较少,系统结构复杂度不高,开发人员和运维人员可以较为轻松地进行管理和配置。
随着应用架构向微服务架构迁移,服务数量的增加和动态部署、动态扩展的特性,使得服务地址和端口在运行时随时可变。对此,我们需要一个额外的中心化组件统一管理动态部署的微服务应用的服务实例元数据,一般称它为服务注册与发现中心。服务注册与发现中心主要有以下职责:
-
管理当前注册到服务注册与发现中心的微服务实例元数据信息,包括服务实例的服务名、IP 地址、端口号、服务描述和服务状态等; -
与注册到服务发现与注册中心的微服务实例维持心跳,定期检查注册表中的服务实例是否在线,并剔除无效服务实例信息; -
提供服务发现能力,为服务调用方提供服务提供方的服务实例元数据。
通过服务发现与注册中心,我们可以很方便地管理系统中动态变化的服务实例信息。但是与此同时,它也可能成为系统的瓶颈和故障点。因为服务之间的调用信息来自服务注册与发现中心,当它不可用时,服务之间的调用也就无法正常进行。因此服务发现与注册中心一般会集群化部署,提供高可用性和高稳定性。
除了具体的功能,那么有没有理论指导如何实现微服务架构中的服务注册与发现呢?这就要提到分布式中著名的 CAP 理论了。
分布式中的 CAP 理论
在本质上来讲,微服务应用属于分布式系统的一种落地实践,而分布式系统最大的难点是处理各个节点之间数据状态的一致性。即使是无状态的 HTTP RESTful API 请求,在处理多服务实例场景下修改数据状态的请求,也需要通过数据库或者分布式缓存等外部系统维护数据的一致性。我们要讲到的 CAP 原理就是描述分布式系统下节点数据同步的基本定理。
CAP 原理由加州大学的 Eric Brewer 教授提出,分别指Consistency(一致性)、Availablity (可用性)、Partition tolerance(分区容忍性)。Eric Brewer 认为,以上三个指标最多只能同时满足两个。
-
Consistency,指数据一致性,表示一个系统的数据信息(包括备份数据)在同一时刻都是一致的。在分布式系统下,同一份数据可能存在于多个不同的实例中,在数据强一致性的要求下,修改其中一份数据后必须同步到它的所有备份中。在数据同步的任何时候,都需要保证所有对该份数据的请求将返回同样的状态。 -
Availablity,指服务可用性,要求服务在接收到客户端请求后,都能够给出响应。服务可用性考量的是系统的可用性,要求系统在高并发情况和部分节点宕机的情况下,系统整体依然能够响应客户端的请求。 -
Partition tolerance,指分区容忍性。在分布式系统中,不同节点之间通过网络进行通信。由于网络的不可靠性,位于不同网络分区的服务节点可能会通信失败,如果系统能够容忍这种情况,说明它是满足分区容忍性的。如果系统不能满足,将会限制分布式系统的扩展性,即服务节点的部署数量和地区都会受限,也就违背了分布式系统设计的初衷,所以一般来说分布式系统都会满足分区容忍性。
但是在满足了分区容忍性的前提下,分布式系统并不能同时满足数据一致性和服务可用性。假设服务 A 现在有两个实例 A1 和 A2,它们之间的网络通信出现了异常,基于分区容忍性,这并不会影响 A1 和 A2 独立的正常运行。假如此时客户端请求 A1,请求将数据 B 从 B1 状态修改为 B2,由于网络的不可用,数据 B 的修改并不能通知到实例 A2。
如果此时另一个客户端向 A2 请求数据 B,如果 A2 返回数据 B1,将满足服务可用性,但并不能满足数据一致性;如果 A2 等待 A1 的通知后才返回数据 B 的正确状态,虽然满足了数据一致性,但并不能响应客户端请求,违背了服务可用性的指标。
基于分布式系统的基本特质,必须要满足 P,接下来我们需要考虑满足 C 还是 A。根据 CAP 理论,etcd 在满足网络分区可用性的基础上,优先满足了一致性 C。但是当 etcd 出现异常时,无法向其写入新数据。
原生实现服务注册与发现
了解了服务注册与发现的相关概念,下面我们基于 etcd 原生的 clientv3 API 实现服务的注册,对外提供服务。其他服务调用该服务时,则通过服务名发现对应的服务实例,随后发起调用。简单示例的服务架构图如下所示:
Gateway 作为调用服务,user-service 作为被调用服务,所有的服务都注册到 etcd。Gateway 发起调用时,首先请求 etcd 获取其对应的服务器地址和端口,各个服务通过 lease 租约机制与 etcd 保持心跳,通过 watch 机制监测注册到 etcd 上的服务实例变化。
user-service 的实现
首先实现的是 user-service,user-service 将实例信息注册到 etcd,包括 host、port 等信息。我们暂且注册 host 地址与 port,注册到 etcd 之后,user-service 定期续租服务实例信息,相当于保持心跳。续租的频率可以控制,因为频繁的续租请求会造成通信资源的占用。
package main
import?(
"context"
"go.etcd.io/etcd/clientv3"
"log"
"time"
)
// 创建租约注册服务
type ServiceReg struct {
client *clientv3.Client
lease clientv3.Lease
leaseResp *clientv3.LeaseGrantResponse
canclefunc func()
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string
}
func NewServiceReg(addr []string, timeNum int64) (*ServiceReg, error) {
conf := clientv3.Config{
Endpoints: addr,
DialTimeout: 5 * time.Second,
}
var (
client *clientv3.Client
)
if clientTem, err := clientv3.New(conf); err == nil {
client = clientTem
} else {
return nil, err
}
ser := &ServiceReg{
client: client,
}
if err := ser.setLease(timeNum); err != nil {
return nil, err
}
go ser.ListenLeaseRespChan()
return ser, nil
}
func main() {
ser, _ := NewServiceReg([]string{"localhost:2379"}, 5)
defer ser.RevokeLease()
if err := ser.PutService("/user", "http://localhost:8080"); err != nil {
log.Panic(err)
}
// 阻塞,持续运行
select {}
}
上述代码首先初始化 etcd 连接,通过 ServiceReg 创建租约注册服务。随后设置 Lease、续租,将服务的注册绑定到创建好的 Lease 上面。主函数入口中增加了 select 阻塞,模拟服务的持续运行。
下面我们具体来看创建租约和设置续租的实现,代码如下所示:
// 设置租约
func (this *ServiceReg) setLease(timeNum int64) error {
lease := clientv3.NewLease(this.client)
// 设置租约时间
leaseResp, err := lease.Grant(context.TODO(), timeNum)
if err != nil {
return err
}
// 设置续租
ctx, cancelFunc := context.WithCancel(context.TODO())
leaseRespChan, err := lease.KeepAlive(ctx, leaseResp.ID)
if err != nil {
return err
}
this.lease = lease
this.leaseResp = leaseResp
this.canclefunc = cancelFunc
this.keepAliveChan = leaseRespChan
return nil
}
// 监听续租情况
func (this *ServiceReg) ListenLeaseRespChan() {
for {
select {
case leaseKeepResp := <-this.keepAliveChan:
if leaseKeepResp == nil {
log.Println("已经关闭续租功能\n")
return
} else {
log.Println("续租成功\n")
}
}
}
}
// 通过租约注册服务
func (this *ServiceReg) PutService(key, val string) error {
kv := clientv3.NewKV(this.client)
log.Printf("register user server for %s\n", val)
_, err := kv.Put(context.TODO(), key, val, clientv3.WithLease(this.leaseResp.ID))
return err
}
// 撤销租约
func (this *ServiceReg) RevokeLease() error {
this.canclefunc()
time.Sleep(2 * time.Second)
_, err := this.lease.Revoke(context.TODO(), this.leaseResp.ID)
return err
}
上面的实现中,设置键值对的租期是 5 秒,即服务心跳的 TTL。为了过期之后 user-service 依然能够被其他服务正确调用,我们需要定期续租。其实这也是一种保持心跳的形式,通过单独开启协程进行续租,keepAliveChan 用于接收续租的结果。当服务关闭,调用RevokeLease ,释放租约。
客户端调用
我们接着调用 user-service 的客户端,客户端将从 etcd 获取 user 服务的实例信息,并监听 etcd 中 user 服务实例的变更。
package main
import (
"context"
"github.com/coreos/etcd/mvcc/mvccpb"
"go.etcd.io/etcd/clientv3"
"log"
"sync"
"time"
)
// 客户端连接的结构体
type ClientInfo struct {
client *clientv3.Client
serverList map[string]string
lock sync.Mutex
}
// 初始化 etcd 客户端连接
func NewClientInfo(addr []string) (*ClientInfo, error) {
conf := clientv3.Config{
Endpoints: addr,
DialTimeout: 5 * time.Second,
}
if client, err := clientv3.New(conf); err == nil {
return &ClientInfo{
client: client,
serverList: make(map[string]string),
}, nil
} else {
return nil, err
}
}
// 获取服务实例信息
func (this *ClientInfo) GetService(prefix string) ([]string, error) {
if addrs, err := this.getServiceByName(prefix); err != nil {
panic(err)
} else {
log.Println("get service ", prefix, " for instance list: ", addrs)
go this.watcher(prefix)
return addrs, nil
}
}
// 监控指定键值对的变更
func (this *ClientInfo) watcher(prefix string) {
rch := this.client.Watch(context.Background(), prefix, clientv3.WithPrefix())
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: // 写入的事件
this.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: // 删除的事件
this.DelServiceList(string(ev.Kv.Key))
}
}
}
}
func main() {
cli, _ := NewClientInfo([]string{"localhost:2379"})
cli.GetService("/user")
// select 阻塞,持续运行
select {}
}
上述实现主要包括以下几个步骤:
总的来说,先创建 etcd 连接,构建 ClientInfo 对象,随后获取指定的服务user 实例信息;最后监测 user 服务实例的变更事件,根据不同的事件产生不同的行为。
如下代码是客户端实现涉及的主要函数:
// 根据服务名,获取服务实例信息
func (this *ClientInfo) getServiceByName(prefix string) ([]string, error) {
resp, err := this.client.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return nil, err
}
addrs := this.extractAddrs(resp)
return addrs, nil
}
// 根据 etcd 的响应,提取服务实例的数组
func (this *ClientInfo) extractAddrs(resp *clientv3.GetResponse) []string {
addrs := make([]string, 0)
if resp == nil || resp.Kvs == nil {
return addrs
}
for i := range resp.Kvs {
if v := resp.Kvs[i].Value; v != nil {
this.SetServiceList(string(resp.Kvs[i].Key), string(resp.Kvs[i].Value))
addrs = append(addrs, string(v))
}
}
return addrs
}
// 设置 serverList
func (this *ClientInfo) SetServiceList(key, val string) {
this.lock.Lock()
defer this.lock.Unlock()
this.serverList[key] = string(val)
log.Println("set data key :", key, "val:", val)
}
// 删除本地缓存的服务实例信息
func (this *ClientInfo) DelServiceList(key string) {
this.lock.Lock()
defer this.lock.Unlock()
delete(this.serverList, key)
log.Println("del data key:", key)
if newRes, err := this.getServiceByName(key); err != nil {
log.Panic(err)
} else {
log.Println("get key ", key, " current val is: ",newRes)
}
}
// 工具方法,转换数组
func (this *ClientInfo) SerList2Array() []string {
this.lock.Lock()
defer this.lock.Unlock()
addrs := make([]string, 0)
for _, v := range this.serverList {
addrs = append(addrs, v)
}
return addrs
}
客户端本地有保存服务实例的数组:serverList,获取到 user 的服务实例信息后,将数据保存到 serverList 中,客户端会监控 user 的服务实例变更事件,并相应调整自身保存的 serverList。
?
运行结果
我们依次运行 user 服务和调用的客户端,结果如下所示:
// 服务端控制台输出
2021-03-14 13:08:13.913059 I | register user server for http://localhost:8080
2021-03-14 13:08:13.932964 I | 续租成功
...
// client 控制台输出
2021-03-14 18:25:37.462231 I | set data key : /user val: http://localhost:8080
2021-03-14 18:25:37.462266 I | get service /user for instance list: [http://localhost:8080]
可以看到,服务端控制台在持续输出续租的内容,客户端启动后监测到服务端的 put 事件,并成功获取到/user 的服务实例信息:http://localhost:8080 。user 服务关闭,控制台有如下的输出:
// user 服务关闭之后,client 控制台输出
2021-03-14 18:25:47.509583 I | del data key: /user
2021-03-14 18:25:47.522095 I | get key /user current val is: []
user 服务关闭后,服务实例信息从 etcd 删除。再次获取指定的服务名,返回空的信息,符合预期。
|