• 【ZooKeeper】zookeeper源码7-FastLeaderElection网络通信机制&选票交换机制&lookForLeader执行选举


    lookForLeader执行选举

    参看下面,第1步至第10步

    FastLeaderElection网络通信机制

    涉及网络连接的只和QuorumCnxnManager有关系,和FastLeaderElection没太多关系。
    多个服务间,只能myid大的作为客户端,myid小的作为服务端。
    一个节点中既有客户端又有服务端(排除最小,最大)

    
    QuorumCnxManager.java
    	-run()
    		-new ListenerHandler
    			-run()
    				-acceptConnections();
    					-while((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry))
    					//带重试机制的创建BIO服务端和绑定端口
    						-serverSocket = createNewServerSocket();
    						//BIO的服务端创建
    							-socket = new ServerSocket();
    							-socket.bind(address);
    						-client = serverSocket.accept();
    						//服务端连接的建立
    						-receiveConnection(client);
    						//服务端接收客户端发来的连接,完成连接
    							-din = new DataInputStream(new BufferedInputStream(sock.getInputStream));
    							//初始化一个输入流,读取myid
    							-handleConnection(sock,din);
    							//客户端会写四个信息过来 1.protocolVersion 2.myid(关键) 3.address length 4.address
    								-protocolVersion = din.readLong();
    								//读取8个字节的数据,协议版本号
    								-InitialMessage init = InitialMessage.parse(protocolVersion,din);
    								//对输入流封装成一个对象
    								-sid = init.sid;
    								//对象中有个sid是myid 对方id客户端id,现在是服务端,当对比myid大小后确定谁是客户端谁是服务端
    								-if(sid < self.getId())
    								//如果对方myid小于自己的myid,对建立的连接进行关闭,并向对方发送连接请求
    									-connectOne(sid);
    									//此方法只要在当前节点执行,说明为客户端,即服务端变成客户端
    									-connectOne(sid,electionAddr);
    										-initiateConnectionAsync(electionAddr,sid);
    											-connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr,sid));
    											//往线程池中提交任务
    												-QuorumConnectionReqThread
    												-run()
    													-initiateConnection(electionAddr,sid);//两个参数为对方的
    														-sock = SOCKET_FACTORY.get();
    														//创建socket,创建BIO的客户端
    														-sock.connect(electionAddr.getReachableOrOne(),cnxTO);
    														//发起连接请求,客户端向服务端,完成物理连接:建立了TCP连接
    														//对应服务端代码Listener里ListenerHandler线程的ServerSocket.accept()
    														-startConnection(sock,sid);
    														//开始逻辑连接,抽象,封装,基本动作
    														//此处为写出四个信息给服务端
    															-BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
    															//初始化输出流
    															-dout = new DataOutputStream(buf);
    															-dout.writeLong(protocoVersion);
    															-dout.writeLong(self.getId());
    															-dout.writeInt(addr_bytes.length);
    															-dout.write(addr_bytes);
    															-if(sid>self.getId())
    															//如果对方mydi比自己大,关闭连接(此时为客户端)
    															-else
    															//初始化一组工作组件,即发送线程(其中还有个接收线程作为成员变量),发送队列
    														
    								-else if(sid == self.getId())
    								//如果对方myid等于自己的myid 不太可能
    								-else
    								//此处才是合法的,然后做各种逻辑处理
    								//发送线程(其中还有个接收线程作为成员变量),发送队列
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    FastLeaderElection选票交换机制

    ZooKeeperServer四种状态:LOOKING LEADING FOLLOWING OBSERVING
    ZAB四种状态:ELECTION DISCOVERY SYNCRONIZATION BROADCAST
    崩溃恢复:本来时LEADING/FOLLOWING状态,变成LOOKING状态,发起选举,不能对外提供服务

    while(running){
    	switch(getPeerState()){
    		case LOOKING:	
    			lookForLeader();	//ELECTION
    		case LEADING:
    			lead();				//DISCOVERY SYNCRONIZATION BROADCAST
    		case FOLLOWING:
    			followerLeader();	//DISCOVERY SYNCRONIZATION BROADCAST
    		case OBSERVING:
    			observerLeader();
    	}
    }
    
    QuorumPeer.java
    -run()
    	-while(running)
    		-switch(getPeerState)
    			-case LOOKING
    				-setCurrentVote(makeLEStrategy().lookForLeader());
    				//存储选举算法推举的Leader的信息
    				//lookForLeader()执行逻辑选举的入口
    					-lookForLeader()
    					//进入FastLeaderElection.java
    			-case OBSERVING
    				-observer.observeLeader();//observer跟随leader
    			-case FOLLOWING
    				-setFollower(makeFollower(logFactory));//创建一个follower,设置为QuorumPeer的一个变量
    				-follower.followLeader();
    			-case LEADING
    				-setLeader(makeLeader(logFactory));
    				-leader.lead();
    				-setLeader(null);
    				
    FastLeaderElection.java
    -lookForLeader
    	-self.start_fle = Time.currentElapsedTime();//选举开始时间
    	-Map<Long,Vote> recvset = new HashMap<~>();//合法投票的集合,入宫zk有7台服务器,则recvset最多有7个,标记是myid
    	-logicalclock.incrementAndGet();
    	//逻辑时钟自增,初始化为0,incrementAndGet为1,执行第一次选举
    	-updateProposal(getInitId(),getInitLastLoggedZxid(),getPeerEpoch());//更新选票
    		-proposedLeader = leader;//三个成员变量,被推举者的信息,票信息
    		-proposedZxid = zxid;
    		-proposedEpoch = epoch;
    	-sendNotifications();//广播选票给其他服务器,选票交换的入口,Vote、ToSend、Message都可理解为选票,此时对象为Notification,触发与其他服务器的连接动作
    	//第1步
    		-for(long sid:self.getCurrentAndNextConfigVotes())//给每一个服务器发一张票
    			-ToSend notmsg = new ToSend(ToSend.mType.notification,proposedLeader,proposedZxid,logicalclock.get(),QuorumPeer.ServerState.LOOKING,sid,proposedEpoch,qv.toString().getBytes());
    			//第2步,构建ToSend对象
    			-sendqueue.offer(notmsg);
    			//第2步,放入队列,WorkerSender线程接收队列
    	-Notification n = recvqueue.poll(notTimeout,TimeUnit.MILLISECONDS);
    	//获取投票,等待所有其他发自己的选票,sendNotifications和recvqueue构成一个完整的选票交换,中间经历10步
    	-//后面是选举逻辑执行
    	-if(n==null)
    	//为空情况:自己给对方发,对方没有给自己发,重发,
    		-if(manager.haveDelivered())//如果发过了,再发一次sendNotifications()
    		-else//自己没有发,manager.connectAll()连接所有节点重发
    		-int tmpTimeOut = notTimeout * 2;//不超过60s,超时时间每次翻倍,不是固定间隔时间
    	-else if(validVoter(n.sid) && validVoter(n.leader))//做合法性校验
    	//发选票的节点有选举权和被选举权,并且被选举的节点有被选举权和选举权,必须再votingMembers集合中
    		-return self.getCurrentAndNextConfigVoters().contains(sid);
    			-Set<Long> voterIds = new HashSet<>(getQuorumVerifier().getVotingMembers().keySet());
    		-switch(n.state)//对方服务器状态,WorkerRecevier线程中处理的是自己状态
    			-case LOOKING://对方也在选举
    				-if(n.electionEpoch>logicalclock.get())
    				//先统一逻辑时钟,和对方统一,清空自己投票
    				//然后做选票对比,对方更优,更新选票为对方的,否则,用自己的
    				//最后重新广播选票
    				-else if(n.electionEpoch < logicalclock.get())//不进行处理
    				-else if()//逻辑时钟相同,也会进行选票更新,为万一下一轮选举做准备
    				-recvset.put(n.sid,new Vote(n.leader,n.zxid,n.electionEpoch,n.peerEpoch))
    				//经过合法校验,经过逻辑时钟校验,放入recvset票箱
    				-voteSet = getVoteTracker(recvset,new Vote(proposedLeader,proposedZxid,logicalclock.get(),proposedEpoch));
    				//接收到的是n,统计n的选票集合,为什么统计n的?因为如果在n之前有合适的选票就不会进入这轮了,因此只有n的票能触发结束
    				-if(voteSet.hasAllQuorums)
    				//用n的票数去判断是否超过半数,如果是就结束了
    					-hasAllQuorums()
    					//进入SyncedLearnerTracker.java
    					-while((n==recvqueue.poll(finalizeWait,TimeUnit.MILLSECONDS))
    					//额外获取票,做一个保险
    					-if(n==null)//表示没有更多的票了
    						-Vote endVote = new Vote(proposedLeader,proposedZxid,logicalclock.get(),proposedEpoch);
    						//存储最终胜选结果
    						-return endVote;//lookForLeader方法的结果
    						//什么情况下回返回呢?
    						//当接收到一张新的票,会对这张票做合法性校验
    						//校验通过后,再做逻辑时钟的统一
    						//统一后,把票放到票箱voteSet
    						//把票的个数做个统计,做超过半数的判断
    						//超过半数,再获取一张票(保险那里)
    						//如果没有更多票,即n==null,返回结果
    						-setPeerState(proposedLeader,voteSet);//更改服务器的状态
    							-ServerState ss = (proposedLeader == self.getId())?ServerState.LEADING:learningState();
    							//如果推举的myid等自己的myid,把当前服务器置为LEADING状态
    								-learningState();
    									-return ServerState.FOLLOWING;
    									-return ServerState.OBSERVING;
    						
    			-case OBSERVING://不处理
    			-case FOLLOWING://直接返回follower信息,此处返回n的信息
    			-case LEADING://直接返回leader信息
    				-Vote endVote = new Vote(n.leader,n.zxid,n.electionEpoch,n.peerEpoch);//endVote就是n的信息,此处返回n的信息
    			
    -WorkerSender
    	-run()
    		-ToSend m = sendqueue.poll(3000,TimeUnit.MILLISECONDS);
    		//第3步,获取sendqueue队列中的选票,获取ToSend
    		-process(m);
    		//第3步,处理
    			-ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),m.leader,m.zxid,m.electionEpoch,m.configData);
    			//将ToSend构建成ByteBuffer
    			-manager.toSend(m.sid,requestBuffer);//发送,m.sid对方服务器myid,manager为QuorumCnxManager
    -WorkerReceiver
    //第8步
    	-run()
    		-response = manager.pollRecvQueue(1000,TimeUnit.MILLSECONDS);
    		//负责消费recvQueue队列,response为Message
    		-Notification n = new Notification();//构建对象,response经过校验,设置到n中
    		-if(!validVoter(response.sid))
    		//校验对方myid是否有选举权,即是否在votingMembers集合中,如果没有重发,加入sendqueue队列,也是构建成ToSend
    			-ToSend notmsg = new ToSend();
    			-sendqueue.offer(notmsg);
    		//第10步
    		-else
    		//第9步
    			-if(self.getPeerState() == QuorumPeer.ServerState.LOOKING)
    			//如果自己的状态等于LOOKING,证明正在执行选举
    				-recvqueue.offer(n);//存起合法的票
    				-if((ackstate == QuorumPeer.ServerState.LOOKING)&&(n.electionEpoch < logicalclock.get()))
    				//如果对方是LOOKING,但是对方逻辑时钟小于当前轮次,则重发,也是构建ToSend对象发送
    				-else
    					-Vote current = self.getCurrentVote();//获取当前推举的结果,将leader信息返回,也是ToSend对象放入队列
    
    QuorumCnxManager.java
    -toSend
    	-if(this.mySid == sid)//sid为目标id,等于表示是自己,此处发给自己
    	//第4步
    		-addToRecvQueue(new Message(b.duplicate(),sid));//ByteBuffer构建成Message对象
    			-final boolean success = this.recvQueue.offer(msg);//
    	-else
    	//第4步
    		-BlockingQueue<ByteBuffer> bq = queueSendMap.computeIfAbsent(sid,serverId->new CircularBlockingQueue<>(SEND_CAPACITY));
    		//如果队列不存在,new CircularBlockingQueue初始化
    		-addToSendQueue(bq,b);//发给别人,依然是ByteBuffer没变
    		-connectOne(sid);//真正去联系对方,去找connectOne(xx,xx)
    			-initiateConnectionAsync(electionAddr,sid);
    				-connectionExecutor.execute(new QuorumConnectionReqThread(electionAddr,sid));
    					-QuorumConnectionReqThread
    					-run()
    						-initiateConnection(electionAddr,sid);
    							-startConnection(sock,id);
    								-BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
    								-dout = new DataOutputStream(buf);
    								-dout.writeLong(protocolVersion);
    								-dout.writeLong(self.getId());
    								-dout.writeInt(addr_bytes.length);
    								-dout.write(addr_bytes);
    								-dout.flush();
    								-SendWorker sw = new SendWorker(sock,id);
    								//第5步
    									-run()
    										-BlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
    										//根据myid找Map中的队列
    										-ByteBuffer b = lastMessageSent.get(sid);//如果bq队列为空,无票
    										-send(b);//如果bq队列为空
    										-b = pollSendQueue(bq,1000,TimeUnit.MILLISECONDS);//如果bq队列不为空,有票
    										-lastMessageSent.put(sid,b);//用来记录最近一个票
    										-send(b);//如果bq队列不为空
    											-dout.writeInt(b.capacity());//输出流,选票长度
    											-dout.write(b.array());//选票数据
    											-dout.flush();//发送
    								-RecvWorker rw = new RecvWorker(sock,din,sid,sw);
    								//第6步
    									-run()
    										-int lenght = din.readInt();//长度
    										-din.readFully(msgArray,0,length);//数据
    										-addToRecvQueue(new Message(ByteBuffer.wrap(msgArray),sid));
    										//第7步
    										//将二进制字节包装成Message入队
    											-final boolean success = this.recvQueue.offer(msg);
    									 	-pollRecvQueue//消费recvQueue队列
    									 	//FastLeaderElection.java中WorkerReceiver线程manager调用该方法
    									 		-return this.recvQueue.poll(timeout,unit);
    
    SyncedLearnerTracker.java
    -hasAllQuorums()
    	-if(!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
    		-containsQuorum
    		//进入QuorumMaj.java
    		
    QuorumMaj.java
    -containsQuorum(Set<Long> ackSet)
    	-return (ackSet.size()>half);//ackSet选票集合,half是votingMembers半数
    					
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
  • 相关阅读:
    前端测试体系和最佳实践
    拓端tecdat|R语言提取时间序列的周期性成分应用EMD,小波滤波器,Baxter过滤器等
    [C语言、C++]数据结构作业:用递归实现走迷宫(打印正确通路即可)
    linux入门学习17
    2022最新“Java面试宝典”,8大模块+18个专题,看完你就能进大厂
    Transformer丨基础Transformer模型和代码详解
    华为路由器如何配置静态路由
    用遍历和边界的思考来总结final关键字
    反绎学习简介
    MM32F0140 UART1空闲中断接收
  • 原文地址:https://blog.csdn.net/qq_36679460/article/details/127937246