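Tracing ZooKeeper session creation, refresh, and expiry
Mapping ZooKeeper's core request-processing call chain
The previous article covered how ZooKeeper is used. ZooKeeper itself consists of two modules, a server and a client: the server implements all of the ZooKeeper business logic, while the client wraps calls to the server. Two modules imply network communication. ZooKeeper uses ServerCnxnFactory to manage client connections, and it has two implementations: NIOServerCnxnFactory, built on Java's native NIO, and NettyServerCnxnFactory, built on Netty. A ServerCnxn represents one connection between a client and the server. The standalone startup path shows that ZooKeeper's default communication component is NIOServerCnxnFactory. We first look at the communication flow in ZooKeeper and then walk through the source code in detail.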

The flow chart below covers the whole path from the client issuing a connection request to the server establishing the session.

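As covered in the previous article, a connection is established by calling new ZooKeeper(...). The ZooKeeper constructor creates a ClientCnxn object and calls its start method, which starts two threads: sendThread and eventThread.
sendThread is the core thread for establishing the connection. Its run method is a while loop: while the socket is not yet connected it opens the connection, and once the state is CONNECTED it sends a Ping, never letting more than 10 seconds pass between sends, so that the connection stays alive.
The source is fairly long, so the less important parts are omitted.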
- public void run() {
- // Upper bound on the time between pings
- final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
- while (state.isAlive()) {
- try {
- // If the socket is not connected yet, open the connection
- if (!clientCnxnSocket.isConnected()) {
- startConnect(serverAddress);
- }
-
- // Once connected, send a heartbeat at least every 10 seconds
- if (state.isConnected()) {
- // Computes when the next ping is due, so that pings are not sent too often
- int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
- ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
- if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
- sendPing();
- clientCnxnSocket.updateLastSend();
- } else {
- if (timeToNextPing < to) {
- to = timeToNextPing;
- }
- }
- }
- }
- }
- void connect(InetSocketAddress addr) throws IOException {
- SocketChannel sock = createSock();
- try {
- // Opens the TCP connection to the ZooKeeper server; the session is created once the connection is up
- registerAndConnect(sock, addr);
- } catch (IOException e) {
-
- }
- }
-
- void registerAndConnect(SocketChannel sock, InetSocketAddress addr)
- throws IOException {
- sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
- // Start a non-blocking NIO connect
- boolean immediateConnect = sock.connect(addr);
-
- }
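The ping timing used in SendThread.run above can be tried out in isolation. The following is a minimal sketch, not ZooKeeper code: the class name PingTiming and its method names are made up for illustration, and readTimeout and idleSend are plain parameters rather than values read from a socket.

```java
final class PingTiming {
    // Same constant as in the listing above: 10 seconds.
    static final int MAX_SEND_PING_INTERVAL = 10_000;

    /** Milliseconds until the next ping is due; a value <= 0 means "ping now". */
    static int timeToNextPing(int readTimeout, int idleSend) {
        // Pings are aimed at half the read timeout; once the socket has been idle
        // for more than a second, one extra second is subtracted as a safety margin.
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    /** Mirrors the condition in the loop: ping when due, or when idle for too long. */
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }
}
```

With a large readTimeout the first condition alone would allow long gaps between sends, which is where the MAX_SEND_PING_INTERVAL check takes over.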
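On the server side, NIOServerCnxnFactory starts the threads that receive requests. At startup it launches four kinds of threads:
AcceptThread: accepts new connections from clients and assigns them to a SelectorThread (a single thread of this kind is started).
SelectorThread: runs select(). Because select() becomes a bottleneck when handling many connections, several SelectorThreads are started. The system property zookeeper.nio.numSelectorThreads sets their number; the default is sqrt(number of cores / 2), with a minimum of 1.
WorkerThread: performs the basic socket reads and writes. The system property zookeeper.nio.numWorkerThreads sets their number; the default is 2 × the number of cores. If it is set to 0, no worker threads are started and the SelectorThreads perform the socket I/O directly; see the worker thread discussion below.
ConnectionExpirationThread: closes connections that have been idle past their timeout, including connections on which no session was ever established.
AcceptThread is the thread that accepts incoming client connections: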
- public void run() {
- while (!stopped && !acceptSocket.socket().isClosed()) {
- select();
- }
- }
- private void select() {
- try {
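- // Wait for connections that are ready
- selector.select();
-
- Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
- while (!stopped && selectedKeys.hasNext()) {
- SelectionKey key = selectedKeys.next();
- selectedKeys.remove();
- if (key.isAcceptable()) {
- // 1: Accept the connection on this server.
- // 2: Get the remote client's address.
- // 3: Check whether the connection limit is exceeded.
- // 4: Switch the channel to non-blocking mode.
- // 5: Pick a SelectorThread round-robin and assign the connection to it.
- // 6: Add the connection to that SelectorThread's acceptedQueue and wake it up.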
- if (!doAccept()) {
- pauseAccept(10);
- }
- }
- }
- }
- }
Stepping into doAccept:
- private boolean doAccept() {
- ...
- try {
- // Accept the connection
- sc = acceptSocket.accept();
- accepted = true;
- // Get the remote address
- InetAddress ia = sc.socket().getInetAddress();
- int cnxncount = getClientCnxnCount(ia);
-
- // Check whether the maximum number of client connections is exceeded
- if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
- ...
- }
-
- LOG.debug("Accepted socket connection from "
- + sc.socket().getRemoteSocketAddress());
- // Put the channel into non-blocking mode
- sc.configureBlocking(false);
-
- // Assign the connection to a SelectorThread round-robin
- if (!selectorIterator.hasNext()) {
- selectorIterator = selectorThreads.iterator();
- }
- SelectorThread selectorThread = selectorIterator.next();
- // Add the new connection to the SelectorThread's acceptedQueue and wake the thread
- if (!selectorThread.addAcceptedConnection(sc)) {
- ...
- }
- acceptErrorLogger.flush();
- } catch (IOException e) {
- ...
- }
- return accepted;
- }
- }
- public boolean addAcceptedConnection(SocketChannel accepted) {
- // Queue the accepted channel in acceptedQueue
- if (stopped || !acceptedQueue.offer(accepted)) {
- return false;
- }
- // Wake up the SelectorThread
- wakeupSelector();
- return true;
- }
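The round-robin hand-off in doAccept and addAcceptedConnection can be sketched on its own. This is an illustrative model only: RoundRobinDispatcher and its nested Selector class are hypothetical stand-ins, connections are represented by plain strings, and the selector wake-up is left out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

final class RoundRobinDispatcher {
    /** Hypothetical stand-in for SelectorThread: only its accepted-connection queue. */
    static final class Selector {
        final ConcurrentLinkedQueue<String> acceptedQueue = new ConcurrentLinkedQueue<>();
    }

    private final List<Selector> selectors = new ArrayList<>();
    private Iterator<Selector> selectorIterator;

    RoundRobinDispatcher(int selectorCount) {
        for (int i = 0; i < selectorCount; i++) {
            selectors.add(new Selector());
        }
        selectorIterator = selectors.iterator();
    }

    /** Hands the connection to the next selector and returns that selector's index. */
    int dispatch(String connection) {
        // Same wrap-around trick as doAccept: restart the iterator when it is exhausted.
        if (!selectorIterator.hasNext()) {
            selectorIterator = selectors.iterator();
        }
        Selector target = selectorIterator.next();
        target.acceptedQueue.offer(connection);
        return selectors.indexOf(target);
    }

    int queued(int selectorIndex) {
        return selectors.get(selectorIndex).acceptedQueue.size();
    }
}
```

Dispatching five connections across three selectors lands them on selectors 0, 1, 2, 0, 1, so the load differs by at most one connection.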
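addAcceptedConnection wakes the SelectorThread, so the logic continues in SelectorThread.run.
This thread picks up ready I/O events from the sockets, wraps each in a workRequest, and hands it to the workerPool thread pool. It also drains the pending connections from acceptedQueue, registers OP_READ on each one, and attaches the matching context object, an NIOServerCnxn. The run method of SelectorThread: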
- public void run() {
-
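- // Pick up ready I/O events and hand them to a worker thread; the data is processed in ZooKeeperServer.processPacket()
- select();
- // Take the accepted connections off acceptedQueue, register OP_READ for each,
- // and bind an NIOServerCnxn object to the key,
- // i.e. attach a context object (NIOServerCnxn) to every connection
- processAcceptedConnections();
- // Walk updateQueue and update the interest ops of the connections in it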
- processInterestOpsUpdateRequests();
- }
First, processAcceptedConnections: it creates an NIOServerCnxn for every connection and also triggers the renewal logic.
- private void processAcceptedConnections() {
- SocketChannel accepted;
- while (!stopped && (accepted = acceptedQueue.poll()) != null) {
- SelectionKey key = null;
- key = accepted.register(selector, SelectionKey.OP_READ);
- // Create an NIOServerCnxn for each connection
- NIOServerCnxn cnxn = createConnection(accepted, key, this);
- key.attach(cnxn);
- addCnxn(cnxn);
- }
- }
This part is not critical, so we won't dig deeper. Back to the select method:
- private void select() {
- selector.select();
- Set<SelectionKey> selected = selector.selectedKeys();
- ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
- Collections.shuffle(selectedList);
- Iterator<SelectionKey> selectedKeys = selectedList.iterator();
- while (!stopped && selectedKeys.hasNext()) {
- SelectionKey key = selectedKeys.next();
- selected.remove(key);
- if (key.isReadable() || key.isWritable()) {
- // Core logic
- handleIO(key);
- }
- }
- }
handleIO() wraps the current SelectorThread in an IOWorkRequest and hands it to workerPool for scheduling; reading the data only starts once workerPool runs it. The source:
- private void handleIO(SelectionKey key) {
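- // Wrap this SelectorThread and the key in a work request
- IOWorkRequest workRequest = new IOWorkRequest(this, key);
- NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
- // Renew the connection's expiry
- touchCnxn(cnxn);
- // Hand the work request to the thread pool, where the client's data is read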
- workerPool.schedule(workRequest);
- }
Let's look at the renewal method first. This is not the only place that calls NIOServerCnxnFactory.touchCnxn(NIOServerCnxn).
- public void touchCnxn(NIOServerCnxn cnxn) {
- cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
- }
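Stepping into update shows that it belongs to ExpiryQueue. As the name suggests, ExpiryQueue is the queue the server uses to manage expiry; here it tracks connections, and the session tracker uses the same class for sessions.
- // Expiry time currently assigned to each NIOServerCnxn
- private final ConcurrentHashMap<E, Long> elemMap = new ConcurrentHashMap<E, Long>();
- // For each expiry time, the bucket of NIOServerCnxn objects that expire then
- private final ConcurrentHashMap<Long, Set<E>> expiryMap = new ConcurrentHashMap<Long, Set<E>>();
- private final AtomicLong nextExpirationTime = new AtomicLong();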
-
- public Long update(E elem, int timeout) {
- Long prevExpiryTime = elemMap.get(elem); // expiry time currently assigned to this NIOServerCnxn
- long now = Time.currentElapsedTime();
- Long newExpiryTime = roundToNextInterval(now + timeout); // new expiry time, rounded up to a bucket boundary
- if (newExpiryTime.equals(prevExpiryTime)) {
- return null; // No change, so nothing to update
- }
- // First add the elem to the new expiry time bucket in expiryMap.
- Set<E> set = expiryMap.get(newExpiryTime); // bucket for the new expiry time
- if (set == null) {
- // Construct a ConcurrentHashSet using a ConcurrentHashMap
- set = Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
- // Put the new set in the map, but only if another thread hasn't beaten us to it
- Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
- if (existingSet != null) {
- set = existingSet;
- }
- }
- set.add(elem); // add the NIOServerCnxn to the new bucket
-
- // Map the elem to the new expiry time. If a different previous
- // mapping was present, clean up the previous expiry bucket.
- prevExpiryTime = elemMap.put(elem, newExpiryTime);
- if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
- Set<E> prevSet = expiryMap.get(prevExpiryTime);
- if (prevSet != null) {
- prevSet.remove(elem); // remove the element from its previous bucket
- }
- }
- return newExpiryTime;
- }
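To see how the bucketing in update behaves, here is a small single-threaded model of an expiry queue. It is a sketch under stated assumptions, not the ZooKeeper class: MiniExpiryQueue is a made-up name, the clock is passed in explicitly as now so the behaviour is deterministic, plain HashMaps replace the concurrent maps, and roundToNextInterval is assumed to round up to the next multiple of the interval.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class MiniExpiryQueue<E> {
    private final Map<E, Long> elemMap = new HashMap<>();        // element -> its expiry bucket
    private final Map<Long, Set<E>> expiryMap = new HashMap<>(); // expiry bucket -> elements
    private final long interval;
    private long nextExpirationTime;

    MiniExpiryQueue(long interval, long now) {
        this.interval = interval;
        this.nextExpirationTime = roundToNextInterval(now);
    }

    /** Assumed rounding rule: the next multiple of interval strictly after time. */
    long roundToNextInterval(long time) {
        return (time / interval + 1) * interval;
    }

    /** Moves elem to the bucket for now + timeout; returns null if the bucket is unchanged. */
    Long update(E elem, long timeout, long now) {
        long newExpiry = roundToNextInterval(now + timeout);
        Long prev = elemMap.get(elem);
        if (prev != null && prev == newExpiry) {
            return null;
        }
        expiryMap.computeIfAbsent(newExpiry, k -> new HashSet<>()).add(elem);
        elemMap.put(elem, newExpiry);
        if (prev != null) {
            Set<E> prevSet = expiryMap.get(prev);
            if (prevSet != null) {
                prevSet.remove(elem);
            }
        }
        return newExpiry;
    }

    /** Returns the elements of the next bucket if it is due, and advances to the following one. */
    Set<E> poll(long now) {
        if (now < nextExpirationTime) {
            return Collections.emptySet();
        }
        Set<E> expired = expiryMap.remove(nextExpirationTime);
        nextExpirationTime += interval;
        if (expired == null) {
            return Collections.emptySet();
        }
        for (E e : expired) {
            elemMap.remove(e); // simplification of this sketch: forget expired elements here
        }
        return expired;
    }
}
```

Because expiry times are rounded to bucket boundaries, frequent touches within the same interval return early without moving anything, and the expirer thread only has to look at one bucket per interval.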
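With expiry updates covered, back to the flow in which the worker thread pool processes the workRequest and reads the client's data.
Compared with the threads above, WorkerThread has a more involved call graph spanning several objects. It handles I/O only and does not interpret the data; that is left to the RequestProcessor chain. The call relationships are shown below: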

- public void schedule(WorkRequest workRequest) {
- schedule(workRequest, 0);
- }
- public void schedule(WorkRequest workRequest, long id) {
- ScheduledWorkRequest scheduledWorkRequest = new ScheduledWorkRequest(workRequest);
- int size = workers.size();
- int workerNum = ((int) (id % size) + size) % size;
- ExecutorService worker = workers.get(workerNum);
- worker.execute(scheduledWorkRequest);
- }
- WorkerService.ScheduledWorkRequest
-
- private class ScheduledWorkRequest implements Runnable {
- @Override
- public void run() {
- // Ends up in IOWorkRequest.doWork
- workRequest.doWork();
- }
- }
- private class IOWorkRequest extends WorkerService.WorkRequest {
- public void doWork() throws InterruptedException {
- if (key.isReadable() || key.isWritable()) {
- // Perform the I/O
- cnxn.doIO(key);
- // The renewal method again
- touchCnxn(cnxn);
- }
- }
- }
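The index expression in schedule(WorkRequest, long) above is worth a second look: a plain id % size can be negative in Java when id is negative. The sketch below isolates that expression; WorkerIndex and workerFor are illustrative names, not part of ZooKeeper.

```java
final class WorkerIndex {
    /** Maps any long id, including negative ones, onto an index in [0, size). */
    static int workerFor(long id, int size) {
        // id % size lies in (-size, size); adding size and reducing again makes it non-negative.
        return ((int) (id % size) + size) % size;
    }
}
```

The same id always maps to the same index, so requests scheduled with one id would stay on one worker and keep their order.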
We skip some of the later details; doIO eventually calls readPayload.
- private void readPayload() throws IOException, InterruptedException {
- if (incomingBuffer.remaining() != 0) { // have we read length bytes?
- int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
- if (rc < 0) {
- throw new EndOfStreamException(
- "Unable to read additional data from client sessionid 0x"
- + Long.toHexString(sessionId)
- + ", likely client has closed socket");
- }
- }
-
- if (incomingBuffer.remaining() == 0) { // have we read length bytes?
- packetReceived();
- incomingBuffer.flip();
- // Before initialization, the first packet is the connect request
- if (!initialized) {
- readConnectRequest();
- } else {
- readRequest();
- }
- lenBuffer.clear();
- incomingBuffer = lenBuffer;
- }
- }
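readPayload relies on length-prefixed framing: lenBuffer holds a 4-byte length, and incomingBuffer is then sized to read exactly that many payload bytes. A minimal sketch of this framing, assuming a 4-byte big-endian length prefix and using an in-memory ByteBuffer in place of a socket (LengthPrefixedFrame and its methods are hypothetical names):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class LengthPrefixedFrame {
    /** Encodes a payload as [4-byte big-endian length][payload bytes]. */
    static ByteBuffer encode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length).put(payload);
        buf.flip();
        return buf;
    }

    /** Reads the length first, then a buffer of exactly that size for the payload. */
    static byte[] decode(ByteBuffer wire) {
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        while (lenBuffer.hasRemaining()) {
            lenBuffer.put(wire.get());
        }
        lenBuffer.flip();
        int len = lenBuffer.getInt();

        ByteBuffer incomingBuffer = ByteBuffer.allocate(len);
        while (incomingBuffer.hasRemaining()) {
            incomingBuffer.put(wire.get());
        }
        incomingBuffer.flip();
        byte[] payload = new byte[len];
        incomingBuffer.get(payload);
        return payload;
    }

    /** Convenience helper for trying the two methods together. */
    static String roundTrip(String text) {
        byte[] decoded = decode(encode(text.getBytes(StandardCharsets.UTF_8)));
        return new String(decoded, StandardCharsets.UTF_8);
    }
}
```

On a real non-blocking socket a read may return fewer bytes than requested, which is why readPayload checks remaining() and only processes the packet once the buffer is full.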
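If initialized is false at this point, this is the first packet on the connection and a Session has to be created (createSession), so readConnectRequest() is called. That method sets initialized to true, but only after the connect request has been processed; from then on the connection can handle other client commands.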
- private void readConnectRequest() throws IOException, InterruptedException {
- if (!isZKServerRunning()) {
- throw new IOException("ZooKeeperServer not running");
- }
- zkServer.processConnectRequest(this, incomingBuffer);
- // Later packets skip this path
- initialized = true;
- }
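The method above calls processConnectRequest to handle the connect request. On a first connection the sessionId read from the request is 0, so session creation is treated as a business operation and createSession() is called. The key parts of processConnectRequest: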
- public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) {
- BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
- ConnectRequest connReq = new ConnectRequest(); // holder for the connect request
- connReq.deserialize(bia, "connect"); // deserialize the connect request fields
- long sessionId = connReq.getSessionId(); // session id sent by the client (0 for a new session)
- int sessionTimeout = connReq.getTimeOut();
- byte[] passwd = connReq.getPasswd();
- cnxn.setSessionTimeout(sessionTimeout);
- if (sessionId == 0) {
- long id = createSession(cnxn, passwd, sessionTimeout); // create the session
- }
- }
createSession() first obtains a sessionId, then builds a session-creation request with that sessionId as the session ID and passes it to the processor chain as a business operation. The source of createSession():
- long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
- if (passwd == null) {
- // Possible since it's just deserialized from a packet on the wire.
- passwd = new byte[0];
- }
- // Ask sessionTracker for a new sessionId
- long sessionId = sessionTracker.createSession(timeout);
- Random r = new Random(sessionId ^ superSecret);
- r.nextBytes(passwd);
- ByteBuffer to = ByteBuffer.allocate(4);
- to.putInt(timeout);
- cnxn.setSessionId(sessionId);
- // Build an OpCode.createSession request (a session-creation operation keyed by the sessionId)
- Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
- setLocalSessionFlag(si);
- // Submit it to the processor chain
- submitRequest(si);
- return sessionId;
- }
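createSession fills passwd from new Random(sessionId ^ superSecret). Since java.util.Random is deterministic for a given seed, the same bytes can be regenerated later from the sessionId alone. The sketch below illustrates that property only; SessionPassword, DEMO_SECRET, the 16-byte length, and the check method are assumptions made for the example, not values taken from the listing.

```java
import java.util.Arrays;
import java.util.Random;

final class SessionPassword {
    // Hypothetical secret for illustration; the listing uses a server-side value named superSecret.
    static final long DEMO_SECRET = 0x1234ABCDL;

    /** Derives the password bytes for a session id, the same way on every call. */
    static byte[] derive(long sessionId) {
        byte[] passwd = new byte[16]; // length chosen for this sketch
        new Random(sessionId ^ DEMO_SECRET).nextBytes(passwd);
        return passwd;
    }

    /** A presented password matches if it equals the bytes re-derived from the id. */
    static boolean check(long sessionId, byte[] presented) {
        return Arrays.equals(derive(sessionId), presented);
    }
}
```

Under this scheme nothing has to be stored per session for password checks; whoever knows the secret can recompute the expected bytes.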
sessionTracker.createSession(timeout), used above, does two things: it generates the sessionId and registers tracking information for it. Its source:
- public long createSession(int sessionTimeout) {
- // Take the next sessionId
- long sessionId = nextSessionId.getAndIncrement();
- // Register tracking information for the session
- addSession(sessionId, sessionTimeout);
- return sessionId;
- }
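Tracking a session means adding its information to the tracker's collections so that it can be looked up by session ID from anywhere. addSession creates the Session, stores it in the session map, and places it in the session expiry queue. Its source: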
- public synchronized boolean addSession(long id, int sessionTimeout) {
- sessionsWithTimeout.put(id, sessionTimeout);
-
- boolean added = false;
- // Look up the Session; if there is none, create one for this session id
- SessionImpl session = sessionsById.get(id);
- if (session == null){
- session = new SessionImpl(id, sessionTimeout);
- }
-
- // findbugs2.0.3 complains about get after put.
- // long term strategy would be use computeIfAbsent after JDK 1.8
- // Store the Session in sessionsById so it can be looked up by id
- SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
-
- if (existedSession != null) {
- session = existedSession;
- } else {
- added = true;
- LOG.debug("Adding session 0x" + Long.toHexString(id));
- }
-
- if (LOG.isTraceEnabled()) {
- String actionStr = added ? "Adding" : "Existing";
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "SessionTrackerImpl --- " + actionStr + " session 0x"
- + Long.toHexString(id) + " " + sessionTimeout);
- }
-
- // Add the Session to the expiry queue
- updateSessionExpiry(session, sessionTimeout);
- return added;
- }
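Session refresh can also be described as lease renewal. Besides PING requests, ordinary CRUD requests from the client renew the session as well; PING is used as the example here.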
ClientCnxn.SendThread
- public void run() {
- clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
- clientCnxnSocket.updateNow();
- clientCnxnSocket.updateLastSendAndHeard();
- while (state.isAlive()) {
- // Once connected, send a PING request at intervals
- if (state.isConnected()) {
- //1000(1 second) is to prevent race condition missing to send the second ping
- //also make sure not to send too many pings when readTimeout is small
- int timeToNextPing = readTimeout / 2
- - clientCnxnSocket.getIdleSend()
- - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
- //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
- if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
- // Send the PING request
- sendPing();
- clientCnxnSocket.updateLastSend();
- } else {
- if (timeToNextPing < to) {
- to = timeToNextPing;
- }
- }
- }
- }
- }
The PING request is sent to the server:
- private void sendPing() {
- lastPingSentNs = System.nanoTime();
- RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
- queuePacket(h, null, null, null, null, null, null, null, null);
- }
- public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
- ...
- // Wakes the selector that sendThread is waiting on, so the next pass of the loop in SendThread.run sends the queued packet to the server
- sendThread.getClientCnxnSocket().packetAdded();
- return packet;
- }
- // The next pass of the loop in SendThread.run
- public void run() {
- clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
- // Send the request
- clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
- }
- void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
- ClientCnxn cnxn)
- throws IOException, InterruptedException {
-
- for (SelectionKey k : selected) {
- SocketChannel sc = ((SocketChannel) k.channel());
- if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
- if (sc.finishConnect()) {
- updateLastSendAndHeard();
- // The connection just completed: send the connect request that sets up the session
- sendThread.primeConnection();
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- doIO(pendingQueue, outgoingQueue, cnxn);
- }
- }
- if (sendThread.getZkState().isConnected()) {
- synchronized(outgoingQueue) {
- if (findSendablePacket(outgoingQueue,
- cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
- enableWrite();
- }
- }
- }
- selected.clear();
- }
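The server receives the client's PING request. Because the connection already exists, the AcceptThread is not involved this time: the SelectorThread that owns the connection picks up the read event, and from there the logic matches the session-creation flow, ending in SelectorThread.run -> select() -> handleIO(key) -> touchCnxn(cnxn). Note that touchCnxn refreshes the connection's entry in cnxnExpiryQueue; the session's own expiry in SessionTrackerImpl is refreshed when the request is submitted to ZooKeeperServer, which calls sessionTracker.touchSession.
As the session-creation source already suggests, session expiry is managed by SessionTrackerImpl. It is also a thread class, extending ZooKeeperCriticalThread. Its run method obtains the time until the next expiry point, sleeps until then, and then takes the set of expired client sessions and closes them one by one.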
- public void run() {
- try {
- while (running) {
- // Time until the next expiry bucket is due
- long waitTime = sessionExpiryQueue.getWaitTime();
- if (waitTime > 0) {
- // Sleep until then
- Thread.sleep(waitTime);
- continue;
- }
- // Take the set of expired client sessions
- for (SessionImpl s : sessionExpiryQueue.poll()) {
- // Mark the session as closing (isClosing = true)
- setSessionClosing(s.sessionId);
- // Expire the client session
- expirer.expire(s);
- }
- }
- } catch (InterruptedException e) {
- handleException(this.getName(), e);
- }
- LOG.info("SessionTrackerImpl exited loop!");
- }
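expirer.expire(s), the call that expires the client session, is itself a business operation. It invokes ZooKeeperServer.expire(), which takes the sessionId, creates an OpCode.closeSession request, and hands it to the processor chain. The source of ZooKeeperServer.expire():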
- public void expire(Session session) {
- long sessionId = session.getSessionId();
- LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
- + ", timeout of " + session.getTimeout() + "ms exceeded");
- close(sessionId);
- }
- private void close(long sessionId) {
- // Build an OpCode.closeSession request
- Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
- setLocalSessionFlag(si);
- // Submit it to the processor chain
- submitRequest(si);
- }
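submitRequest shows up here again. We leave it for the request-processing discussion below; for now it is enough to know that calling it is what closes the session.
ZooKeeper's request processing resembles a workflow; in essence it is a singly linked list. When ZooKeeper starts, each node's role is determined (leader, follower, or observer), and once the role is fixed the node initializes its chain of responsibility.
For every client request, whatever the operation, ZooKeeper provides a processing pipeline built from RequestProcessor objects.
Here is how the RequestProcessor chain is initialized, in ZooKeeperServer.setupRequestProcessors():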
- /**
- * Sets up the request-processing chain
- */
- protected void setupRequestProcessors() {
- // Create the FinalRequestProcessor
- RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- // Create the SyncRequestProcessor with finalProcessor as its next link
- RequestProcessor syncProcessor = new SyncRequestProcessor(this,
- finalProcessor);
- // Start syncProcessor
- ((SyncRequestProcessor)syncProcessor).start();
- // Create the PrepRequestProcessor as the first processor, with syncProcessor as its next link
- firstProcessor = new PrepRequestProcessor(this, syncProcessor);
- // Start firstProcessor
- ((PrepRequestProcessor)firstProcessor).start();
- }
When syncProcessor is created, finalProcessor is passed in as a constructor argument:
- /**
- * Creates the SyncRequestProcessor; the next link in the chain is FinalRequestProcessor
- * @param zks
- * @param nextProcessor
- */
- public SyncRequestProcessor(ZooKeeperServer zks,
- RequestProcessor nextProcessor) {
- super("SyncThread:" + zks.getServerId(), zks
- .getZooKeeperServerListener());
- this.zks = zks;
- // Next link in the chain
- this.nextProcessor = nextProcessor;
- running = true;
- }
When firstProcessor is created, syncProcessor is passed in as a constructor argument:
- public PrepRequestProcessor(ZooKeeperServer zks,
- RequestProcessor nextProcessor) {
- super("ProcessThread(sid:" + zks.getServerId() + " cport:"
- + zks.getClientPort() + "):", zks.getZooKeeperServerListener());
- this.nextProcessor = nextProcessor;
- this.zks = zks;
- }
Relationship diagram for PrepRequestProcessor and SyncRequestProcessor:

PrepRequestProcessor and SyncRequestProcessor have the same structure: both are thread classes, which is why both are started during initialization.
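The chain built in setupRequestProcessors follows the chain-of-responsibility pattern: each processor holds a reference to the next one, and the chain is assembled back to front. The sketch below shows only that wiring. It is deliberately synchronous, whereas the real Prep and Sync processors are threads with their own queues; StepProcessor and ChainDemo are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical one-method stand-in for RequestProcessor. */
interface StepProcessor {
    void process(String request);
}

final class ChainDemo {
    /** Builds a prep -> sync -> final chain back to front and records the order of visits. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        // The last link is created first so that each earlier link can capture its successor.
        StepProcessor finalStep = r -> trace.add("final:" + r);
        StepProcessor syncStep = r -> {
            trace.add("sync:" + r);
            finalStep.process(r);
        };
        StepProcessor prepStep = r -> {
            trace.add("prep:" + r);
            syncStep.process(r);
        };
        prepStep.process(request);
        return trace;
    }
}
```

Callers only ever hold the first link, which is why ZooKeeperServer keeps a single firstProcessor field.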
PrepRequestProcessor is the first processor in the chain. Connecting it to the request flow seen so far: ZooKeeperServer.processPacket() > submitRequest() > enqueueRequest() > RequestThrottler.submitRequest(). RequestThrottler.submitRequest() adds the current request to its submittedRequests queue.
submitRequest then calls firstProcessor.processRequest, which takes us into PrepRequestProcessor.processRequest(request):
- public void processRequest(Request request) {
- submittedRequests.add(request);
- }
-
- public void run() {
- while (true) {
- Request request = submittedRequests.take();
- pRequest(request);
- }
- }
-
- protected void pRequest(Request request) throws RequestProcessorException {
- request.setHdr(null);
- request.setTxn(null);
- switch (request.type) {
- case OpCode.createSession: // handles the connect (session creation) request
- case OpCode.closeSession:
- if (!request.isLocalSession()) {
- pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
- }
- break;
- }
- }
-
- protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) {
- switch (type) {
- case OpCode.createSession:
- int to = request.request.getInt();
- request.setTxn(new CreateSessionTxn(to));
- zks.sessionTracker.trackSession(request.sessionId, to);
- zks.setOwner(request.sessionId, request.getOwner());
- break;
- }
- }
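As the code shows, pRequest2Txn() validates the request and builds its transaction record (for createSession, a CreateSessionTxn, which is also registered with the session tracker); the data itself is not touched yet. In other words, PrepRequestProcessor does the pre-processing: permission checks and preparation of the transaction information.
Next comes SyncRequestProcessor. It persists requests to disk efficiently, and a request is not passed to the next processor until it has been written to disk.
First, the method that enqueues a request: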
- public void processRequest(Request request) {
- // request.addRQRec(">sync");
- // Add the request to the queuedRequests queue
- queuedRequests.add(request);
- }
SyncRequestProcessor is likewise a thread, and the queued requests are processed inside it. Its run method:
- public void run() {
- try {
- int logCount = 0;
-
- // we do this in an attempt to ensure that not all of the servers
- // in the ensemble take a snapshot at the same time
- int randRoll = r.nextInt(snapCount/2);
- while (true) {
- Request si = null;
- if (toFlush.isEmpty()) {
- // Blocking call: wait for a request
- si = queuedRequests.take();
- } else {
- si = queuedRequests.poll();
- if (si == null) {
- flush(toFlush);
- continue;
- }
- }
- if (si == requestOfDeath) {
- break;
- }
- if (si != null) {
- // track the number of records written to the log
- if (zks.getZKDatabase().append(si)) {
- logCount++;
- if (logCount > (snapCount / 2 + randRoll)) {
- randRoll = r.nextInt(snapCount/2);
- // roll the log
- // Resets the count of txns since the last rollLog
- zks.getZKDatabase().rollLog();
- // take a snapshot
- if (snapInProcess != null && snapInProcess.isAlive()) {
- LOG.warn("Too busy to snap, skipping");
- } else {
- snapInProcess = new ZooKeeperThread("Snapshot Thread") {
- public void run() {
- try {
- // Write the snapshot
- zks.takeSnapshot();
- } catch(Exception e) {
- LOG.warn("Unexpected exception", e);
- }
- }
- };
- snapInProcess.start();
- }
- logCount = 0;
- }
- } else if (toFlush.isEmpty()) {
- // optimization for read heavy workloads
- // iff this is a read, and there are no pending
- // flushes (writes), then just pass this to the next
- // processor
- if (nextProcessor != null) {
- nextProcessor.processRequest(si);
- if (nextProcessor instanceof Flushable) {
- ((Flushable)nextProcessor).flush();
- }
- }
- continue;
- }
- // Add the request to toFlush, the list of transactions that have been written and are waiting to be flushed to disk
- toFlush.add(si);
- if (toFlush.size() > 1000) {
- // Commit the batch
- flush(toFlush);
- }
- }
- }
- } catch (Throwable t) {
- handleException(this.getName(), t);
- } finally{
- running = false;
- }
- LOG.info("SyncRequestProcessor exited!");
- }
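The run method takes a request from queuedRequests, blocking until one is available when nothing is waiting to be flushed. Each transaction is appended to the transaction log; once enough transactions have been logged since the last roll (snapCount / 2 plus a random offset), the log is rolled and a Snapshot Thread writes a snapshot of the data to disk. Finally, flush() commits the batch:
- private void flush(LinkedList<Request> toFlush)
- throws IOException, RequestProcessorException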
- {
- if (toFlush.isEmpty())
- return;
- // Commit the data
- zks.getZKDatabase().commit();
- while (!toFlush.isEmpty()) {
- Request i = toFlush.remove();
- if (nextProcessor != null) {
- // Call the next link in the chain
- nextProcessor.processRequest(i);
- }
- }
- if (nextProcessor != null && nextProcessor instanceof Flushable) {
- ((Flushable)nextProcessor).flush();
- }
- }
flush() commits the data and passes each request on to the next link in the chain, FinalRequestProcessor.
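The batching in run and flush is a form of group commit: requests accumulate in toFlush, and one commit covers the whole batch before any of them moves on. The following single-threaded sketch models that under simplifying assumptions: GroupCommitDemo is a made-up class, requests are strings, the disk commit is just a counter, and the batch limit is a constructor parameter instead of the fixed 1000.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class GroupCommitDemo {
    final Queue<String> queued = new ArrayDeque<>();   // stands in for queuedRequests
    final List<String> toFlush = new ArrayList<>();    // appended but not yet committed
    final List<String> forwarded = new ArrayList<>();  // what the next processor would see
    final int maxBatch;
    int commits = 0;

    GroupCommitDemo(int maxBatch) {
        this.maxBatch = maxBatch;
    }

    /** Drains the queue; flushes when the batch grows past maxBatch or the queue runs dry. */
    void drain() {
        String request;
        while ((request = queued.poll()) != null) {
            toFlush.add(request); // stands in for appending to the transaction log
            if (toFlush.size() > maxBatch) {
                flush();
            }
        }
        flush();
    }

    void flush() {
        if (toFlush.isEmpty()) {
            return;
        }
        commits++;                 // one simulated commit covers the whole batch
        forwarded.addAll(toFlush); // only now do the requests reach the next processor
        toFlush.clear();
    }
}
```

With a batch limit of 2, five queued requests are committed in two batches (three requests, then two) and are forwarded in their original order.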
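With SyncRequestProcessor covered, we turn to the last processor in the chain, FinalRequestProcessor, whose main job is to return the Response.
SyncRequestProcessor persists the txn (the session-creation operation); FinalRequestProcessor then commits the session, which amounts to storing the session's ID and timeout in sessionsWithTimeout.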
- public void processRequest(Request request) {
- ProcessTxnResult rc = zks.processTxn(request);
- switch (request.type) {
- case OpCode.createSession: {
- lastOp = "SESS";
- updateStats(request, lastOp, lastZxid);
- zks.finishSessionInit(request.cnxn, true);
- return;
- }
- }
- if (path == null || rsp == null) {
- cnxn.sendResponse(hdr, rsp, "response"); // the server sends the response; the client receives it at this point
- }
- }
-
- public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
- // register with JMX
- if (valid) {
- if (serverCnxnFactory != null && serverCnxnFactory.cnxns.contains(cnxn)) {
- serverCnxnFactory.registerConnection(cnxn);
- } else if (secureServerCnxnFactory != null && secureServerCnxnFactory.cnxns.contains(cnxn)) {
- secureServerCnxnFactory.registerConnection(cnxn);
- }
- }
- }
After sendResponse is called, the response is returned to the client.
The client receives the server's response:
- ClientCnxnSocketNIO.doIO(Queue<Packet>, ClientCnxn)
-
- void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
- if (sockKey.isReadable()) {
- if (!initialized) {
- readConnectResult();
- }
- }
- }
-
- void readConnectResult() throws IOException {
- ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
- BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ConnectResponse conRsp = new ConnectResponse();
- conRsp.deserialize(bbia, "connect");
- this.sessionId = conRsp.getSessionId(); // the connection is now established
- sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
- }
Understanding how ZooKeeper's watcher mechanism works