IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 游戏开发 -> 【raft】学习七:etcd/raft节点启动流程 -> 正文阅读

[游戏开发]【raft】学习七:etcd/raft节点启动流程

前言

在前面的raft学习中,探讨了基于etcd/raft的一些数据结构和raft的日志存储,以及Leader选举算法。随着对raft的使用和了解,本次将带着前面的学习,看看raft 节点的启动流程和一些准备工作,从而在使用raft时能够更加简单地将raft运用到我们的实际工作中。

准备工作

raft本身是一种一致性同步协议,因而需要实体节点来运行协议并提供输入和输出,RawNode是原生的 raft 节点的实体,里面包含了raft协议层和一些状态信息。

RawNode

type RawNode struct {
	raft       *raft
	prevSoftSt *SoftState
	prevHardSt pb.HardState
}

因此我们启动一个raft节点的时候会初始化一个RawNode:

func NewRawNode(config *Config) (*RawNode, error) {
// 初始化一个raft,这个raft主要是实现raft的协议
	r := newRaft(config)
	rn := &RawNode{
		raft: r,
	}
	rn.prevSoftSt = r.softState()
	rn.prevHardSt = r.hardState()
	return rn, nil
}

当初始化好后一个rawNode之后,就可以开启动这个节点,启动方法retcd/raft提供了两种方案:

// 方案一:启动时会检查同辈节点,如果同辈节点为空就会建议采用第二种方案
func StartNode(c *Config, peers []Peer) Node {
	if len(peers) == 0 {
		panic("no peers given; use RestartNode instead")
	}
	// 初始化一个实际节点,里面包含一个raft,实现具体的taft的协议
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}
	// 在启动前会先加载配置,如果Storage接口方法返回非空这个方法将会返回错误,即初始化启动时Storage接口返回应该为空
	err = rn.Bootstrap(peers)
	if err != nil {
		c.Logger.Warningf("error occurred during starting a new node: %v", err)
	}
	// 包含一个rawNode,同时包含一些channel用于接受传入的消息,通过这个node将消息传入raft
	n := newNode(rn)

	go n.run()
	return &n
}
// 方案二: 是一种重启方案,不会去检查同辈节点,集群成员关系将会从Storage加载,Storage是raft提供给用户自定义持久化数据的接口,换言之我们需要在Storage中保存集群的成员关系
func RestartNode(c *Config) Node {
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}
	n := newNode(rn)
	go n.run()
	return &n
}

除此之外,启动前,还会初始化一个node节点,因为目前还没有输入输出的渠道,在前面的学习中,我们学习过Node接口,这里就不在赘述啦。这个node节点主要是提供一些输入输出的channel以及一些其他的控制变量:

func newNode(rn *RawNode) node {
	return node{
		propc:      make(chan msgWithResult),
		recvc:      make(chan pb.Message),
		confc:      make(chan pb.ConfChangeV2),
		confstatec: make(chan pb.ConfState),
		readyc:     make(chan Ready),
		advancec:   make(chan struct{}),
		// make tickc a buffered chan, so raft node can buffer some ticks when the node
		// is busy processing raft messages. Raft node will resume process buffered
		// ticks when it becomes idle.
		tickc:  make(chan struct{}, 128),
		done:   make(chan struct{}),
		stop:   make(chan struct{}),
		status: make(chan chan Status),
		rn:     rn,
	}
}

初始化好这node实例,就可以开始启动raft节点了通过raft.run方法,因为需要循环监听应用层的输入以及raft向应用层传递的数据,这里开启一个协程去做这个工作。这种方式充分地应用golang的产的特性。

func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			// 拿到raft层的消息
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}
		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// TODO: maybe buffer the config propose if there exists one (the way
		// described in raft dissertation)
		// Currently it is dropped in Step silently.
		case pm := <-propc:
			m := pm.m
			m.From = r.id
			// 通过node将消息传入raft协议层
			err := r.Step(m)
			if pm.result != nil {
				// 将结果返回给node层->具体节点
				pm.result <- err
				close(pm.result)
			}
		case m := <-n.recvc:
			// filter out response message from unknown From.
			if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m)
			}
		case cc := <-n.confc:
			_, okBefore := r.prs.Progress[r.id]
			cs := r.applyConfChange(cc)
			// If the node was removed, block incoming proposals. Note that we
			// only do this if the node was in the config before. Nodes may be
			// a member of the group without knowing this (when they're catching
			// up on the log and don't have the latest config) and we don't want
			// to block the proposal channel in that case.
			//
			// NB: propc is reset when the leader changes, which, if we learn
			// about it, sort of implies that we got readded, maybe? This isn't
			// very sound and likely has bugs.
			if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
				var found bool
			outer:
				for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
					for _, id := range sl {
						if id == r.id {
							found = true
							break outer
						}
					}
				}
				if !found {
					propc = nil
				}
			}
			select {
			case n.confstatec <- cs:
			case <-n.done:
			}
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			// 这里只是负责将消息传给应用层,并不处理消息
			n.rn.acceptReady(rd)

			advancec = n.advancec
		case <-advancec:
			// 交给RawNode处理消息
			n.rn.Advance(rd)
			// 表表示应用已经处理好Ready中的数据,告知raft可以开始接收新的消息
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

