• ZooKeeper Series, Part 3: ZooKeeper Source Analysis of Session Management and Request Processing


    Learning objectives

    1. Trace how ZooKeeper creates, refreshes, and expires a Session

    2. Identify ZooKeeper's core request-processing call chain

    Chapter 1 Session Creation

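    The previous article covered how ZooKeeper is used. ZooKeeper itself consists of two modules, a server and a client: the server implements all of the ZooKeeper business logic, and the client wraps calls to the server. Two modules imply network communication. ZooKeeper uses ServerCnxnFactory to manage client connections, and it has two implementations: NIOServerCnxnFactory, built on native Java NIO, and NettyServerCnxnFactory, built on Netty. A ServerCnxn represents one connection between a client and the server. The standalone startup path shows that ZooKeeper's default communication component is NIOServerCnxnFactory. We first look at the communication flow and then walk through the source in detail.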

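    The following diagram shows the full flow, from the client initiating a connection request to the server establishing the session.

    1.1 Client sends the request

    As covered in the previous article, a connection is established by calling new ZooKeeper(...). The ZooKeeper constructor creates a ClientCnxn object and calls its start method, which starts two threads: sendThread and eventThread.

    sendThread is the core thread for establishing the connection. Its run method is a while loop: on the first pass it creates the connection, and once the state is CONNECTED it sends a Ping request, at intervals of no more than 10 seconds, to keep the connection alive.

    The source is long, so unimportant code is omitted.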

    public void run() {
        // Upper bound on the interval between Pings
        final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
        while (state.isAlive()) {
            try {
                // If the socket is not connected yet, create the connection
                if (!clientCnxnSocket.isConnected()) {
                    startConnect(serverAddress);
                }
                // Once connected, send a heartbeat at intervals of no more than 10 seconds
                if (state.isConnected()) {
                    // Throttles the heartbeat interval so that Pings are not sent too often
                    int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                            ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                        sendPing();
                        clientCnxnSocket.updateLastSend();
                    } else {
                        if (timeToNextPing < to) {
                            to = timeToNextPing;
                        }
                    }
                }
            } catch (Throwable e) {
                // error handling omitted
            }
        }
    }

    void connect(InetSocketAddress addr) throws IOException {
        SocketChannel sock = createSock();
        try {
            // Connects to the ZK server, which then completes session creation
            registerAndConnect(sock, addr);
        } catch (IOException e) {
            // error handling omitted
        }
    }

    void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
            throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
        // Open the connection through NIO
        boolean immediateConnect = sock.connect(addr);
    }

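    1.2 Server accepts the connection

    The server accepts requests on threads started by NIOServerCnxnFactory. At startup, NIOServerCnxnFactory starts four kinds of threads:

    • AcceptThread: accepts connections from clients and assigns each one to a SelectorThread (a single thread is started).

    • SelectorThread: runs select(). Because select() becomes a performance bottleneck when handling a large number of connections, several SelectorThreads are started. The count is set with the system property zookeeper.nio.numSelectorThreads and defaults to sqrt(number of cores / 2), with a minimum of 1.

    • WorkerThread: performs the basic socket reads and writes. The count is set with the system property zookeeper.nio.numWorkerThreads and defaults to number of cores × 2. If the count is 0, the SelectorThreads perform the socket I/O directly; see the WorkerThread section below.

    • ConnectionExpirationThread: closes a connection once the session on it has expired.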

    1.2.1 AcceptThread

    This thread accepts connection requests from clients

    public void run() {
        while (!stopped && !acceptSocket.socket().isClosed()) {
            select();
        }
    }

    private void select() {
        try {
            // Find the connections that are ready
            selector.select();
            Iterator<SelectionKey> selectedKeys =
                    selector.selectedKeys().iterator();
            while (!stopped && selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();
                if (key.isAcceptable()) {
                    // 1: Accept the connection on this server.
                    // 2: Get the address of the remote client.
                    // 3: Check whether the connection exceeds the maximum limit.
                    // 4: Switch the channel to non-blocking mode.
                    // 5: Pick a SelectorThread round-robin and assign the connection to it.
                    // 6: Add the connection to that SelectorThread's acceptedQueue and wake it up.
                    if (!doAccept()) {
                        pauseAccept(10);
                    }
                }
            }
        } catch (IOException e) {
            // logging omitted
        }
    }

    Next, step into the doAccept method

    private boolean doAccept() {
        ...
        try {
            // Accept the connection
            sc = acceptSocket.accept();
            accepted = true;
            // Get the address of the remote machine
            InetAddress ia = sc.socket().getInetAddress();
            int cnxncount = getClientCnxnCount(ia);
            // Check whether the per-client connection limit is exceeded
            if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
                ...
            }
            LOG.debug("Accepted socket connection from "
                    + sc.socket().getRemoteSocketAddress());
            // Switch this channel to non-blocking mode
            sc.configureBlocking(false);
            // Assign this connection to a SelectorThread, round-robin
            if (!selectorIterator.hasNext()) {
                selectorIterator = selectorThreads.iterator();
            }
            SelectorThread selectorThread = selectorIterator.next();
            // Add the new connection to the SelectorThread's acceptedQueue and wake the thread up
            if (!selectorThread.addAcceptedConnection(sc)) {
                ...
            }
            acceptErrorLogger.flush();
        } catch (IOException e) {
            ...
        }
        return accepted;
    }

    public boolean addAcceptedConnection(SocketChannel accepted) {
        // Add accepted to acceptedQueue
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;
        }
        // Wake up the SelectorThread
        wakeupSelector();
        return true;
    }
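    The round-robin hand-off in doAccept can be looked at in isolation. The sketch below is not ZooKeeper code: the class RoundRobinAssigner and the string names standing in for SelectorThreads are hypothetical, and only the "reset the iterator when it runs out" pattern is taken from the excerpt above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Hypothetical helper that mirrors the selectorIterator reset pattern in doAccept. */
public class RoundRobinAssigner<T> {
    private final List<T> targets;
    private Iterator<T> it;

    public RoundRobinAssigner(List<T> targets) {
        if (targets.isEmpty()) {
            throw new IllegalArgumentException("at least one target is required");
        }
        this.targets = new ArrayList<>(targets);
        this.it = this.targets.iterator();
    }

    /** Returns the next target, starting over once the iterator is exhausted. */
    public T next() {
        if (!it.hasNext()) {
            it = targets.iterator();
        }
        return it.next();
    }

    public static void main(String[] args) {
        RoundRobinAssigner<String> rr =
                new RoundRobinAssigner<>(Arrays.asList("selector-0", "selector-1", "selector-2"));
        for (int i = 0; i < 5; i++) {
            System.out.println("connection " + i + " -> " + rr.next());
        }
    }
}
```

    Because a single AcceptThread does the assignment, this pattern needs no locking; connections are simply spread evenly across the SelectorThreads.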

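    addAcceptedConnection wakes up the SelectorThread, so execution continues in SelectorThread.run.

    1.2.2 SelectorThread

    The main job of this thread is to read data from the socket, wrap it in a workRequest, and hand the workRequest to the workerPool thread pool. It also takes unprocessed connections out of acceptedQueue, registers the OP_READ event for each one, and attaches the corresponding context object, NIOServerCnxn. The run method of SelectorThread is as follows: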

    public void run() {
        // Pick up ready I/O events and hand them to a worker thread;
        // the data is processed in ZooKeeperServer.processPacket()
        select();
        // Take the accepted connections out of acceptedQueue, register OP_READ for each,
        // and bind an NIOServerCnxn object to the key,
        // i.e. attach an NIOServerCnxn (context object) to every connection
        processAcceptedConnections();
        // Walk updateQueue and update the interest ops of the connections in it
        processInterestOpsUpdateRequests();
    }

    Start with processAcceptedConnections. It creates an NIOServerCnxn object for each connection and also triggers the session-renewal logic.

    private void processAcceptedConnections() {
        SocketChannel accepted;
        while (!stopped && (accepted = acceptedQueue.poll()) != null) {
            SelectionKey key = null;
            key = accepted.register(selector, SelectionKey.OP_READ);
            // Create one NIOServerCnxn per connection
            NIOServerCnxn cnxn = createConnection(accepted, key, this);
            key.attach(cnxn);
            addCnxn(cnxn);
        }
    }

    This part is not critical, so we won't dig deeper. Back to the select method:

    private void select() {
        selector.select();
        Set<SelectionKey> selected = selector.selectedKeys();
        ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
        Collections.shuffle(selectedList);
        Iterator<SelectionKey> selectedKeys = selectedList.iterator();
        while (!stopped && selectedKeys.hasNext()) {
            SelectionKey key = selectedKeys.next();
            selected.remove(key);
            if (key.isReadable() || key.isWritable()) {
                // Core logic
                handleIO(key);
            }
        }
    }

    handleIO() wraps the current SelectorThread and the key in an IOWorkRequest and hands it to workerPool for scheduling. Reading data only begins once workerPool runs it. The source is as follows:

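    private void handleIO(SelectionKey key) {
        // Wrap the SelectorThread and key in a workRequest object
        IOWorkRequest workRequest = new IOWorkRequest(this, key);
        // The context object attached to the key in processAcceptedConnections
        NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
        // Renew the connection's expiry time
        touchCnxn(cnxn);
        // Hand the workRequest to the thread pool, where the client's data is read
        workerPool.schedule(workRequest);
    }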

    First look at the renewal method. This is not the only place where NIOServerCnxnFactory.touchCnxn(NIOServerCnxn) is called.

    public void touchCnxn(NIOServerCnxn cnxn) {
        cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
    }

    Stepping into update shows that it is a method of ExpiryQueue. As the name suggests, ExpiryQueue is the queue the server uses to manage session expiry.

    // Maps each NIOServerCnxn to its expiry time
    private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
    // Maps each expiry time to the bucket of NIOServerCnxn objects that expire at that time
    private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
    private final AtomicLong nextExpirationTime = new AtomicLong();

    public Long update(E elem, int timeout) {
        Long prevExpiryTime = elemMap.get(elem); // current expiry time of this NIOServerCnxn
        long now = Time.currentElapsedTime();
        Long newExpiryTime = roundToNextInterval(now + timeout); // next expiry time
        if (newExpiryTime.equals(prevExpiryTime)) {
            return null; // No change, so nothing to update
        }
        // First add the elem to the new expiry time bucket in expiryMap.
        Set<E> set = expiryMap.get(newExpiryTime); // bucket for the new expiry time
        if (set == null) {
            // Construct a ConcurrentHashSet using a ConcurrentHashMap
            set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
            // Put the new set in the map, but only if another thread hasn't beaten us to it
            Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
            if (existingSet != null) {
                set = existingSet;
            }
        }
        set.add(elem); // put the NIOServerCnxn into the new bucket
        // Map the elem to the new expiry time. If a different previous
        // mapping was present, clean up the previous expiry bucket.
        prevExpiryTime = elemMap.put(elem, newExpiryTime);
        if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
            Set<E> prevSet = expiryMap.get(prevExpiryTime);
            if (prevSet != null) {
                prevSet.remove(elem); // remove the element from its previous bucket
            }
        }
        return newExpiryTime;
    }
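    The bucket idea behind update can be tried out with a much smaller, single-threaded sketch. Everything below is illustrative: BucketExpirySketch, its String elements, and the explicit now parameter are hypothetical, and the rounding rule in roundToNextInterval (round up to the start of the next interval) is an assumption, since that method's body is not shown in the excerpt above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified, single-threaded sketch of time-bucketed expiry; not the ZooKeeper class. */
public class BucketExpirySketch {
    private final Map<String, Long> elemToExpiry = new HashMap<>();
    private final Map<Long, Set<String>> buckets = new HashMap<>();
    private final long interval;

    public BucketExpirySketch(long intervalMs) {
        this.interval = intervalMs;
    }

    /** Assumed rounding rule: round up to the start of the next interval. */
    long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    /** Moves elem to the bucket for now + timeout; returns the new bucket time, or null if unchanged. */
    public Long update(String elem, long now, long timeout) {
        long newExpiry = roundToNextInterval(now + timeout);
        Long prev = elemToExpiry.get(elem);
        if (prev != null && prev == newExpiry) {
            return null; // same bucket, nothing to do
        }
        buckets.computeIfAbsent(newExpiry, k -> new HashSet<>()).add(elem);
        elemToExpiry.put(elem, newExpiry);
        if (prev != null) {
            Set<String> old = buckets.get(prev);
            if (old != null) {
                old.remove(elem); // leave the previous bucket
            }
        }
        return newExpiry;
    }

    /** Removes and returns everything in the bucket that expires at bucketTime. */
    public Set<String> expire(long bucketTime) {
        Set<String> expired = buckets.remove(bucketTime);
        if (expired == null) {
            return Collections.emptySet();
        }
        for (String e : expired) {
            elemToExpiry.remove(e);
        }
        return expired;
    }

    public static void main(String[] args) {
        BucketExpirySketch q = new BucketExpirySketch(1000);
        System.out.println(q.update("cnxn-1", 100, 3000));
        System.out.println(q.update("cnxn-1", 1200, 3000));
        System.out.println(q.expire(5000));
    }
}
```

    Rounding expiry times to interval boundaries means that many touches land in the same bucket and return early, and the expiry thread only has to look at one bucket per interval instead of scanning every connection.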

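    OK, with a basic picture of how expiry times are updated, we go back to the flow where the worker thread pool processes the workRequest object and reads the client's data.

    1.2.3 WorkerThread

    Compared with the threads above, WorkerThread has a fairly complex call graph that involves method calls on several objects. It mainly handles I/O and does not process the data itself; data processing is done by the RequestProcessor chain. The call graph is as follows: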

     

    public void schedule(WorkRequest workRequest) {
        schedule(workRequest, 0);
    }

    public void schedule(WorkRequest workRequest, long id) {
        ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
        int size = workers.size();
        int workerNum = ((int) (id % size) + size) % size;
        ExecutorService worker = workers.get(workerNum);
        worker.execute(scheduledWorkRequest);
    }

    // WorkerService.ScheduledWorkRequest
    private class ScheduledWorkRequest implements Runnable {
        @Override
        public void run() {
            // IOWorkRequest.doWork
            workRequest.doWork();
        }
    }

    private class IOWorkRequest extends WorkerService.WorkRequest {
        public void doWork() throws InterruptedException {
            if (key.isReadable() || key.isWritable()) {
                // Perform the I/O
                cnxn.doIO(key);
                // The renewal method again
                touchCnxn(cnxn);
            }
        }
    }
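    The expression ((int) (id % size) + size) % size in schedule is worth a second look: Java's % operator can return a negative result for a negative id, and the extra "+ size, then % size" step folds that back into a valid list index. The sketch below only isolates that arithmetic; WorkerIndexDemo is a hypothetical class, and the ids passed to it are made-up values (the excerpt above only ever passes 0).

```java
/** Hypothetical demo of the index computation used in WorkerService.schedule. */
public class WorkerIndexDemo {

    /** Maps any long id, including negative ones, to an index in [0, size). */
    static int workerIndex(long id, int size) {
        return ((int) (id % size) + size) % size;
    }

    public static void main(String[] args) {
        int size = 4;
        long[] ids = {0L, 5L, -1L, -8L, 0x100000002L};
        for (long id : ids) {
            System.out.println("id " + id + " -> worker " + workerIndex(id, size));
        }
    }
}
```

    Since the same id always maps to the same worker, work scheduled under one id runs on one single-threaded executor and therefore stays in order.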

    We won't expand the remaining details. doIO eventually calls readPayload.

    private void readPayload() throws IOException, InterruptedException {
        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from client sessionid 0x"
                        + Long.toHexString(sessionId)
                        + ", likely client has closed socket");
            }
        }
        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
            packetReceived();
            incomingBuffer.flip();
            // Not initialized yet: the first packet is the connect request
            if (!initialized) {
                readConnectRequest();
            } else {
                readRequest();
            }
            lenBuffer.clear();
            incomingBuffer = lenBuffer;
        }
    }
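    readPayload ends by pointing incomingBuffer back at lenBuffer, which suggests a length-prefixed framing scheme: read a 4-byte length, then read exactly that many payload bytes, then go back to reading a length. The sketch below shows that pattern on plain byte arrays. It is an illustration, not ZooKeeper's implementation: FrameDecoderSketch, feed, and frame are hypothetical names, and the 4-byte big-endian length prefix is an assumption based on the buffer swap in the excerpt above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder for 4-byte length-prefixed frames, mirroring the lenBuffer/incomingBuffer swap. */
public class FrameDecoderSketch {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;

    /** Feeds a chunk of bytes and returns every payload completed by this chunk. */
    public List<byte[]> feed(byte[] chunk) {
        List<byte[]> frames = new ArrayList<>();
        ByteBuffer in = ByteBuffer.wrap(chunk);
        while (true) {
            // Copy as many bytes as the current target buffer still needs.
            int n = Math.min(in.remaining(), incomingBuffer.remaining());
            for (int i = 0; i < n; i++) {
                incomingBuffer.put(in.get());
            }
            if (incomingBuffer.hasRemaining()) {
                return frames; // wait for more data
            }
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // Length prefix complete: switch to a payload buffer of that size.
                int len = lenBuffer.getInt();
                if (len < 0) {
                    throw new IllegalArgumentException("negative frame length: " + len);
                }
                incomingBuffer = ByteBuffer.allocate(len);
            } else {
                // Payload complete: emit it and go back to reading a length.
                byte[] payload = new byte[incomingBuffer.remaining()];
                incomingBuffer.get(payload);
                frames.add(payload);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
            }
        }
    }

    /** Builds one frame: 4-byte length followed by the UTF-8 bytes of s. */
    static byte[] frame(String s) {
        byte[] p = s.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + p.length).putInt(p.length).put(p).array();
    }

    public static void main(String[] args) {
        FrameDecoderSketch decoder = new FrameDecoderSketch();
        for (byte[] payload : decoder.feed(frame("connect"))) {
            System.out.println(new String(payload, StandardCharsets.UTF_8));
        }
    }
}
```

    The decoder keeps its state between calls, so a frame that arrives split across several non-blocking reads is still reassembled correctly.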

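    If initialized is false at this point, this is the first request on the connection and a Session must be created (createSession), so readConnectRequest() is called. readConnectRequest() sets initialized to true only after the connect request has been processed; from then on the connection can handle other client commands.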

    private void readConnectRequest() throws IOException, InterruptedException {
        if (!isZKServerRunning()) {
            throw new IOException("ZooKeeperServer not running");
        }
        zkServer.processConnectRequest(this, incomingBuffer);
        // From now on this branch is skipped, so the session is not created again
        initialized = true;
    }

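    The method above calls processConnectRequest to handle the connect request. On a first connection the sessionId read from the request is 0, so session creation is treated as a request of its own and createSession() is called. The key parts of processConnectRequest are as follows: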

    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) {
        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
        ConnectRequest connReq = new ConnectRequest(); // the connect request
        connReq.deserialize(bia, "connect"); // deserialize the connect request parameters
        long sessionId = connReq.getSessionId(); // sessionId sent by the client (0 on a first connection)
        int sessionTimeout = connReq.getTimeOut();
        byte[] passwd = connReq.getPasswd();
        cnxn.setSessionTimeout(sessionTimeout);
        if (sessionId == 0) {
            long id = createSession(cnxn, passwd, sessionTimeout); // create the session
        }
    }

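    Session creation goes through createSession(). It first creates a sessionId, builds a create-session request with that sessionId as the session ID, and submits the request to the processor chain as a regular request. The createSession() source is as follows: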

    long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
        if (passwd == null) {
            // Possible since it's just deserialized from a packet on the wire.
            passwd = new byte[0];
        }
        // Let sessionTracker create a sessionId
        long sessionId = sessionTracker.createSession(timeout);
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        ByteBuffer to = ByteBuffer.allocate(4);
        to.putInt(timeout);
        cnxn.setSessionId(sessionId);
        // Build an OpCode.createSession request (a create-session request keyed by the sessionId)
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
        setLocalSessionFlag(si);
        // Submit the request
        submitRequest(si);
        return sessionId;
    }
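    One detail in createSession is easy to miss: the session password is filled from a Random seeded with sessionId ^ superSecret, so the same sessionId always yields the same bytes. A plausible consequence, which the excerpt does not show, is that the server can recompute the password later instead of storing it. The sketch below illustrates the idea only; SessionPasswordSketch, the SUPER_SECRET value, the 16-byte length, and checkPasswd are all assumptions made for the example.

```java
import java.util.Arrays;
import java.util.Random;

/** Hypothetical illustration of a password derived from the session id and a server secret. */
public class SessionPasswordSketch {
    // Stand-in value for the server-side secret; the real constant is not shown in the excerpt.
    static final long SUPER_SECRET = 0xB3415C00L;

    /** Derives a password deterministically from the session id. */
    static byte[] generatePasswd(long sessionId) {
        Random r = new Random(sessionId ^ SUPER_SECRET);
        byte[] passwd = new byte[16]; // assumed length
        r.nextBytes(passwd);
        return passwd;
    }

    /** Recomputes the password and compares it with the one presented by the client. */
    static boolean checkPasswd(long sessionId, byte[] passwd) {
        return sessionId != 0 && Arrays.equals(passwd, generatePasswd(sessionId));
    }

    public static void main(String[] args) {
        long sessionId = 0x100000000000001L;
        byte[] passwd = generatePasswd(sessionId);
        System.out.println("valid password accepted: " + checkPasswd(sessionId, passwd));
        System.out.println("wrong-length password accepted: " + checkPasswd(sessionId, new byte[15]));
    }
}
```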

    sessionTracker.createSession(timeout), used above, does two things: it creates the sessionId and sets up tracking for that sessionId. The source is as follows:

    public long createSession(int sessionTimeout) {
        // Get the next sessionId
        long sessionId = nextSessionId.getAndIncrement();
        // Set up session tracking
        addSession(sessionId, sessionTimeout);
        return sessionId;
    }

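    Tracking a session means adding it to the tracker's collections so that the session information can be looked up by session ID from anywhere. addSession creates the Session, stores it by ID, and adds it to the expiry queue. The addSession source is as follows: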

    public synchronized boolean addSession(long id, int sessionTimeout) {
        sessionsWithTimeout.put(id, sessionTimeout);
        boolean added = false;
        // Look up the Session; if there is none, create one for this sessionId
        SessionImpl session = sessionsById.get(id);
        if (session == null){
            session = new SessionImpl(id, sessionTimeout);
        }
        // findbugs2.0.3 complains about get after put.
        // long term strategy would be use computeIfAbsent after JDK 1.8
        // Store the Session in sessionsById so it can be looked up by ID
        SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
        if (existedSession != null) {
            session = existedSession;
        } else {
            added = true;
            LOG.debug("Adding session 0x" + Long.toHexString(id));
        }
        if (LOG.isTraceEnabled()) {
            String actionStr = added ? "Adding" : "Existing";
            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
                    "SessionTrackerImpl --- " + actionStr + " session 0x"
                    + Long.toHexString(id) + " " + sessionTimeout);
        }
        // Add the Session to the expiry queue
        updateSessionExpiry(session, sessionTimeout);
        return added;
    }

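    Chapter 2 Session Refresh

    This can also be called session renewal. Besides PING requests, ordinary CRUD requests from the client also renew the session. PING is used as the example here.

    ClientCnxn.SendThread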

    public void run() {
        clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
        clientCnxnSocket.updateNow();
        clientCnxnSocket.updateLastSendAndHeard();
        while (state.isAlive()) {
            // Once connected, send a PING request at regular intervals
            if (state.isConnected()) {
                //1000(1 second) is to prevent race condition missing to send the second ping
                //also make sure not to send too many pings when readTimeout is small
                int timeToNextPing = readTimeout / 2
                        - clientCnxnSocket.getIdleSend()
                        - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                    // Send the PING request
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }
        }
    }
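    The ping condition is easier to reason about with concrete numbers. The sketch below lifts the arithmetic from run into two pure functions; PingTimingSketch and its method names are hypothetical, and the 10000 ms cap is the MAX_SEND_PING_INTERVAL value shown in Chapter 1. All values are in milliseconds.

```java
/** Hypothetical extraction of the ping-timing arithmetic from SendThread.run. */
public class PingTimingSketch {
    static final int MAX_SEND_PING_INTERVAL = 10000; // 10 seconds

    /** Time left before the next ping is due; zero or negative means "ping now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    /** True when a ping is due or nothing has been sent for longer than the cap. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        int readTimeout = 20000;
        int[] idleTimes = {0, 5000, 8999, 9000};
        for (int idle : idleTimes) {
            System.out.println("idle=" + idle
                    + " timeToNextPing=" + timeToNextPing(readTimeout, idle)
                    + " shouldPing=" + shouldPing(readTimeout, idle));
        }
    }
}
```

    With a read timeout of 20000 ms, the ping goes out once the connection has been idle for 9000 ms: half the read timeout, minus the one-second safety margin from the source comment. With a much larger read timeout, the 10-second cap takes over instead.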

    Sending the PING request to the server

    private void sendPing() {
        lastPingSentNs = System.nanoTime();
        RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
        queuePacket(h, null, null, null, null, null, null, null, null);
    }

    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
        ...
        // Wakes up the selector that sendThread is blocked on, so that its run loop
        // continues and sends the queued packet to the server
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }

    // The next pass through the sendThread.run loop
    public void run() {
        clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
        // Send the request
        clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
    }

    void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
            ClientCnxn cnxn)
            throws IOException, InterruptedException {
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    // Connection just finished: send the connect request
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                // Reads responses and writes queued packets, including the PING
                doIO(pendingQueue, outgoingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            synchronized(outgoingQueue) {
                if (findSendablePacket(outgoingQueue,
                        cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    enableWrite();
                }
            }
        }
        selected.clear();
    }

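    The server receives the client's PING request. Because the connection has already been accepted, AcceptThread is not involved this time: the read event is picked up by the connection's SelectorThread, and from there the flow is the same as in session creation: SelectorThread.run -> select() -> handleIO(key) -> touchCnxn(cnxn).

    Chapter 3 Session Expiry

    As the session-creation source already suggests, session expiry is managed by the SessionTrackerImpl class. It is also a thread class that extends ZooKeeperCriticalThread. Its run method gets the next session expiration time, sleeps until that time arrives, then fetches the set of expired client sessions and closes them in a loop.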

    public void run() {
        try {
            while (running) {
                // Time until the next expiration
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    // Sleep
                    Thread.sleep(waitTime);
                    continue;
                }
                // Fetch the set of expired client sessions
                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    // Set the session's isClosing flag to true
                    setSessionClosing(s.sessionId);
                    // Expire the client session
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

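    expirer.expire(s), which expires the client session, is also handled as a request. It calls ZooKeeperServer.expire(), which takes the SessionId, creates an OpCode.closeSession request, and submits it to the processor chain. The ZooKeeperServer.expire() source is as follows: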

    public void expire(Session session) {
        long sessionId = session.getSessionId();
        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
                + ", timeout of " + session.getTimeout() + "ms exceeded");
        close(sessionId);
    }

    private void close(long sessionId) {
        // Build an OpCode.closeSession request
        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
        setLocalSessionFlag(si);
        // Submit it to the processor chain
        submitRequest(si);
    }

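    submitRequest shows up again here. We leave it for the request-processing chapter below; for now it is enough to know that calling it is what closes the session.

    Chapter 4 Request Processing

    ZooKeeper's request processing works like a workflow; it is in effect a singly linked list. At startup ZooKeeper determines the role of each node (leader, follower, or observer), and once the role is set, it initializes that role's chain of responsibility.

    4.1 RequestProcessor structure

    For each kind of operation in an incoming client request, ZooKeeper provides a processing pipeline built from RequestProcessor objects.

    The RequestProcessor chain is initialized in ZooKeeperServer.setupRequestProcessors(); the source is as follows: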

    /**
     * Initializes the request-processing chain
     */
    protected void setupRequestProcessors() {
        // Create the FinalRequestProcessor
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        // Create the SyncRequestProcessor, with finalProcessor as the next link in the chain
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        // Start syncProcessor
        ((SyncRequestProcessor)syncProcessor).start();
        // Create the PrepRequestProcessor as the first processor, with syncProcessor as its next link
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        // Start firstProcessor
        ((PrepRequestProcessor)firstProcessor).start();
    }

    When syncProcessor is created, finalProcessor is passed in as a parameter. The source is as follows:

    /**
     * Creates the SyncRequestProcessor; the next link in the chain is FinalRequestProcessor
     * @param zks
     * @param nextProcessor
     */
    public SyncRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("SyncThread:" + zks.getServerId(), zks
                .getZooKeeperServerListener());
        this.zks = zks;
        // Next link in the chain
        this.nextProcessor = nextProcessor;
        running = true;
    }

    When firstProcessor is created, syncProcessor is passed in as a parameter. The source is as follows:

    public PrepRequestProcessor(ZooKeeperServer zks,
            RequestProcessor nextProcessor) {
        super("ProcessThread(sid:" + zks.getServerId() + " cport:"
                + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
        this.nextProcessor = nextProcessor;
        this.zks = zks;
    }

    PrepRequestProcessor/SyncRequestProcessor relationship diagram:

     

    PrepRequestProcessor and SyncRequestProcessor have the same structure: each is a thread (a Thread subclass), which is why both are started during initialization here.
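    The way the chain is wired, back to front with each processor holding a reference to the next, can be shown in a few lines. The sketch below is a deliberately simplified, synchronous illustration: ProcessorChainSketch, Stage, and the String requests are hypothetical, and unlike the real PrepRequestProcessor and SyncRequestProcessor there are no queues or threads between the stages.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, synchronous sketch of a three-stage chain of responsibility. */
public class ProcessorChainSketch {

    interface RequestProcessor {
        void processRequest(String request);
    }

    /** A stage records that it saw the request, then forwards it to the next stage. */
    static class Stage implements RequestProcessor {
        private final String name;
        private final RequestProcessor next;
        private final List<String> log;

        Stage(String name, RequestProcessor next, List<String> log) {
            this.name = name;
            this.next = next;
            this.log = log;
        }

        @Override
        public void processRequest(String request) {
            log.add(name + ":" + request);
            if (next != null) {
                next.processRequest(request);
            }
        }
    }

    /** Builds the chain last-to-first, as setupRequestProcessors does, and runs one request through it. */
    static List<String> run(String request) {
        List<String> log = new ArrayList<>();
        RequestProcessor finalProcessor = new Stage("final", null, log);
        RequestProcessor syncProcessor = new Stage("sync", finalProcessor, log);
        RequestProcessor firstProcessor = new Stage("prep", syncProcessor, log);
        firstProcessor.processRequest(request);
        return log;
    }

    public static void main(String[] args) {
        for (String entry : run("createSession")) {
            System.out.println(entry);
        }
    }
}
```

    Building the chain from the last processor to the first is what lets each constructor receive its successor as a plain argument, so a different role could assemble a different chain from the same kind of building blocks.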

    4.2 PrepRequestProcessor

    PrepRequestProcessor is the first processor in the chain. To connect it with the request handling seen earlier, follow the call path step by step: ZooKeeperServer.processPacket() > submitRequest() > enqueueRequest() > RequestThrottler.submitRequest(). RequestThrottler.submitRequest() adds the current request to the submittedRequests queue.

    submitRequest then goes on to call firstProcessor.processRequest, which enters PrepRequestProcessor.processRequest(request):

    public void processRequest(Request request) {
        submittedRequests.add(request);
    }

    public void run() {
        while (true) {
            Request request = submittedRequests.take();
            pRequest(request);
        }
    }

    protected void pRequest(Request request) throws RequestProcessorException {
        request.setHdr(null);
        request.setTxn(null);
        switch (request.type) {
            case OpCode.createSession: // handle connect (create-session) requests
            case OpCode.closeSession:
                if (!request.isLocalSession()) {
                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
                }
                break;
        }
    }

    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
        switch (type) {
            case OpCode.createSession:
                int to = request.request.getInt();
                request.setTxn(new CreateSessionTxn(to));
                zks.sessionTracker.trackSession(request.sessionId, to);
                zks.setOwner(request.sessionId, request.getOwner());
                break;
        }
    }

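    The code shows that pRequest2Txn() mainly handles permission checks, snapshot records, and transaction records, and does not touch the data yet. In other words, PrepRequestProcessor does the work that has to happen before the operation itself: permission checks, snapshot records, and transaction records.

    4.3 SyncRequestProcessor

    With PrepRequestProcessor covered, next is SyncRequestProcessor. Its main job is to persist requests to disk efficiently, and a request is not forwarded to the next processor until it has been written to disk.

    First, the method that adds a request to the queue: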

    public void processRequest(Request request) {
        // request.addRQRec(">sync");
        // Add the request to the queuedRequests queue
        queuedRequests.add(request);
    }

    SyncRequestProcessor is likewise a thread, and the queued requests are executed from that thread. Its run method is as follows:

    public void run() {
        try {
            int logCount = 0;
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            int randRoll = r.nextInt(snapCount/2);
            while (true) {
                Request si = null;
                if (toFlush.isEmpty()) {
                    // Blocking call: wait for a request
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount/2);
                            // roll the log
                            // Resets the count of txns since the last rollLog
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            // Save the snapshot data
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        // optimization for read heavy workloads
                        // iff this is a read, and there are no pending
                        // flushes (writes), then just pass this to the next
                        // processor
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si);
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable)nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    // Add the request to toFlush, the queue of transactions that have been
                    // written and are waiting to be flushed to disk
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        // Commit the data
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally{
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

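    run takes a request from the queuedRequests queue. When toFlush is empty and no request is available, it blocks until one arrives and only then continues. Each request is appended to the transaction log, and after enough appends a Snapshot Thread is started to write a snapshot of the data to disk. Finally, flush() is called to commit the data. The flush() source is as follows: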

    private void flush(LinkedList<Request> toFlush)
            throws IOException, RequestProcessorException
    {
        if (toFlush.isEmpty())
            return;
        // Commit the data
        zks.getZKDatabase().commit();
        while (!toFlush.isEmpty()) {
            Request i = toFlush.remove();
            if (nextProcessor != null) {
                // Call the next processor in the chain
                nextProcessor.processRequest(i);
            }
        }
        if (nextProcessor != null && nextProcessor instanceof Flushable) {
            ((Flushable)nextProcessor).flush();
        }
    }

    flush() commits the data and hands each request to the next processor in the chain, which is FinalRequestProcessor.
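    The interplay between toFlush and flush() is a form of group commit: several appended requests share one commit to disk, and none of them moves on until that commit has happened. The sketch below models just that bookkeeping. GroupCommitSketch and its fields are hypothetical, the commit is simulated by recording the batch size, and the batch limit is a constructor parameter rather than the 1000 used in the excerpt above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Hypothetical model of batching appended requests behind a single commit. */
public class GroupCommitSketch {
    private final Queue<String> toFlush = new ArrayDeque<>();
    private final int maxBatch;
    final List<Integer> commitSizes = new ArrayList<>(); // one entry per simulated commit
    final List<String> forwarded = new ArrayList<>();    // requests handed to the next stage

    GroupCommitSketch(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    /** Buffers a request; flushes once the buffer grows beyond maxBatch. */
    void append(String request) {
        toFlush.add(request);
        if (toFlush.size() > maxBatch) {
            flush();
        }
    }

    /** One simulated commit for the whole batch, then forward every request in order. */
    void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        commitSizes.add(toFlush.size()); // stands in for a single commit to disk
        while (!toFlush.isEmpty()) {
            forwarded.add(toFlush.remove());
        }
    }

    public static void main(String[] args) {
        GroupCommitSketch sync = new GroupCommitSketch(2);
        sync.append("a");
        sync.append("b");
        sync.append("c"); // exceeds the batch limit and triggers a flush
        sync.append("d");
        sync.flush();     // what happens when the input queue runs dry
        System.out.println("commit sizes: " + sync.commitSizes);
        System.out.println("forwarded: " + sync.forwarded);
    }
}
```

    The trade-off is latency for throughput: a request may wait briefly in the buffer, but the expensive disk commit is paid once per batch instead of once per request.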

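    4.4 FinalRequestProcessor

    With SyncRequestProcessor covered, next is the last processor in the chain, FinalRequestProcessor. It is mainly responsible for returning the Response.

    SyncRequestProcessor persists the txn (the create-session operation); FinalRequestProcessor then commits the Session, which amounts to storing the Session's ID and Timeout in sessionsWithTimeout.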

    public void processRequest(Request request) {
        ProcessTxnResult rc = zks.processTxn(request);
        switch (request.type) {
            case OpCode.createSession: {
                lastOp = "SESS";
                updateStats(request, lastOp, lastZxid);
                zks.finishSessionInit(request.cnxn, true);
                return;
            }
        }
        if (path == null || rsp == null) {
            cnxn.sendResponse(hdr, rsp, "response"); // the server sends the response; the client receives it at this point
        }
    }

    public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
        // register with JMX
        if (valid) {
            if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
                serverCnxnFactory.registerConnection(cnxn);
            } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
                secureServerCnxnFactory.registerConnection(cnxn);
            }
        }
    }

    Calling sendResponse returns the response to the client.

    The client receives the server's response

    // ClientCnxnSocketNIO.doIO(Queue, ClientCnxn)
    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        if (sockKey.isReadable()) {
            if (!initialized) {
                readConnectResult();
            }
        }
    }

    void readConnectResult() throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");
        this.sessionId = conRsp.getSessionId(); // the connection is now established
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
    }

    Coming up next

    1. Understand how ZooKeeper's watcher mechanism works

  • Original article: https://blog.csdn.net/Eclipse_2019/article/details/126362657