• nacos注册发现原理


     注册发现流程

    1. 客户端将ip,端口号,命名空间等封装成instance;
    2. 客户端定时发送instance给服务端;
    3. 服务端接收请求,判断instance是否已经存在;
    4. 若instance已存在,服务端只刷新它的心跳;
    5. 若instance不存在,服务端将它存进注册表中(注册表内部用一个set集合保存实例);

     客户端

    注册自身服务

    nacos客户端监听了启动事件,调用start()方法,将注册信息封装成instance对象(ip,port,weight,group,namespace...),再用nacosRestTemplate向服务端发送rest请求。

    spring.factories

    ​ 监听事件

    start()方法

    ​ 将instance放到map中去

     发送rest请求

     服务端接口地址

     与官网手册的注册接口一致 

    服务端

    在服务端源码中查找处理 /nacos/v1/ns/instance 接口的post方法

    创建service时,会把它放进一个双层map:外层key是命名空间(namespace),内层key是group::serviceName,用来按命名空间和group隔离服务,数据结构如下:

    1. /**
    2. * Map(namespace, Map(group::serviceName, Service)).
    3. */
    4. private final Map> serviceMap = new ConcurrentHashMap<>();

    1. /*
    2. * Copyright 1999-2018 Alibaba Group Holding Ltd.
    3. *
    4. * Licensed under the Apache License, Version 2.0 (the "License");
    5. * you may not use this file except in compliance with the License.
    6. * You may obtain a copy of the License at
    7. *
    8. * http://www.apache.org/licenses/LICENSE-2.0
    9. *
    10. * Unless required by applicable law or agreed to in writing, software
    11. * distributed under the License is distributed on an "AS IS" BASIS,
    12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13. * See the License for the specific language governing permissions and
    14. * limitations under the License.
    15. */
    16. package com.alibaba.nacos.naming.consistency.ephemeral.distro;
    17. import com.alibaba.nacos.api.common.Constants;
    18. import com.alibaba.nacos.api.exception.NacosException;
    19. import com.alibaba.nacos.consistency.DataOperation;
    20. import com.alibaba.nacos.core.distributed.distro.DistroConfig;
    21. import com.alibaba.nacos.core.distributed.distro.DistroProtocol;
    22. import com.alibaba.nacos.core.distributed.distro.component.DistroDataProcessor;
    23. import com.alibaba.nacos.core.distributed.distro.entity.DistroData;
    24. import com.alibaba.nacos.core.distributed.distro.entity.DistroKey;
    25. import com.alibaba.nacos.naming.cluster.ServerStatus;
    26. import com.alibaba.nacos.naming.cluster.transport.Serializer;
    27. import com.alibaba.nacos.naming.consistency.Datum;
    28. import com.alibaba.nacos.naming.consistency.KeyBuilder;
    29. import com.alibaba.nacos.naming.consistency.RecordListener;
    30. import com.alibaba.nacos.naming.consistency.ephemeral.EphemeralConsistencyService;
    31. import com.alibaba.nacos.naming.consistency.ephemeral.distro.combined.DistroHttpCombinedKey;
    32. import com.alibaba.nacos.naming.core.DistroMapper;
    33. import com.alibaba.nacos.naming.core.Instances;
    34. import com.alibaba.nacos.naming.core.Service;
    35. import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
    36. import com.alibaba.nacos.naming.misc.GlobalConfig;
    37. import com.alibaba.nacos.naming.misc.GlobalExecutor;
    38. import com.alibaba.nacos.naming.misc.Loggers;
    39. import com.alibaba.nacos.naming.misc.SwitchDomain;
    40. import com.alibaba.nacos.naming.pojo.Record;
    41. import com.alibaba.nacos.sys.utils.ApplicationUtils;
    42. import com.alibaba.nacos.common.utils.StringUtils;
    43. import org.javatuples.Pair;
    44. import org.springframework.context.annotation.DependsOn;
    45. import javax.annotation.PostConstruct;
    46. import java.util.ArrayList;
    47. import java.util.List;
    48. import java.util.Map;
    49. import java.util.Objects;
    50. import java.util.Optional;
    51. import java.util.concurrent.ArrayBlockingQueue;
    52. import java.util.concurrent.BlockingQueue;
    53. import java.util.concurrent.ConcurrentHashMap;
    54. import java.util.concurrent.ConcurrentLinkedQueue;
    55. /**
    56. * Distro-based implementation of {@link EphemeralConsistencyService}.
    57. *
    58. * <p>A write to an ephemeral instance-list key is stored in the local {@link DataStore}, queued on the
    59. * {@link Notifier} so that the listeners of the key are called back, and, unless gRPC features are in
    60. * use, handed to {@link DistroProtocol} for syncing.
    61. *
    62. * <p>NOTE: this listing is an excerpt; {@code onRemove} is called below but is not shown here.
    63. */
    64. @DependsOn("ProtocolManager")
    65. @org.springframework.stereotype.Service("distroConsistencyService")
    66. public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    67. private static final String ON_RECEIVE_CHECKSUMS_PROCESSING_TAG = "1";
    68. private final DistroMapper distroMapper;
    69. private final DataStore dataStore;
    70. private final Serializer serializer;
    71. private final SwitchDomain switchDomain;
    72. private final GlobalConfig globalConfig;
    73. private final DistroProtocol distroProtocol;
    74. private volatile Notifier notifier = new Notifier();
    75. // Listeners registered per datum key.
    76. private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
    77. private Map<String, String> syncChecksumTasks = new ConcurrentHashMap<>(16);
    78. public DistroConsistencyServiceImpl(DistroMapper distroMapper, DataStore dataStore, Serializer serializer,
    79. SwitchDomain switchDomain, GlobalConfig globalConfig, DistroProtocol distroProtocol) {
    80. this.distroMapper = distroMapper;
    81. this.dataStore = dataStore;
    82. this.serializer = serializer;
    83. this.switchDomain = switchDomain;
    84. this.globalConfig = globalConfig;
    85. this.distroProtocol = distroProtocol;
    86. }
    87. /**
    88. * Submits the notifier to the executor once the bean has been constructed.
    89. */
    90. @PostConstruct
    91. public void init() {
    92. GlobalExecutor.submitDistroNotifyTask(notifier);
    93. }
    94. /**
    95. * Applies the write on this node, then schedules a Distro sync for the key.
    96. *
    97. * @param key key of record
    98. * @param value record
    99. * @throws NacosException declared by the overridden method
    100. */
    101. @Override
    102. public void put(String key, Record value) throws NacosException {
    103. // Register on the local node first.
    104. onPut(key, value);
    105. // If upgrade to 2.0.X, do not sync for v1.
    106. if (ApplicationUtils.getBean(UpgradeJudgement.class).isUseGrpcFeatures()) {
    107. return;
    108. }
    109. distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
    110. DistroConfig.getInstance().getSyncDelayMillis());
    111. }
    112. /**
    113. * Delegates to {@code onRemove} (not shown in this excerpt) and drops the listeners of the key.
    114. *
    115. * @param key key of record
    116. * @throws NacosException declared by the overridden method
    117. */
    118. @Override
    119. public void remove(String key) throws NacosException {
    120. onRemove(key);
    121. listeners.remove(key);
    122. }
    123. /**
    124. * Returns the datum that the data store holds for the key.
    125. *
    126. * @param key key of record
    127. * @return result of {@code dataStore.get(key)}
    128. * @throws NacosException declared by the overridden method
    129. */
    130. @Override
    131. public Datum get(String key) throws NacosException {
    132. return dataStore.get(key);
    133. }
    134. /**
    135. * Put a new record.
    136. *
    137. * @param key key of record
    138. * @param value record
    139. */
    140. public void onPut(String key, Record value) {
    141. // Only ephemeral instance-list keys are written to the data store.
    142. if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
    143. Datum<Instances> datum = new Datum<>();
    144. datum.value = (Instances) value;
    145. datum.key = key;
    146. datum.timestamp.incrementAndGet();
    147. dataStore.put(key, datum);
    148. }
    149. // Nothing to notify when no listener is registered for the key.
    150. if (!listeners.containsKey(key)) {
    151. return;
    152. }
    153. // Hand the change to the notifier's blocking queue.
    154. notifier.addTask(key, DataOperation.CHANGE);
    155. }
    156. /**
    157. * Reports whether this service is ready.
    158. *
    159. * @return {@code true} if the Distro protocol is initialized or data warm-up is disabled
    160. */
    161. public boolean isInitialized() {
    162. return distroProtocol.isInitialized() || !globalConfig.isDataWarmup();
    163. }
    164. /**
    165. * Runnable that takes queued (key, action) pairs and calls back the listeners of each key.
    166. */
    167. public class Notifier implements Runnable {
    168. // Keys that already have a CHANGE task waiting in the queue.
    169. private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
    170. private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
    171. /**
    172. * Add new notify task to queue.
    173. *
    174. * @param datumKey data key
    175. * @param action action for data
    176. */
    177. public void addTask(String datumKey, DataOperation action) {
    178. // A CHANGE for this key is already pending, so skip the duplicate.
    179. if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
    180. return;
    181. }
    182. if (action == DataOperation.CHANGE) {
    183. services.put(datumKey, StringUtils.EMPTY);
    184. }
    185. tasks.offer(Pair.with(datumKey, action));
    186. }
    187. /**
    188. * Returns the number of tasks currently queued.
    189. *
    190. * @return size of the task queue
    191. */
    192. public int getTaskSize() {
    193. return tasks.size();
    194. }
    195. /**
    196. * Loops forever: blocks on the queue, handles each task, and logs any failure.
    197. */
    198. @Override
    199. public void run() {
    200. Loggers.DISTRO.info("distro notifier started");
    201. for (; ; ) {
    202. try {
    203. Pair<String, DataOperation> pair = tasks.take();
    204. // The registration flow continues in handle().
    205. handle(pair);
    206. } catch (Throwable e) {
    207. Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    208. }
    209. }
    210. }
    211. /**
    212. * Core of the registration flow: notifies every listener of the key about the queued action.
    213. *
    214. * @param pair datum key and the action queued for it
    215. */
    216. private void handle(Pair<String, DataOperation> pair) {
    217. try {
    218. String datumKey = pair.getValue0();
    219. DataOperation action = pair.getValue1();
    220. // Clear the pending marker so a later CHANGE for this key can be queued again.
    221. services.remove(datumKey);
    222. int count = 0;
    223. if (!listeners.containsKey(datumKey)) {
    224. return;
    225. }
    226. for (RecordListener listener : listeners.get(datumKey)) {
    227. count++;
    228. try {
    229. if (action == DataOperation.CHANGE) {
    230. // Core registration step: pass the stored value to the listener.
    231. listener.onChange(datumKey, dataStore.get(datumKey).value);
    232. continue;
    233. }
    234. if (action == DataOperation.DELETE) {
    235. listener.onDelete(datumKey);
    236. continue;
    237. }
    238. } catch (Throwable e) {
    239. Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
    240. }
    241. }
    242. if (Loggers.DISTRO.isDebugEnabled()) {
    243. Loggers.DISTRO
    244. .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
    245. datumKey, count, action.name());
    246. }
    247. } catch (Throwable e) {
    248. Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    249. }
    250. }
    251. }
    252. }

     listener.onChange(datumKey, dataStore.get(datumKey).value) 方法内部最终会调用到采用写时复制思路的方法 public void updateIps(List<Instance> ips, boolean ephemeral):

    1. /**
    2. * Replaces the instance set of the requested kind with a set built from {@code ips}.
    3. *
    4. * <p>The replacement is assembled in a local variable and only then assigned to the field; this
    5. * build-then-swap step is what the article calls "copy-on-write". The comparison of old and new
    6. * instances done by the upstream method is elided in this simplified listing.
    7. *
    8. * @param ips instances that make up the new set
    9. * @param ephemeral {@code true} to update the ephemeral set, {@code false} for the persistent set
    10. */
    11. public void updateIps(List<Instance> ips, boolean ephemeral) {
    12. // Current set of the requested kind; this is a reference to the field, not a copy.
    13. Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    14. // ... comparison of toUpdateInstances with ips elided ...
    15. // Build the replacement as a fresh set instead of mutating the current one in place.
    16. // NOTE(review): restored from the upstream Nacos Cluster#updateIps; confirm against the source.
    17. toUpdateInstances = new HashSet<>(ips);
    18. // Publish the new set by swapping the field reference.
    19. if (ephemeral) {
    20. ephemeralInstances = toUpdateInstances;
    21. } else {
    22. persistentInstances = toUpdateInstances;
    23. }
    24. }

  • 相关阅读:
    Docker安装Jenkins
    【基于C的排序算法】插入排序之希尔排序
    节省时间,创造价值:人工智能在工作中的实际应用
    Apache基于IP和端口
    数据分析 - 机器学习
    YOLO系列简单汇总【个人笔记】
    操作系统——进程同步、进程互斥及其实现方式、信号量机制
    通过SpringBoot+Vue+ElementUI+EasyExcel实现文件的导入导出
    IDEA版SSM入门到实战(Maven+MyBatis+Spring+SpringMVC) -Maven核心概念
    使用POI实现基于Excel的考试成绩分析
  • 原文地址:https://blog.csdn.net/yanjianpeng_2018/article/details/126701008