到这里整个raft算是启动成功,启动的第一件事情就是开始选主。这个就候选慢慢细说啦,每次学习的内容控制在有效的范围内,学习过多也不能吸收,反而失去的效率。下面我们主要来看看第二种方案启动的流程,一般用的比较多的也是这种方式,毕竟一个系统都是需要可以重启,不然保存状态的意义就失去了。

newRaft

RestartNode这个方法其实比较简单,和前面的流程差不多,但是在启动前需要做一些事情。先看看newRaft中做了些什么

func newRaft(c *Config) *raft {
	if err := c.validate(); err != nil {
		panic(err.Error())
	}
	// 实例化raftLog,这个结构我们在学习五中了解了,主要是保存raft的日志数据
	raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
	// 从Storage中加载配置,则需要我们需要初始化好整个持久化的代码
	hs, cs, err := c.Storage.InitialState()
	if err != nil {
		panic(err) // TODO(bdarnell)
	}

	r := &raft{
		id:                        c.ID,
		lead:                      None,
		isLearner:                 false,
		raftLog:                   raftlog,
		maxMsgSize:                c.MaxSizePerMsg,
		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
		electionTimeout:           c.ElectionTick,
		heartbeatTimeout:          c.HeartbeatTick,
		logger:                    c.Logger,
		checkQuorum:               c.CheckQuorum,
		preVote:                   c.PreVote,
		readOnly:                  newReadOnly(c.ReadOnlyOption),
		disableProposalForwarding: c.DisableProposalForwarding,
	}
	// 重新加载配置,
	cfg, prs, err := confchange.Restore(confchange.Changer{
		Tracker:   r.prs,
		LastIndex: raftlog.lastIndex(),
	}, cs)
	if err != nil {
		panic(err)
	}
	assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))

	if !IsEmptyHardState(hs) {
		r.loadState(hs)
	}
	if c.Applied > 0 {
		raftlog.appliedTo(c.Applied)
	}
	// 初始化为follower节点
	r.becomeFollower(r.Term, None)

	var nodesStrs []string
	// 参与投票的节点,其中的节点是从Storage中加载上来的
	for _, n := range r.prs.VoterNodes() {
		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
	}

	r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
		r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
	return r
}

我们看看Storage中InitialState接口:

// raft 状态配置
type ConfState struct {
	// The voters in the incoming config. (If the configuration is not joint,
	// then the outgoing config is empty).
	Voters []uint64 `protobuf:"varint,1,rep,name=voters" json:"voters,omitempty"`
	// The learners in the incoming config.
	Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
	// The voters in the outgoing config.
	VotersOutgoing []uint64 `protobuf:"varint,3,rep,name=voters_outgoing,json=votersOutgoing" json:"voters_outgoing,omitempty"`
	// The nodes that will become learners when the outgoing config is removed.
	// These nodes are necessarily currently in nodes_joint (or they would have
	// been added to the incoming config right away).
	LearnersNext []uint64 `protobuf:"varint,4,rep,name=learners_next,json=learnersNext" json:"learners_next,omitempty"`
	// If set, the config is joint and Raft will automatically transition into
	// the final config (i.e. remove the outgoing config) when this is safe.
	AutoLeave bool `protobuf:"varint,5,opt,name=auto_leave,json=autoLeave" json:"auto_leave"`
}
func (s *raftStorage) InitialState() (pb.HardState, pb.ConfState, error) {
	hs := s.wal.InitialState()
	return hs, s.cs, nil
}

通过这个接口可以知道,我们在启动节点之前需要先初始化这个ConfState。这样我们在加载的时候才能够正常的启动,如果peer为空由于无法完成选主。。就会导致启动阻塞或者失败,具体取决于应用层的处理方式。

小结

本次我们顺着etcd/raft的逻辑,学习了一个raft节点的启动流程,配置的加载以及一些初始化工作。主要流程如下:

  1. 初始化应用层自定义的Storage实体类;
  2. 调用RestartNode方法;
  3. 在RestartNode中,实例化一个RawNode,并包装为Node;
  4. 在NewRawNode中,实例化一个RawNode,并实例化raft,将raft包装为RawNode,返回一个RawNode;
  5. 这样raft的核心工作就已经准备完成,最终返回一个Node接口,用于获取raft层的数据以及向raft层数据传入数据。

还有很多细节值得探讨,建议对raft感兴趣的朋友可以在自己业余时间,读读源码,整个过程可能艰难因人而异。不过也有很多大佬针对源码做了思路上的解读,可以找来看看。总的来说,学习raft源码,结合raft论文的学习,能够从理论到实践 层面对一致性算法的有更好地理解与把握。除此之外,在学习源码的过程中对Golang的channel机制,会有更深刻的认知。整个raft的消息传递,将channel用得出神入化。好啦,今天的分享就这么多啦!

=-=-=-=-=-=-=-=-=-=-撒花!=-=-=-=-=-=-=-=-=-=-=

  游戏开发 最新文章
6、英飞凌-AURIX-TC3XX: PWM实验之使用 GT
泛型自动装箱
CubeMax添加Rtthread操作系统 组件STM32F10
python多线程编程:如何优雅地关闭线程
数据类型隐式转换导致的阻塞
WebAPi实现多文件上传,并附带参数
from origin ‘null‘ has been blocked by
UE4 蓝图调用C++函数(附带项目工程)
Unity学习笔记(一)结构体的简单理解与应用
【Memory As a Programming Concept in C a
上一篇文章      下一篇文章      查看所有文章
加:2022-04-18 18:15:32  更:2022-04-18 18:17:07 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/16 20:52:45-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码