• raft在etcd中的实现


    Raft概述

    raft是一个在分布式系统中维护状态一致的协议

    • 动画演示:http://thesecretlivesofdata.com/raft/
    • 论文:https://raft.github.io/raft.pdf
    • lecture:https://github.com/maemual/raft-zh_cn

    源码阅读(latest)

    ➜  raft git:(master) tree --dirsfirst -L 2 -I '*test*' -P '*.go'
    .
    ├── confchange
    │   ├── confchange.go
    │   └── restore.go
    ├── quorum
    │   ├── joint.go
    │   ├── majority.go
    │   ├── quorum.go
    │   └── voteresult_string.go
    ├── raftpb
    │   ├── confchange.go
    │   ├── confstate.go
    │   └── raft.pb.go
    ├── tracker
    │   ├── inflights.go
    │   ├── progress.go
    │   ├── state.go
    │   └── tracker.go
    ├── bootstrap.go
    ├── doc.go
    ├── log.go
    ├── log_unstable.go
    ├── logger.go
    ├── node.go
    ├── raft.go
    ├── rawnode.go
    ├── read_only.go
    ├── status.go
    ├── storage.go
    └── util.go
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    raftpb

    Raft中的序列化是借助于Protocol Buffer来实现的,这个文件夹就定义了需要序列化的几个数据结构,从EntryMessage开始看起

    Entry

    type Entry struct {
    	Term  uint64    `protobuf:"varint,2,opt,name=Term" json:"Term"`
    	Index uint64    `protobuf:"varint,3,opt,name=Index" json:"Index"`
    	Type  EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
    	Data  []byte    `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • Term:选举任期,每次选举之后递增1。它的主要作用是标记信息的时效性,比方说当一个节点发出来的消息中携带的term是2,而另一个节点携带的term是3,那我们就认为第一个节点的信息过时了。
    • Index:当前这个entry在整个raft日志中的位置索引。有了TermIndex之后,一个log entry就能被唯一标识。
    • Type:当前entry的类型,目前etcd支持两种类型:EntryNormalEntryConfChange,EntryNormal代表当前Entry是对状态机的操作,EntryConfChange则代表对当前集群配置进行更改的操作,比如增加或者减少节点。
    • Data:一个被序列化后的byte数组,代表当前entry真正要执行的操作,比方说如果上面的TypeEntryNormal,那这里的Data就可能是具体要更改的key-value pair,如果TypeEntryConfChange,那Data就是具体的配置更改项ConfChange。raft算法本身并不关心这个数据是什么,它只是把这段数据当做log同步过程中的payload来处理,具体对这个数据的解析则有上层应用来完成。

    Message

    type Message struct {
    	Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
    	To   uint64      `protobuf:"varint,2,opt,name=to" json:"to"`
    	From uint64      `protobuf:"varint,3,opt,name=from" json:"from"`
    	Term uint64      `protobuf:"varint,4,opt,name=term" json:"term"`
    	// logTerm is generally used for appending Raft logs to followers. For example,
    	// (type=MsgApp,index=100,logTerm=5) means leader appends entries starting at
    	// index=101, and the term of entry at index 100 is 5.
    	// (type=MsgAppResp,reject=true,index=100,logTerm=5) means follower rejects some
    	// entries from its leader as it already has an entry with term 5 at index 100.
    	LogTerm    uint64   `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
    	Index      uint64   `protobuf:"varint,6,opt,name=index" json:"index"`
    	Entries    []Entry  `protobuf:"bytes,7,rep,name=entries" json:"entries"`
    	Commit     uint64   `protobuf:"varint,8,opt,name=commit" json:"commit"`
    	Snapshot   Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
    	Reject     bool     `protobuf:"varint,10,opt,name=reject" json:"reject"`
    	RejectHint uint64   `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
    	Context    []byte   `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • Type:当前传递的消息类型,它的取值有

      很多个

      ,但大致可以分成两类:

      1. Raft 协议相关的,包括心跳MsgHeartbeat、日志MsgApp、投票消息MsgVote等。
      2. 上层应用触发的(没错,上层应用并不是通过api与raft库交互的,而是通过发消息),比如应用对数据更改的消息MsgProp(osal)。

    不同类型的消息会用到下面不同的字段:

    • To, From分别代表了这个消息的接受者和发送者。
    • Term:这个消息发出时整个集群所处的任期。
    • LogTerm:消息发出者所保存的日志中最后一条的任期号,一般MsgVote会用到这个字段。
    • Index:日志索引号。如果当前消息是MsgVote的话,代表这个candidate最后一条日志的索引号,它跟上面的LogTerm一起代表这个candidate所拥有的最新日志信息,这样别人就可以比较自己的日志是不是比candidata的日志要新,从而决定是否投票。
    • Entries:需要存储的日志。
    • Commit:已经提交的日志的索引值,用来向别人同步日志的提交信息。
    • Snapshot:一般跟MsgSnap合用,用来放置具体的Snapshot值。
    • Reject,RejectHint:代表对方节点拒绝了当前节点的请求(MsgVote/MsgApp/MsgSnap…)

    log_unstable.go

    unstable数据结构用于还没有被用户层持久化的数据,它维护了两部分内容snapshotentries

    // unstable.entries[i] has raft log position i+unstable.offset.
    // Note that unstable.offset may be less than the highest log
    // position in storage; this means that the next write to storage
    // might need to truncate the log before persisting unstable.entries.
    type unstable struct {
    	// the incoming unstable snapshot, if any.
    	snapshot *pb.Snapshot
    	// all entries that have not yet been written to storage.
    	entries []pb.Entry
    	offset  uint64
    
    	logger Logger
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    entries代表的是要进行操作的日志,但日志不可能无限增长,在特定的情况下,某些过期的日志会被清空。那这就引入一个新问题了,如果此后一个新的follower加入,而leader只有一部分操作日志,那这个新follower不是没法跟别人同步了吗?所以这个时候snapshot就登场了 - 我无法给你之前的日志,但我给你所有之前日志应用后的结果,之后的日志你再以这个snapshot为基础进行应用,那我们的状态就可以同步了。因此它们的结构关系可以用下图表示:

    在这里插入图片描述

    这里的前半部分是快照数据,而后半部分是日志条目组成的数组entries,另外unstable.offset成员保存的是entries数组中的第一条数据在raft日志中的索引,即第i条entries在raft日志中的索引为i + unstable.offset

    storage.go

    这个文件定义了一个Storage接口,因为etcd中的raft实现并不负责数据的持久化,所以它希望上面的应用层能实现这个接口,以便提供给它查询log的能力。

    另外,这个文件也提供了Storage接口的一个内存版本的实现MemoryStorage,这个实现同样也维护了snapshotentries这两部分,他们的排列跟unstable中的类似,也是snapshot在前,entries在后。从代码中看来etcdserverraftexample都是直接用的这个实现来提供log的查询功能的

    log.go

    有了以上的介绍unstable、Storage的准备之后,下面可以来介绍raftLog的实现,这个结构体承担了raft日志相关的操作。

    raftLog由以下成员组成:

    • storage Storage:前面提到的存放已经持久化数据的Storage接口。
    • unstable unstable:前面分析过的unstable结构体,用于保存应用层还没有持久化的数据。
    • committed uint64:保存当前提交的日志数据索引。
    • applied uint64:保存当前传入状态机的数据最高索引。

    需要说明的是,一条日志数据,首先需要被提交(committed)成功,然后才能被应用(applied)到状态机中。因此,以下不等式一直成立:applied <= committed

    raftLog结构体中,几部分数据的排列如下图所示:

    在这里插入图片描述

    这个数据排布的情况,可以从raftLog的初始化函数中看出来:

    // newLogWithSize returns a log using the given storage and max
    // message size.
    func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
    	if storage == nil {
    		log.Panic("storage must not be nil")
    	}
    	log := &raftLog{
    		storage:         storage,
    		logger:          logger,
    		maxNextEntsSize: maxNextEntsSize,
    	}
    	firstIndex, err := storage.FirstIndex()
    	if err != nil {
    		panic(err) // TODO(bdarnell)
    	}
    	lastIndex, err := storage.LastIndex()
    	if err != nil {
    		panic(err) // TODO(bdarnell)
    	}
    	log.unstable.offset = lastIndex + 1
    	log.unstable.logger = logger
    	// Initialize our committed and applied pointers to the time of the last compaction.
    	log.committed = firstIndex - 1
    	log.applied = firstIndex - 1
    
    	return log
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    因此,从这里的代码可以看出,raftLog的两部分,持久化存储和非持久化存储,它们之间的分界线就是lastIndex,在此之前都是Storage管理的已经持久化的数据,而在此之后都是unstable管理的还没有持久化的数据。

    以上分析中还有一个疑问,为什么并没有初始化unstable.snapshot成员,也就是unstable结构体的快照数据?原因在于,上面这个是初始化函数,也就是节点刚启动的时候调用来初始化存储状态的函数,而unstable.snapshot数据,是在启动之后同步数据的过程中,如果需要同步快照数据时才会去进行赋值修改的数据,因此在这里并没有对它进行操作的地方。

    type Progress struct {
    	Match, Next uint64
    	// State defines how the leader should interact with the follower.
    	//
    	// When in StateProbe, leader sends at most one replication message
    	// per heartbeat interval. It also probes actual progress of the follower.
    	//
    	// When in StateReplicate, leader optimistically increases next
    	// to the latest entry sent after sending replication message. This is
    	// an optimized state for fast replicating log entries to the follower.
    	//
    	// When in StateSnapshot, leader should have sent out snapshot
    	// before and stops sending any replication message.
    	State StateType
    
    	// PendingSnapshot is used in StateSnapshot.
    	// If there is a pending snapshot, the pendingSnapshot will be set to the
    	// index of the snapshot. If pendingSnapshot is set, the replication process of
    	// this Progress will be paused. raft will not resend snapshot until the pending one
    	// is reported to be failed.
    	PendingSnapshot uint64
    
    	// RecentActive is true if the progress is recently active. Receiving any messages
    	// from the corresponding follower indicates the progress is active.
    	// RecentActive can be reset to false after an election timeout.
    	//
    	// TODO(tbg): the leader should always have this set to true.
    	RecentActive bool
    
    	// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
    	// true, raft should pause sending replication message to this peer until
    	// ProbeSent is reset. See ProbeAcked() and IsPaused().
    	ProbeSent bool
    
    	// Inflights is a sliding window for the inflight messages.
    	// Each inflight message contains one or more log entries.
    	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
    	// Thus inflight effectively limits both the number of inflight messages
    	// and the bandwidth each Progress can use.
    	// When inflights is Full, no more message should be sent.
    	// When a leader sends out a message, the index of the last
    	// entry should be added to inflights. The index MUST be added
    	// into inflights in order.
    	// When a leader receives a reply, the previous inflights should
    	// be freed by calling inflights.FreeLE with the index of the last
    	// received entry.
    	Inflights *Inflights
    
    	// IsLearner is true if this progress is tracked for a learner.
    	IsLearner bool
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    1. 用来保存当前follower节点的日志状态的属性:

      • Match:保存目前为止,已复制给该follower的日志的最高索引值。如果leader对该follower上的日志情况一无所知的话,这个值被设为0。
      • Next:保存下一次leader发送append消息给该follower的日志索引,即下一次复制日志时,leader会从Next开始发送日志。

      在正常情况下,Next = Match + 1,也就是下一个要同步的日志应当是对方已有日志的下一条。

    2. State属性用来保存该节点当前的同步状态,它会有一下几种取值3

      • ProgressStateProbe

      探测状态,当follower拒绝了最近的append消息时,那么就会进入探测状态,此时leader会试图继续往前追溯该follower的日志从哪里开始丢失的。在probe状态时,leader每次最多append一条日志,如果收到的回应中带有RejectHint信息,则回退Next索引,以便下次重试。在初始时,leader会把所有follower的状态设为probe,因为它并不知道各个follower的同步状态,所以需要慢慢试探。

      • ProgressStateReplicate

      当leader确认某个follower的同步状态后,它就会把这个follower的state切换到这个状态,并且用pipeline的方式快速复制日志。leader在发送复制消息之后,就修改该节点的Next索引为发送消息的最大索引+1。

      • ProgressStateSnapshot

      接收快照状态。当leader向某个follower发送append消息,试图让该follower状态跟上leader时,发现此时leader上保存的索引数据已经对不上了,比如leader在index为10之前的数据都已经写入快照中了,但是该follower需要的是10之前的数据,此时就会切换到该状态下,发送快照给该follower。当快照数据同步追上之后,并不是直接切换到Replicate状态,而是首先切换到Probe状态。

    3. ins属性用来做流量控制,因为如果同步请求非常多,再碰上网络分区时,leader可能会累积很多待发送消息,一旦网络恢复,可能会有非常大流量发送给follower,所以这里要做flow control。它的实现有点类似TCP的滑动窗口,这里不再赘述。

    综上,Progress其实也是个状态机,下面是它的状态转移图

    在这里插入图片描述

    node.go

    node的主要作用是应用层(etcdserver)和共识模块(raft)的衔接。将应用层的消息传递给底层共识模块,并将底层共识模块共识后的结果反馈给应用层。所以它的初始化函数创建了很多用来通信的channel,然后就在另一个goroutine里面开始了事件循环,不停的在各种channel中倒腾数据(貌似这种由for-select-channel组成的事件循环在Go里面很受欢迎)

    	for {
    		if advancec != nil {
    			readyc = nil
    		} else if n.rn.HasReady() {
    			// Populate a Ready. Note that this Ready is not guaranteed to
    			// actually be handled. We will arm readyc, but there's no guarantee
    			// that we will actually send on it. It's possible that we will
    			// service another channel instead, loop around, and then populate
    			// the Ready again. We could instead force the previous Ready to be
    			// handled first, but it's generally good to emit larger Readys plus
    			// it simplifies testing (by emitting less frequently and more
    			// predictably).
    			rd = n.rn.readyWithoutAccept()
    			readyc = n.readyc
    		}
    
    		if lead != r.lead {
    			if r.hasLeader() {
    				if lead == None {
    					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
    				} else {
    					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
    				}
    				propc = n.propc
    			} else {
    				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
    				propc = nil
    			}
    			lead = r.lead
    		}
    
    		select {
    		// TODO: maybe buffer the config propose if there exists one (the way
    		// described in raft dissertation)
    		// Currently it is dropped in Step silently.
    		case pm := <-propc:
    			m := pm.m
    			m.From = r.id
    			err := r.Step(m)
    			if pm.result != nil {
    				pm.result <- err
    				close(pm.result)
    			}
    		case m := <-n.recvc:
    			// filter out response message from unknown From.
    			if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
    				r.Step(m)
    			}
    		case cc := <-n.confc:
    			_, okBefore := r.prs.Progress[r.id]
    			cs := r.applyConfChange(cc)
    			// If the node was removed, block incoming proposals. Note that we
    			// only do this if the node was in the config before. Nodes may be
    			// a member of the group without knowing this (when they're catching
    			// up on the log and don't have the latest config) and we don't want
    			// to block the proposal channel in that case.
    			//
    			// NB: propc is reset when the leader changes, which, if we learn
    			// about it, sort of implies that we got readded, maybe? This isn't
    			// very sound and likely has bugs.
    			if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
    				var found bool
    			outer:
    				for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
    					for _, id := range sl {
    						if id == r.id {
    							found = true
    							break outer
    						}
    					}
    				}
    				if !found {
    					propc = nil
    				}
    			}
    			select {
    			case n.confstatec <- cs:
    			case <-n.done:
    			}
    		case <-n.tickc:
    			n.rn.Tick()
    		case readyc <- rd:
    			n.rn.acceptReady(rd)
    			advancec = n.advancec
    		case <-advancec:
    			n.rn.Advance(rd)
    			rd = Ready{}
    			advancec = nil
    		case c := <-n.status:
    			c <- getStatus(r)
    		case <-n.stop:
    			close(n.done)
    			return
    		}
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    propcrecvc中拿到的是从上层应用传进来的消息,这个消息会被交给raft层的Step函数处理,具体处理逻辑我上面有过介绍。

    下面来解释下readyc的作用。在etcd的这个实现中,node并不负责数据的持久化、网络消息的通信、以及将已经提交的log应用到状态机中,所以node使用readyc这个channel对外通知有数据要处理了,并将这些需要外部处理的数据打包到一个Ready结构体中:

    // Ready encapsulates the entries and messages that are ready to read,
    // be saved to stable storage, committed or sent to other peers.
    // All fields in Ready are read-only.
    type Ready struct {
    	// The current volatile state of a Node.
    	// SoftState will be nil if there is no update.
    	// It is not required to consume or store SoftState.
    	*SoftState
    
    	// The current state of a Node to be saved to stable storage BEFORE
    	// Messages are sent.
    	// HardState will be equal to empty state if there is no update.
    	pb.HardState
    
    	// ReadStates can be used for node to serve linearizable read requests locally
    	// when its applied index is greater than the index in ReadState.
    	// Note that the readState will be returned when raft receives msgReadIndex.
    	// The returned is only valid for the request that requested to read.
    	ReadStates []ReadState
    
    	// Entries specifies entries to be saved to stable storage BEFORE
    	// Messages are sent.
    	Entries []pb.Entry
    
    	// Snapshot specifies the snapshot to be saved to stable storage.
    	Snapshot pb.Snapshot
    
    	// CommittedEntries specifies entries to be committed to a
    	// store/state-machine. These have previously been committed to stable
    	// store.
    	CommittedEntries []pb.Entry
    
    	// Messages specifies outbound messages to be sent AFTER Entries are
    	// committed to stable storage.
    	// If it contains a MsgSnap message, the application MUST report back to raft
    	// when the snapshot has been received or has failed by calling ReportSnapshot.
    	Messages []pb.Message
    
    	// MustSync indicates whether the HardState and Entries must be synchronously
    	// written to disk or if an asynchronous write is permissible.
    	MustSync bool
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    应用程序得到这个Ready之后,需要:

    1. 将HardState, Entries, Snapshot持久化到storage。
    2. 将Messages广播给其他节点。
    3. 将CommittedEntries(已经commit还没有apply)应用到状态机。
    4. 如果发现CommittedEntries中有成员变更类型的entry,调用node.ApplyConfChange()方法让node知道。
    5. 最后再调用node.Advance()告诉raft,这批状态更新处理完了,状态已经演进了,可以给我下一批Ready让我处理。

    Life of a Request

    前面我们把整个包的结构过了一遍,下面来结合具体的代码看看raft对一个请求的处理过程是怎样的。我一直觉得,如果能从代码的层面追踪到一个请求的处理过程,那无论是从宏观还是微观的角度,对理解整个系统都是非常有帮助的。

    • 首先,在node的大循环里,有一个会定时输出的tick channel,它来触发raft.tick()函数,根据上面的介绍可知,如果当前节点是follower,那它的tick函数会指向tickElectiontickElection的处理逻辑是给自己发送一个MsgHup的内部消息,Step函数看到这个消息后会调用campaign函数,进入竞选状态。

      // tickElection is run by followers and candidates after r.electionTimeout.
      func (r *raft) tickElection() {
      	r.electionElapsed++
      
      	if r.promotable() && r.pastElectionTimeout() {
      		r.electionElapsed = 0
      		r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
      	}
      }
      func (r *raft) Step(m pb.Message) error {
          //...
          switch m.Type {
          case pb.MsgHup:
              r.campaign(campaignElection)
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
    • campaign则会调用becomeCandidate把自己切换到candidate模式,并递增Term值。然后再将自己的Term及日志信息发送给其他的节点,请求投票。

      // campaign transitions the raft instance to candidate state. This must only be
      // called after verifying that this is a legitimate transition.
      func (r *raft) campaign(t CampaignType) {
      	if !r.promotable() {
      		// This path should not be hit (callers are supposed to check), but
      		// better safe than sorry.
      		r.logger.Warningf("%x is unpromotable; campaign() should have been called", r.id)
      	}
      	var term uint64
      	var voteMsg pb.MessageType
      	if t == campaignPreElection {
      		r.becomePreCandidate()
      		voteMsg = pb.MsgPreVote
      		// PreVote RPCs are sent for the next term before we've incremented r.Term.
      		term = r.Term + 1
      	} else {
      		r.becomeCandidate()
      		voteMsg = pb.MsgVote
      		term = r.Term
      	}
      	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
      		// We won the election after voting for ourselves (which must mean that
      		// this is a single-node cluster). Advance to the next state.
      		if t == campaignPreElection {
      			r.campaign(campaignElection)
      		} else {
      			r.becomeLeader()
      		}
      		return
      	}
      	var ids []uint64
      	{
      		idMap := r.prs.Voters.IDs()
      		ids = make([]uint64, 0, len(idMap))
      		for id := range idMap {
      			ids = append(ids, id)
      		}
      		sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
      	}
      	for _, id := range ids {
      		if id == r.id {
      			continue
      		}
      		r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
      			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
      
      		var ctx []byte
      		if t == campaignTransfer {
      			ctx = []byte(t)
      		}
      		r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
      	}
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
    • 另一方面,其他节点在接受到这个请求后,会首先比较接收到的Term是不是比自己的大,以及接受到的日志信息是不是比自己的要新,从而决定是否投票。这个逻辑我们还是可以从Step函数中找到:

      func (r *raft) Step(m pb.Message) error {
          //...
          switch m.Type {
          case pb.MsgVote, pb.MsgPreVote:
              // We can vote if this is a repeat of a vote we've already cast...
              canVote := r.Vote == m.From ||
                  // ...we haven't voted and we don't think there's a leader yet in this term...
                  (r.Vote == None && r.lead == None) ||
                  // ...or this is a PreVote for a future term...
                  (m.Type == pb.MsgPreVote && m.Term > r.Term)
              // ...and we believe the candidate is up to date.
              if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
                  r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
              } else {
                  r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
              }
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
    • 最后当candidate节点收到投票回复后,就会计算收到的选票数目是否大于所有节点数的一半,如果大于则自己成为leader,并昭告天下,否则将自己置为follower:

      func (r *raft) Step(m pb.Message) error {
          //...
          switch m.Type {
          case myVoteRespType:
              gr := r.poll(m.From, m.Type, !m.Reject)
              switch r.quorum() {
              case gr:
                  if r.state == StatePreCandidate {
                      r.campaign(campaignElection)
                  } else {
                      r.becomeLeader()
                      r.bcastAppend()
                  }
              case len(r.votes) - gr:
                  r.becomeFollower(r.Term, None)
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17

    Life of a Write Request

    1. 一个写请求一般会通过调用node.Propose开始,Propose方法将这个写请求封装到一个MsgProp消息里面,发送给自己处理。

    2. 消息处理函数Step无法直接处理这个消息,它会调用那个小写的step函数,来根据当前的状态进行处理。

      • 如果当前是follower,那它会把这个消息转发给leader。

        func stepFollower(r *raft, m pb.Message) error {
            switch m.Type {
            case pb.MsgProp:
                //...
                m.To = r.lead
                r.send(m)
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
    3. Leader收到这个消息后(不管是follower转发过来的还是自己内部产生的)会有两步操作:

      1. 将这个消息添加到自己的log里

      2. 向其他follower广播这个消息

        func stepLeader(r *raft, m pb.Message) error {
            switch m.Type {
            case pb.MsgProp:
                //...
                if !r.appendEntry(m.Entries...) {
                    return ErrProposalDropped
                }
                r.bcastAppend()
                return nil
            }
        }
        
        • 1
        • 2
        • 3
        • 4
        • 5
        • 6
        • 7
        • 8
        • 9
        • 10
        • 11
    4. 在follower接受完这个log后,会返回一个MsgAppResp消息。

    5. 当leader确认已经有足够多的follower接受了这个log后,它首先会commit这个log,然后再广播一次,告诉别人它的commit状态。这里的实现就有点像两阶段提交了。

      func stepLeader(r *raft, m pb.Message) error {
          switch m.Type {
          case pb.MsgAppResp:
              //...
              if r.maybeCommit() {
                  r.bcastAppend()
              }
          }
      }
      
      // maybeCommit attempts to advance the commit index. Returns true if
      // the commit index changed (in which case the caller should call
      // r.bcastAppend).
      func (r *raft) maybeCommit() bool {
          //...
          mis := r.matchBuf[:len(r.prs)]
          idx := 0
          for _, p := range r.prs {
              mis[idx] = p.Match
              idx++
          }
          sort.Sort(mis)
          mci := mis[len(mis)-r.quorum()]
          return r.raftLog.maybeCommit(mci, r.Term)
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
  • 相关阅读:
    本地生活小程序有什么功能_本地生活小程序的优势
    极智开发 | 讲解 Nginx 特性之三:动静分离
    微信支付步骤
    【Linux】进程的地址空间
    有数BI开发
    NVIDIA 7th SkyHackathon(二)开发套件的安装与测试
    Python中通过property设置类属性的访问
    APIView视图的添加与查看
    Redis(单/多)线程
    请协助我搭建 bert
  • 原文地址:https://blog.csdn.net/weixin_43568060/article/details/126364806