• Zookeeper临时节点删除时机解析


    前言:

    Zookeeper中的节点主要分为临时节点和持久节点。

    持久节点在创建之后,除非主动发起删除,否则节点会一直存在;

    而临时节点则不同,创建该节点的Session过期后,则该Session创建的所有临时节点都会被删除。

    本文主要来从源码的角度来分析下临时节点删除的全过程。

    1.SessionTrackImpl的心跳检测

    既然当Session过期后,Zookeeper会删除该Session创建的所有临时节点,那么我们就可以从Session的管理器SessionTrackImpl入手。

    1.1 SessionTrackImpl.run()

    1. public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
    2. synchronized public void run() {
    3. try {
    4. while (running) {
    5. currentTime = Time.currentElapsedTime();
    6. if (nextExpirationTime > currentTime) {
    7. this.wait(nextExpirationTime - currentTime);
    8. continue;
    9. }
    10. SessionSet set;
    11. // 直接删除到期时间的所有Session
    12. set = sessionSets.remove(nextExpirationTime);
    13. if (set != null) {
    14. for (SessionImpl s : set.sessions) {
    15. // 设置Session isClosing=true
    16. setSessionClosing(s.sessionId);
    17. // 设置session过期处理,重点在这里,具体见1.2
    18. expirer.expire(s);
    19. }
    20. }
    21. nextExpirationTime += expirationInterval;
    22. }
    23. } catch (InterruptedException e) {
    24. handleException(this.getName(), e);
    25. }
    26. LOG.info("SessionTrackerImpl exited loop!");
    27. }
    28. }

    SessionTrackImpl本质上还是session桶管理的模式,所以针对到期的session桶,则清理桶中的全部session。

    1.2 ZooKeeperServer.expire() 处理session过期信息

    1. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    2. public void expire(Session session) {
    3. long sessionId = session.getSessionId();
    4. LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
    5. + ", timeout of " + session.getTimeout() + "ms exceeded");
    6. // 关闭session
    7. close(sessionId);
    8. }
    9. private void close(long sessionId) {
    10. // 提交一个关闭请求
    11. submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
    12. }
    13. private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
    14. int xid, ByteBuffer bb, List authInfo) {
    15. // 请求主要就是sessionId和操作类型 closeSession
    16. Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
    17. submitRequest(si);
    18. }
    19. public void submitRequest(Request si) {
    20. ...
    21. try {
    22. touch(si.cnxn);
    23. boolean validpacket = Request.isValid(si.type);
    24. if (validpacket) {
    25. // 直接交由firstProcessor处理
    26. firstProcessor.processRequest(si);
    27. if (si.cnxn != null) {
    28. incInProcess();
    29. }
    30. } else {
    31. LOG.warn("Received packet at server of unknown type " + si.type);
    32. new UnimplementedRequestProcessor().processRequest(si);
    33. }
    34. } ...
    35. }
    36. }

    从代码分析中可以看出,closeSession也被当做一个事务请求,请求体主要包含sessionId和操作类型。

    然后交由firstProcessor来处理。

    2.Processor处理closeSession请求

    2.1 PrepRequestProcessor.pRequest2Txn() 处理事务请求

    1. public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
    2. RequestProcessor {
    3. final List outstandingChanges = new ArrayList();
    4. final HashMap outstandingChangesForPath = new HashMap();
    5. // 处理请求
    6. protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize)
    7. throws KeeperException, IOException, RequestProcessorException {
    8. request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
    9. Time.currentWallTime(), type);
    10. ...
    11. switch (type) {
    12. case OpCode.create:
    13. ...
    14. case OpCode.closeSession:
    15. // 获取当前session创建的所有临时节点
    16. HashSet es = zks.getZKDatabase()
    17. .getEphemerals(request.sessionId);
    18. synchronized (zks.outstandingChanges) {
    19. for (ChangeRecord c : zks.outstandingChanges) {
    20. if (c.stat == null) {
    21. // Doing a delete
    22. es.remove(c.path);
    23. } else if (c.stat.getEphemeralOwner() == request.sessionId) {
    24. es.add(c.path);
    25. }
    26. }
    27. // 将临时节点删除事件包装成ChangeRecord对象放入outstandingChanges
    28. for (String path2Delete : es) {
    29. addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
    30. path2Delete, null, 0, null));
    31. }
    32. zks.sessionTracker.setSessionClosing(request.sessionId);
    33. }
    34. LOG.info("Processed session termination for sessionid: 0x"
    35. + Long.toHexString(request.sessionId));
    36. break;
    37. }
    38. }
    39. void addChangeRecord(ChangeRecord c) {
    40. synchronized (zks.outstandingChanges) {
    41. zks.outstandingChanges.add(c);
    42. zks.outstandingChangesForPath.put(c.path, c);
    43. }
    44. }

    PrepRequestProcessor只是对当前session创建的临时节点进行预处理,将这些临时节点的包装成ChangeRecord对象,并添加到zks.outstandingChanges、zks.outstandingChangesForPath两个集合中,用于后续processor处理

    2.2 FinalRequestProcessor.processRequest() 最终处理请求

    1. public class FinalRequestProcessor implements RequestProcessor {
    2. public void processRequest(Request request) {
    3. ...
    4. if (request.hdr != null) {
    5. TxnHeader hdr = request.hdr;
    6. Record txn = request.txn;
    7. // 重要处理在这里
    8. // 交由ZookeeperServer处理
    9. rc = zks.processTxn(hdr, txn);
    10. }
    11. }
    12. }

    2.2.1 ZooKeeperServer.processTxn() 处理事务请求

    1. public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    2. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    3. ProcessTxnResult rc;
    4. int opCode = hdr.getType();
    5. long sessionId = hdr.getClientId();
    6. // 这里交由ZKDatabase处理,具体见2.2.2
    7. rc = getZKDatabase().processTxn(hdr, txn);
    8. if (opCode == OpCode.createSession) {
    9. if (txn instanceof CreateSessionTxn) {
    10. CreateSessionTxn cst = (CreateSessionTxn) txn;
    11. sessionTracker.addSession(sessionId, cst
    12. .getTimeOut());
    13. } else {
    14. LOG.warn("*****>>>>> Got "
    15. + txn.getClass() + " "
    16. + txn.toString());
    17. }
    18. } else if (opCode == OpCode.closeSession) {
    19. sessionTracker.removeSession(sessionId);
    20. }
    21. return rc;
    22. }
    23. }

    2.2.2 ZKDatabase.processTxn() 

    1. public class ZKDatabase {
    2. public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
    3. // 交由DataTree处理
    4. return dataTree.processTxn(hdr, txn);
    5. }
    6. }

    2.2.3 DataTree.processTxn() 处理事务请求

    1. public class DataTree {
    2. public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
    3. ProcessTxnResult rc = new ProcessTxnResult();
    4. try {
    5. rc.clientId = header.getClientId();
    6. rc.cxid = header.getCxid();
    7. rc.zxid = header.getZxid();
    8. rc.type = header.getType();
    9. rc.err = 0;
    10. rc.multiResult = null;
    11. switch (header.getType()) {
    12. case OpCode.create:
    13. ...
    14. case OpCode.closeSession:
    15. killSession(header.getClientId(), header.getZxid());
    16. break;
    17. }
    18. }
    19. }
    20. void killSession(long session, long zxid) {
    21. // 获取当前session所创建的临时节点
    22. HashSet list = ephemerals.remove(session);
    23. if (list != null) {
    24. for (String path : list) {
    25. try {
    26. // 具体处理
    27. deleteNode(path, zxid);
    28. if (LOG.isDebugEnabled()) {
    29. ...
    30. }
    31. } catch (NoNodeException e) {
    32. LOG.warn("Ignoring NoNodeException for path " + path
    33. + " while removing ephemeral for dead session 0x"
    34. + Long.toHexString(session));
    35. }
    36. }
    37. }
    38. }
    39. public void deleteNode(String path, long zxid)
    40. throws KeeperException.NoNodeException {
    41. int lastSlash = path.lastIndexOf('/');
    42. String parentName = path.substring(0, lastSlash);
    43. String childName = path.substring(lastSlash + 1);
    44. DataNode node = nodes.get(path);
    45. if (node == null) {
    46. throw new KeeperException.NoNodeException();
    47. }
    48. nodes.remove(path);
    49. synchronized (node) {
    50. aclCache.removeUsage(node.acl);
    51. }
    52. DataNode parent = nodes.get(parentName);
    53. if (parent == null) {
    54. throw new KeeperException.NoNodeException();
    55. }
    56. synchronized (parent) {
    57. // 删除父节点下该子节点信息
    58. parent.removeChild(childName);
    59. parent.stat.setPzxid(zxid);
    60. long eowner = node.stat.getEphemeralOwner();
    61. if (eowner != 0) {
    62. HashSet nodes = ephemerals.get(eowner);
    63. if (nodes != null) {
    64. // 删除该临时节点
    65. synchronized (nodes) {
    66. nodes.remove(path);
    67. }
    68. }
    69. }
    70. node.parent = null;
    71. }
    72. ...
    73. // 触发该临时节点的watch监听
    74. Set processed = dataWatches.triggerWatch(path,
    75. EventType.NodeDeleted);
    76. childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
    77. childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
    78. EventType.NodeChildrenChanged);
    79. }
    80. }

    总结:

    最终在FinalRequestProcessor中删除该session创建所有的临时节点。

    删除临时节点包含三个步骤:

    1.清理其父节点下当前节点信息

    2.删除当前临时节点信息

    3.触发当前节点的所有监听

  • 相关阅读:
    [C/C++]数据结构 链表OJ题:随机链表的复制
    【软件测试】理论知识基础第一章
    java八大包装类
    位图BitMap不好用?那来看看进化版本的RoaringBitmap,包您满意
    webgl 系列 —— 初识 WebGL
    前端悬浮菜单的实现方法及完整代码示例
    【JS】数据结构之队列
    我这两年的CSDN博客创作经历
    类型体系与基本数据类型(题目)
    单片机判断语句与位运算的坑
  • 原文地址:https://blog.csdn.net/qq_26323323/article/details/128158382