前言
在前面的raft学习中,探讨了基于etcd/raft的一些数据结构和raft的日志存储,以及Leader选举算法。随着对raft的使用和了解,本次将带着前面的学习,看看raft 节点的启动流程和一些准备工作,从而在使用raft时能够更加简单地将raft运用到我们的实际工作中。
准备工作
raft本身是一种一致性同步协议,因而需要实体节点来运行协议并提供输入和输出,RawNode是原生的 raft 节点的实体,里面包含了raft协议层和一些状态信息。
RawNode
type RawNode struct {
raft *raft
prevSoftSt *SoftState
prevHardSt pb.HardState
}
因此我们启动一个raft节点的时候会初始化一个RawNode:
func NewRawNode(config *Config) (*RawNode, error) {
r := newRaft(config)
rn := &RawNode{
raft: r,
}
rn.prevSoftSt = r.softState()
rn.prevHardSt = r.hardState()
return rn, nil
}
当初始化好后一个rawNode之后,就可以开启动这个节点,启动方法retcd/raft提供了两种方案:
func StartNode(c *Config, peers []Peer) Node {
if len(peers) == 0 {
panic("no peers given; use RestartNode instead")
}
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
err = rn.Bootstrap(peers)
if err != nil {
c.Logger.Warningf("error occurred during starting a new node: %v", err)
}
n := newNode(rn)
go n.run()
return &n
}
func RestartNode(c *Config) Node {
rn, err := NewRawNode(c)
if err != nil {
panic(err)
}
n := newNode(rn)
go n.run()
return &n
}
除此之外,启动前,还会初始化一个node节点,因为目前还没有输入输出的渠道,在前面的学习中,我们学习过Node接口,这里就不在赘述啦。这个node节点主要是提供一些输入输出的channel以及一些其他的控制变量:
func newNode(rn *RawNode) node {
return node{
propc: make(chan msgWithResult),
recvc: make(chan pb.Message),
confc: make(chan pb.ConfChangeV2),
confstatec: make(chan pb.ConfState),
readyc: make(chan Ready),
advancec: make(chan struct{}),
tickc: make(chan struct{}, 128),
done: make(chan struct{}),
stop: make(chan struct{}),
status: make(chan chan Status),
rn: rn,
}
}
初始化好这node实例,就可以开始启动raft节点了通过raft.run方法,因为需要循环监听应用层的输入以及raft向应用层传递的数据,这里开启一个协程去做这个工作。这种方式充分地应用golang的产的特性。
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() {
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
if lead != r.lead {
if r.hasLeader() {
if lead == None {
r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
} else {
r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
}
propc = n.propc
} else {
r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
propc = nil
}
lead = r.lead
}
select {
case pm := <-propc:
m := pm.m
m.From = r.id
err := r.Step(m)
if pm.result != nil {
pm.result <- err
close(pm.result)
}
case m := <-n.recvc:
if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
r.Step(m)
}
case cc := <-n.confc:
_, okBefore := r.prs.Progress[r.id]
cs := r.applyConfChange(cc)
if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
var found bool
outer:
for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
for _, id := range sl {
if id == r.id {
found = true
break outer
}
}
}
if !found {
propc = nil
}
}
select {
case n.confstatec <- cs:
case <-n.done:
}
case <-n.tickc:
n.rn.Tick()
case readyc <- rd:
n.rn.acceptReady(rd)
advancec = n.advancec
case <-advancec:
n.rn.Advance(rd)
rd = Ready{}
advancec = nil
case c := <-n.status:
c <- getStatus(r)
case <-n.stop:
close(n.done)
return
}
}
}
到这里整个raft算是启动成功,启动的第一件事情就是开始选主。这个就候选慢慢细说啦,每次学习的内容控制在有效的范围内,学习过多也不能吸收,反而失去的效率。下面我们主要来看看第二种方案启动的流程,一般用的比较多的也是这种方式,毕竟一个系统都是需要可以重启,不然保存状态的意义就失去了。
newRaft
RestartNode这个方法其实比较简单,和前面的流程差不多,但是在启动前需要做一些事情。先看看newRaft中做了些什么
func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err)
}
r := &raft{
id: c.ID,
lead: None,
isLearner: false,
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: tracker.MakeProgressTracker(c.MaxInflightMsgs),
electionTimeout: c.ElectionTick,
heartbeatTimeout: c.HeartbeatTick,
logger: c.Logger,
checkQuorum: c.CheckQuorum,
preVote: c.PreVote,
readOnly: newReadOnly(c.ReadOnlyOption),
disableProposalForwarding: c.DisableProposalForwarding,
}
cfg, prs, err := confchange.Restore(confchange.Changer{
Tracker: r.prs,
LastIndex: raftlog.lastIndex(),
}, cs)
if err != nil {
panic(err)
}
assertConfStatesEquivalent(r.logger, cs, r.switchToConfig(cfg, prs))
if !IsEmptyHardState(hs) {
r.loadState(hs)
}
if c.Applied > 0 {
raftlog.appliedTo(c.Applied)
}
r.becomeFollower(r.Term, None)
var nodesStrs []string
for _, n := range r.prs.VoterNodes() {
nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
}
r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
return r
}
我们看看Storage中InitialState接口:
type ConfState struct {
Voters []uint64 `protobuf:"varint,1,rep,name=voters" json:"voters,omitempty"`
Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
VotersOutgoing []uint64 `protobuf:"varint,3,rep,name=voters_outgoing,json=votersOutgoing" json:"voters_outgoing,omitempty"`
LearnersNext []uint64 `protobuf:"varint,4,rep,name=learners_next,json=learnersNext" json:"learners_next,omitempty"`
AutoLeave bool `protobuf:"varint,5,opt,name=auto_leave,json=autoLeave" json:"auto_leave"`
}
func (s *raftStorage) InitialState() (pb.HardState, pb.ConfState, error) {
hs := s.wal.InitialState()
return hs, s.cs, nil
}
通过这个接口可以知道,我们在启动节点之前需要先初始化这个ConfState。这样我们在加载的时候才能够正常的启动,如果peer为空由于无法完成选主。。就会导致启动阻塞或者失败,具体取决于应用层的处理方式。
小结
本次我们顺着etcd/raft的逻辑,学习了一个raft节点的启动流程,配置的加载以及一些初始化工作。主要流程如下:
- 初始化应用层自定义的Storage实体类;
- 调用RestartNode方法;
- 在RestartNode中,实例化一个RawNode,并包装为Node;
- 在NewRawNode中,实例化一个RawNode,并实例化raft,将raft包装为RawNode,返回一个RawNode;
- 这样raft的核心工作就已经准备完成,最终返回一个Node接口,用于获取raft层的数据以及向raft层数据传入数据。
还有很多细节值得探讨,建议对raft感兴趣的朋友可以在自己业余时间,读读源码,整个过程可能艰难因人而异。不过也有很多大佬针对源码做了思路上的解读,可以找来看看。总的来说,学习raft源码,结合raft论文的学习,能够从理论到实践 层面对一致性算法的有更好地理解与把握。除此之外,在学习源码的过程中对Golang的channel机制,会有更深刻的认知。整个raft的消息传递,将channel用得出神入化。好啦,今天的分享就这么多啦!
=-=-=-=-=-=-=-=-=-=-撒花!=-=-=-=-=-=-=-=-=-=-=
|