Zookeeper中的节点主要分为临时节点和持久节点。
持久节点在创建之后,除非主动发起删除,否则节点会一直存在;
而临时节点则不同,创建该节点的Session过期后,则该Session创建的所有临时节点都会被删除。
本文主要从源码的角度来分析临时节点删除的全过程。
既然Session过期后,Zookeeper会删除该Session创建的所有临时节点,那么我们就可以从Session的管理器SessionTrackerImpl入手。
- public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
-
- synchronized public void run() { // Expiry loop: waits until nextExpirationTime, then expires every session stored under that time
- try {
- while (running) {
- currentTime = Time.currentElapsedTime();
- if (nextExpirationTime > currentTime) {
- this.wait(nextExpirationTime - currentTime); // not due yet: wait out the remaining time, then re-check
- continue;
- }
- SessionSet set;
- // Remove, in one step, the whole set of sessions stored under the current expiration time
- set = sessionSets.remove(nextExpirationTime);
- if (set != null) {
- for (SessionImpl s : set.sessions) {
- // Mark the session as closing (per the article: sets isClosing=true)
- setSessionClosing(s.sessionId);
- // Expire the session; this is the key step, see section 1.2
- expirer.expire(s);
- }
- }
- nextExpirationTime += expirationInterval; // advance to the next expiration tick
- }
- } catch (InterruptedException e) {
- handleException(this.getName(), e);
- }
- LOG.info("SessionTrackerImpl exited loop!");
- }
- }
SessionTrackerImpl本质上采用的是session分桶管理的模式,因此对于到期的session桶,会清理桶中的全部session。
- public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
- public void expire(Session session) { // Logs the expiry and closes the session
- long sessionId = session.getSessionId();
- LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
- + ", timeout of " + session.getTimeout() + "ms exceeded");
- // Close the session
- close(sessionId);
- }
-
- private void close(long sessionId) {
- // Submit a closeSession request (no connection, xid 0, no payload, no auth info)
- submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
- }
-
- private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
- int xid, ByteBuffer bb, List<Id> authInfo) {
- // The request essentially carries only the sessionId and the operation type (closeSession)
- Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
- submitRequest(si);
- }
-
- public void submitRequest(Request si) {
- ...
- try {
- touch(si.cnxn);
- boolean validpacket = Request.isValid(si.type);
- if (validpacket) {
- // Hand the request straight to firstProcessor
- firstProcessor.processRequest(si);
- if (si.cnxn != null) {
- incInProcess();
- }
- } else {
- LOG.warn("Received packet at server of unknown type " + si.type);
- new UnimplementedRequestProcessor().processRequest(si);
- }
- } ...
- }
- }
从代码分析中可以看出,closeSession也被当做一个事务请求,请求体主要包含sessionId和操作类型。
然后交由firstProcessor来处理。
- public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
- RequestProcessor {
-
- final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
- final HashMap<String, ChangeRecord> outstandingChangesForPath = new HashMap<String, ChangeRecord>();
-
- // Process the request
- protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
- throws KeeperException, IOException, RequestProcessorException {
-
- request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
- Time.currentWallTime(), type);
- ...
- switch (type) {
- case OpCode.create:
- ...
- case OpCode.closeSession:
- // Fetch all ephemeral nodes created by the current session
- HashSet<String> es = zks.getZKDatabase()
- .getEphemerals(request.sessionId);
- synchronized (zks.outstandingChanges) {
- for (ChangeRecord c : zks.outstandingChanges) { // reconcile with changes that are still pending
- if (c.stat == null) {
- // Doing a delete
- es.remove(c.path);
- } else if (c.stat.getEphemeralOwner() == request.sessionId) {
- es.add(c.path);
- }
- }
- // Wrap each ephemeral-node deletion in a ChangeRecord and add it to outstandingChanges
- for (String path2Delete : es) {
- addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
- path2Delete, null, 0, null));
- }
-
- zks.sessionTracker.setSessionClosing(request.sessionId);
- }
-
- LOG.info("Processed session termination for sessionid: 0x"
- + Long.toHexString(request.sessionId));
- break;
- }
- }
-
- void addChangeRecord(ChangeRecord c) { // Records the change in both the list and the per-path map
- synchronized (zks.outstandingChanges) {
- zks.outstandingChanges.add(c);
- zks.outstandingChangesForPath.put(c.path, c);
- }
- }
PrepRequestProcessor只是对当前session创建的临时节点进行预处理:将这些临时节点的删除操作包装成ChangeRecord对象,并添加到zks.outstandingChanges、zks.outstandingChangesForPath两个集合中,供后续processor处理。
- public class FinalRequestProcessor implements RequestProcessor {
- public void processRequest(Request request) {
- ...
- if (request.hdr != null) { // only requests that carry a transaction header are applied
- TxnHeader hdr = request.hdr;
- Record txn = request.txn;
- // The important work happens here:
- // the transaction is handed over to ZooKeeperServer
- rc = zks.processTxn(hdr, txn);
- }
- }
- }
2.2.1 ZooKeeperServer.processTxn() 处理事务请求
- public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
- public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) { // Applies the txn to the database, then updates the session tracker
- ProcessTxnResult rc;
- int opCode = hdr.getType();
- long sessionId = hdr.getClientId();
- // Delegate to ZKDatabase here; see section 2.2.2
- rc = getZKDatabase().processTxn(hdr, txn);
- if (opCode == OpCode.createSession) {
- if (txn instanceof CreateSessionTxn) {
- CreateSessionTxn cst = (CreateSessionTxn) txn;
- sessionTracker.addSession(sessionId, cst
- .getTimeOut());
- } else {
- LOG.warn("*****>>>>> Got "
- + txn.getClass() + " "
- + txn.toString());
- }
- } else if (opCode == OpCode.closeSession) {
- sessionTracker.removeSession(sessionId); // the closed session is dropped from the tracker
- }
- return rc;
- }
- }
2.2.2 ZKDatabase.processTxn()
- public class ZKDatabase {
- public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
- // Delegate to DataTree
- return dataTree.processTxn(hdr, txn);
- }
- }
2.2.3 DataTree.processTxn() 处理事务请求
- public class DataTree {
- public ProcessTxnResult processTxn(TxnHeader header, Record txn) { // Copies the header fields into the result and dispatches on the txn type
- ProcessTxnResult rc = new ProcessTxnResult();
-
- try {
- rc.clientId = header.getClientId();
- rc.cxid = header.getCxid();
- rc.zxid = header.getZxid();
- rc.type = header.getType();
- rc.err = 0;
- rc.multiResult = null;
- switch (header.getType()) {
- case OpCode.create:
- ...
- case OpCode.closeSession:
- killSession(header.getClientId(), header.getZxid());
- break;
- }
- }
- }
-
- void killSession(long session, long zxid) {
- // Take (and remove) the set of ephemeral nodes created by this session
- HashSet<String> list = ephemerals.remove(session);
- if (list != null) {
- for (String path : list) {
- try {
- // The actual deletion
- deleteNode(path, zxid);
- if (LOG.isDebugEnabled()) {
- ...
- }
- } catch (NoNodeException e) { // a node that is already gone is logged and skipped
- LOG.warn("Ignoring NoNodeException for path " + path
- + " while removing ephemeral for dead session 0x"
- + Long.toHexString(session));
- }
- }
- }
- }
-
- public void deleteNode(String path, long zxid)
- throws KeeperException.NoNodeException {
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- DataNode node = nodes.get(path);
- if (node == null) {
- throw new KeeperException.NoNodeException();
- }
- nodes.remove(path); // drop the node itself from the node map
- synchronized (node) {
- aclCache.removeUsage(node.acl);
- }
- DataNode parent = nodes.get(parentName);
- if (parent == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (parent) {
- // Remove this child's entry from the parent node
- parent.removeChild(childName);
- parent.stat.setPzxid(zxid);
- long eowner = node.stat.getEphemeralOwner();
- if (eowner != 0) { // per the surrounding logic, a non-zero owner marks an ephemeral node
- HashSet<String> nodes = ephemerals.get(eowner);
- if (nodes != null) {
- // Remove this path from the owner session's ephemeral set
- synchronized (nodes) {
- nodes.remove(path);
- }
- }
- }
- node.parent = null;
- }
- ...
-
- // Fire the watches registered on this node, then the parent's child watches
- Set<Watcher> processed = dataWatches.triggerWatch(path,
- EventType.NodeDeleted);
- childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
- childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
- EventType.NodeChildrenChanged);
- }
- }
最终在FinalRequestProcessor中删除该session创建的所有临时节点。
删除临时节点包含三个步骤:
1.清理其父节点下当前节点信息
2.删除当前临时节点信息
3.触发当前节点上的监听(NodeDeleted),并触发其父节点的子节点变更监听(NodeChildrenChanged)