• zookeeper源码(09)follower处理客户端请求


    zookeeper中,follower也可以接收客户端连接,处理客户端请求,本文将分析follower处理客户端请求的流程:

    • 读请求处理
    • 写请求转发与响应

    follower接收转发客户端请求

    网络层接收客户端数据包

    leader、follower都会启动ServerCnxnFactory组件,用来接收客户端连接、读取客户端数据包、将客户端数据包转发给zk应用层。

    在"zookeeper源码(08)请求处理及数据读写流程"一文中已经介绍,ServerCnxn在读取到客户端数据包之后,会调用zookeeperServer的processConnectRequest或processPacket方法:

    • processConnectRequest方法:创建session
    • processPacket方法:处理业务请求

    processConnectRequest创建session

    • 会使用sessionTracker生成sessionId、创建session对象
    • 生成一个密码
    • 提交一个createSession类型Request并提交给业务处理器
    long createSession(ServerCnxn cnxn, byte[] passwd, int timeout) {
        // 生成sessionId、创建session对象
        long sessionId = sessionTracker.createSession(timeout);
        // 生成密码
        Random r = new Random(sessionId ^ superSecret);
        r.nextBytes(passwd);
        // 提交createSession类型Request
        CreateSessionTxn txn = new CreateSessionTxn(timeout);
        cnxn.setSessionId(sessionId);
        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
        submitRequest(si);
        return sessionId;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    processPacket处理业务请求

    • 封装Request
    • 验证largeRequest
    • 提交业务层处理器
    Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
    int length = request.limit();
    if (isLargeRequest(length)) {
        // checkRequestSize will throw IOException if request is rejected
        checkRequestSizeWhenMessageReceived(length);
        si.setLargeRequestSize(length);
    }
    si.setOwner(ServerCnxn.me);
    submitRequest(si);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    FollowerRequestProcessor处理器

    在follower端,客户端请求会由FollowerRequestProcessor处理:

    1. 把请求提交下游CommitProcessor处理器
    2. 写请求转发给leader处理
    3. 读请求经过CommitProcessor直接转发给FinalRequestProcessor处理器,直接查询数据返回给客户端
    public void run() {
        try {
            while (!finished) {
    
                Request request = queuedRequests.take();
    
                // Screen quorum requests against ACLs first 略
    
                // 转发给CommitProcessor处理器
                // 提交到queuedRequests队列
                // 写请求还会提交到queuedWriteRequests队列
                maybeSendRequestToNextProcessor(request);
    
                // ...
    
                // 写请求需要转发给leader处理
                switch (request.type) {
                case OpCode.sync:
                    zks.pendingSyncs.add(request); // 待同步命令
                    zks.getFollower().request(request);
                    break;
                case OpCode.create:
                case OpCode.create2:
                case OpCode.createTTL:
                case OpCode.createContainer:
                case OpCode.delete:
                case OpCode.deleteContainer:
                case OpCode.setData:
                case OpCode.reconfig:
                case OpCode.setACL:
                case OpCode.multi:
                case OpCode.check:
                    zks.getFollower().request(request);
                    break;
                case OpCode.createSession:
                case OpCode.closeSession:
                    if (!request.isLocalSession()) {
                        zks.getFollower().request(request);
                    }
                    break;
                }
            }
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    转发leader

    zks.getFollower().request(request);
    
    • 1

    Learner转发请求:

    void request(Request request) throws IOException {
        // 略
    
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(request.sessionId); // sessionId
        oa.writeInt(request.cxid); // 客户端xid
        oa.writeInt(request.type); // 业务类型
        byte[] payload = request.readRequestBytes(); // 请求体
        if (payload != null) {
            oa.write(payload);
        }
        oa.close();
        // 封装REQUEST数据包
        QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);
        writePacket(qp, true); // 通过网络发给leader服务器
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    leader处理follower请求

    LearnerHandler接收REQUEST请求

    case Leader.REQUEST:
        bb = ByteBuffer.wrap(qp.getData());
        sessionId = bb.getLong(); // 解析请求信息
        cxid = bb.getInt();
        type = bb.getInt();
        bb = bb.slice();
        Request si;
        if (type == OpCode.sync) {
            si = new LearnerSyncRequest(
                this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
        } else {
            si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
        }
        si.setOwner(this); // 用来判断请求来自follower
        learnerMaster.submitLearnerRequest(si); // 提交给业务处理器
        requestsReceived.incrementAndGet();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    submitLearnerRequest提交业务处理器:

    public void submitLearnerRequest(Request si) {
        zk.submitLearnerRequest(si);
    }
    
    • 1
    • 2
    • 3

    LeaderZooKeeperServer提交业务处理器:

    public void submitLearnerRequest(Request request) {
        // 提交给PrepRequestProcessor处理器
        prepRequestProcessor.processRequest(request);
    }
    
    • 1
    • 2
    • 3
    • 4

    从此处开始走leader处理写请求流程。

    leader处理写请求流程回顾

    • PrepRequestProcessor - 做事务设置
    • ProposalRequestProcessor - 发起proposal,将Request转发给SyncRequestProcessor写事务log、本地ack
    • CommitProcessor - 读请求直接调用下游处理器,写请求需要等待足够的ack之后commit再调用下游RequestProcessor处理器
    • ToBeAppliedRequestProcessor - 维护toBeApplied列表
    • FinalRequestProcessor - 把事务应用到ZKDatabase,提供查询功能,返回响应

    follower处理leader数据

    在follower中,Follower使用processPacket方法处理来自leader的数据包,此处看一下PROPOSAL和COMMIT的逻辑。

    PROPOSAL数据包

    fzk.logRequest(hdr, txn, digest);
    
    • 1

    logRequest会使用syncProcessor将事务写入到txnlog文件,之后调用SendAckRequestProcessor处理器给leader发ack数据包。

    leader收到超过半数的ack之后会发COMMIT数据包让各个节点将事务应用到ZKDatabase中。

    COMMIT数据包

    fzk.commit(qp.getZxid());
    
    • 1

    CommitProcessor处理器会将其提交到committedRequests队列,之后客户端Request会继续向下游FinalRequestProcessor处理器传递。

    FinalRequestProcessor处理器

    • 把事务应用到ZKDatabase中
    • 提供查询功能
    • 给客户端返回响应
  • 相关阅读:
    数据开发对部分报表的同步时效提出了很高的要求
    【VIM】VIm初步使用
    Unity WebGL RuntimeError: integer overflow(整数溢出问题)
    计算机网络——数据链路层(流量传输与可靠传输机制)
    为什么不直接操作State,而是要额外定义一个变量
    太可了,刷透这份“架构师养成手册”成就自己的架构之路
    1-4 Linux 标准目录结构FHS
    洛谷刷题C语言:数字游戏、面积、日期、苹果采购、字母转换
    电影【忠犬帕尔玛】
    盘点:专业OKR管理工具有哪些?
  • 原文地址:https://blog.csdn.net/xuguofeng2016/article/details/136290587