• Nacos注册中心8-Server端(处理注册请求)


    0. 环境

    • nacos版本:1.4.1
    • Spring Cloud : 2020.0.2
    • Spring Boot :2.4.4
    • Spring Cloud alibaba: 2.2.5.RELEASE

    测试代码:github.com/hsfxuebao/s…

    1. InstanceController#register

    nacos注册中心功能是在naming这个子项目下面的,是个springboot项目,然后我们直接找controller就可以了,找到这个InstanceController,第一个方法就是服务注册方法(register方法):

    1. @CanDistro
    2. @PostMapping
    3. @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    4. public String register(HttpServletRequest request) throws Exception {
    5. // 从请求中获取指定属性值
    6. final String namespaceId = WebUtils
    7. .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    8. // 从请求中获取指定属性值
    9. final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    10. // 检测serviceName是否合法
    11. NamingUtils.checkServiceNameFormat(serviceName);
    12. // 通过请求参数组装出instance
    13. final Instance instance = parseInstance(request);
    14. // todo 将instance写到注册表
    15. serviceManager.registerInstance(namespaceId, serviceName, instance);
    16. return "ok";
    17. }
    18. 复制代码

    先是解析出来instance,就是根据client发送的那堆参数解析出来的。接着就是调用serviceManager组件进行实例注册,这个serviceManager 组件在注册中心是个核心组件(服务注册,下线,获取服务列表等),都是找这个组件的。

    2. ServiceManager#registerInstance

    1. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    2. // 创建 空service
    3. // 第三个参数 true表示临时实例
    4. createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    5. // 从注册表获取到service
    6. Service service = getService(namespaceId, serviceName);
    7. // 这里指定不能为null
    8. if (service == null) {
    9. throw new NacosException(NacosException.INVALID_PARAM,
    10. "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    11. }
    12. // todo 将instance写入到service,即写入到了注册表
    13. addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    14. }
    15. 复制代码

    这几个步骤都很重要,我们挨个看看,createEmptyService这个方法就是当service 不存在的时候,创建一个空的serivce,这service你可以理解为服务,其实就是服务的意思,看下这个方法

    2.1 ServiceManager#createEmptyService

    1. public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    2. // local为true,表示当前实例为临时实例
    3. createServiceIfAbsent(namespaceId, serviceName, local, null);
    4. }
    5. public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
    6. throws NacosException {
    7. // 从注册表中获取service
    8. Service service = getService(namespaceId, serviceName);
    9. // 若当前注册instance是其提供服务的第一个实例,则注册表中是没有该service的,
    10. // 此时会创建一个service实例
    11. if (service == null) {
    12. Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
    13. service = new Service();
    14. service.setName(serviceName);
    15. service.setNamespaceId(namespaceId);
    16. service.setGroupName(NamingUtils.getGroupName(serviceName));
    17. // now validate the service. if failed, exception will be thrown
    18. // 修改时间
    19. service.setLastModifiedMillis(System.currentTimeMillis());
    20. // todo 重新计算校验和
    21. service.recalculateChecksum();
    22. if (cluster != null) {
    23. // cluster与service建立联系
    24. cluster.setService(service);
    25. service.getClusterMap().put(cluster.getName(), cluster);
    26. }
    27. service.validate();
    28. // todo 将service写入到注册表
    29. putServiceAndInit(service);
    30. // 对持久实例的操作
    31. if (!local) {
    32. addOrReplaceService(service);
    33. }
    34. }
    35. }
    36. 复制代码

    先是根据namespace与serviceName 获取service ,如果没有的话,就创建,最开始的时候,肯定是没有的,然后就会创建一个service,看下这个getService 方法:

    1. public Service getService(String namespaceId, String serviceName) {
    2. if (serviceMap.get(namespaceId) == null) {
    3. return null;
    4. }
    5. return chooseServiceMap(namespaceId).get(serviceName);
    6. }
    7. 复制代码

    说白了其实就是去serviceMap 这个成员中获取,对应关系看下面这个注释就可以。

    1. // namespace ---》 serivceName ,service
    2. private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
    3. 复制代码

    createEmptyService 这个方法,最后putServiceAndInit(service);这行代码也需要注意下

    1. private void putServiceAndInit(Service service) throws NacosException {
    2. // 将service写入注册表
    3. putService(service);
    4. // 初始化service内部健康检测任务
    5. service.init();
    6. // 给nacos集合中的当前服务的持久实例、临时实例添加监听
    7. consistencyService
    8. .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    9. consistencyService
    10. .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    11. Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    12. }
    13. 复制代码

    就是将这个service 放到 serviceMap中,然后service进行初始化, 再就是添加两个监听器。 看下这个service 初始化的方法。

    1. public void init() {
    2. // 开启定时清除过期instance任务
    3. HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    4. // 开启了当前service所包含的所有cluster的健康检测任务
    5. for (Map.Entry entry : clusterMap.entrySet()) {
    6. entry.getValue().setService(this);
    7. // 开启当前遍历cluster的健康检测任务:
    8. // 将当前cluster包含的所有instance的心跳检测任务定时添加到一个任务队列
    9. // taskQueue,即将当前cluster所包含的持久实例的心跳任务添加到taskQueue
    10. entry.getValue().init();
    11. }
    12. }
    13. 复制代码

    这里这个初始化有个非常重要的地方就是往健康检查器中添加一个任务,健康检查的任务,这个任务其实就是扫描这个service里面长时间没有心跳的instance(服务实例),然后进行健康状态改变,服务下线。

    接着回到ServiceManager#registerInstance 方法中去

    1. public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    2. // 创建 空service
    3. // 第三个参数 true表示临时实例
    4. createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    5. // 从注册表获取到service
    6. Service service = getService(namespaceId, serviceName);
    7. // 这里指定不能为null
    8. if (service == null) {
    9. throw new NacosException(NacosException.INVALID_PARAM,
    10. "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    11. }
    12. // todo 将instance写入到service,即写入到了注册表
    13. addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    14. }
    15. 复制代码

    这个时候再获取service就能够获取到了,然后接着就是调用addInstance 方法添加实例了。

    1. public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
    2. throws NacosException {
    3. String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    4. // 从注册表中获取service
    5. Service service = getService(namespaceId, serviceName);
    6. synchronized (service) {
    7. // todo 将要注册的instance写入到service,即写入到了注册表
    8. List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    9. // 塞到instance
    10. Instances instances = new Instances();
    11. instances.setInstanceList(instanceList);
    12. // todo 将本次变更同步给其它Nacos
    13. consistencyService.put(key, instances);
    14. }
    15. }
    16. 复制代码

    首先是生成一个key,这个key是根据你namespace,serviceName ,是否临时节点来决定的,我们这里是临时的,直接看下生成的结果就可以了

    1. com.alibaba.nacos.naming.iplist.ephemeral.{namespace}##{serviceName}
    2. 复制代码

    接着就是获取service, 上锁,调用addIpAddresses 得到一个instance集合

    1. private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    2. // 修改当前service的instance列表,这个修改一共有两种操作:
    3. // 添加实例 与 删除实例
    4. return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    5. }
    6. public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
    7. throws NacosException {
    8. // 从其它nacos获取当前服务数据(临时实例数据)
    9. Datum datum = consistencyService
    10. .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    11. // 获取本地注册表中当前服务的所有临时实例
    12. List<Instance> currentIPs = service.allIPs(ephemeral);
    13. Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    14. Set<String> currentInstanceIds = Sets.newHashSet();
    15. // 遍历注册表中获取到的实例
    16. for (Instance instance : currentIPs) {
    17. // 将当前遍历的instance写入到map,key为ip:port,value为instance
    18. currentInstances.put(instance.toIpAddr(), instance);
    19. // 将当前遍历的instanceId写入到一个set
    20. currentInstanceIds.add(instance.getInstanceId());
    21. }
    22. Map<String, Instance> instanceMap;
    23. if (datum != null && null != datum.value) {
    24. // todo 将注册表中主机的instance数据替换掉外来的相同主机的instance数据
    25. instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    26. } else {
    27. instanceMap = new HashMap<>(ips.length);
    28. }
    29. for (Instance instance : ips) {
    30. // 若当前service中不包含当前要注册的instance所属cluster,则创建一个
    31. if (!service.getClusterMap().containsKey(instance.getClusterName())) {
    32. Cluster cluster = new Cluster(instance.getClusterName(), service);
    33. // todo 初始化cluster的健康检测任务
    34. cluster.init();
    35. service.getClusterMap().put(instance.getClusterName(), cluster);
    36. Loggers.SRV_LOG
    37. .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
    38. instance.getClusterName(), instance.toJson());
    39. }
    40. // 若当前操作为清除操作,则将当前instance从instanceMap中清除,
    41. // 否则就是添加操作,即将当前instance添加到instanceMap中
    42. if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
    43. instanceMap.remove(instance.getDatumKey());
    44. } else {
    45. Instance oldInstance = instanceMap.get(instance.getDatumKey());
    46. if (oldInstance != null) {
    47. instance.setInstanceId(oldInstance.getInstanceId());
    48. } else {
    49. instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
    50. }
    51. instanceMap.put(instance.getDatumKey(), instance);
    52. }
    53. }
    54. if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
    55. throw new IllegalArgumentException(
    56. "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
    57. .toJson(instanceMap.values()));
    58. }
    59. return new ArrayList<>(instanceMap.values());
    60. }
    61. 复制代码

    UPDATE_INSTANCE_ACTION_ADD是add,主要就是新的instance 与之前的instance进行合并啥的,生成一个新的instance集合。

    接着就是创建一个instances 对象,将instance集合塞进去,最后调用consiitencyService 组件进行保存。

    3. DelegateConsistencyServiceImpl#put

    1. @Override
    2. public void put(String key, Record value) throws NacosException {
    3. mapConsistencyService(key).put(key, value);
    4. }
    5. 复制代码

    这个方法会根据你这个key是临时的还是永久的选择一个consisitencyService:

    1. private ConsistencyService mapConsistencyService(String key) {
    2. // 判断是不是临时key 临时key就走ephemeralConsistencyService服务,否则就走persistentConsistencyService服务
    3. return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    4. }
    5. 复制代码

    这里我们是临时的,所以就走EphemeralConsistencyService 的实现类DistroConsistencyServiceImpl 的put方法。

    3.1 DistroConsistencyServiceImpl#put

    1. // put 往存储器中放入数据
    2. @Override
    3. public void put(String key, Record value) throws NacosException {
    4. // 往本地里面存储
    5. onPut(key, value);
    6. // todo 应该是同步
    7. distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
    8. globalConfig.getTaskDispatchPeriod() / 2);
    9. }
    10. 复制代码

    我们直接看onPut方法就可以了,后面这个方法我们现在先不研究:

    1. public void onPut(String key, Record value) {
    2. // 判断是否是临时
    3. if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
    4. // 封装到datnum
    5. Datum<Instances> datum = new Datum<>();
    6. datum.value = (Instances) value;
    7. datum.key = key;
    8. // 自增
    9. datum.timestamp.incrementAndGet();
    10. // dataStore存储
    11. dataStore.put(key, datum);
    12. }
    13. // 如果listener里面没有这个key 直接返回,发布订阅,进行通知,这个监听器会在Service初始化的时候添加进去
    14. if (!listeners.containsKey(key)) {
    15. return;
    16. }
    17. // 添加通知任务
    18. notifier.addTask(key, DataOperation.CHANGE);
    19. }
    20. 复制代码

    这里需要判断一下是否是临时节点,如果是的话,就封装一个Datum,这个东西就是个kv,不用太多care它,接着就是调用datastoreput方法进行存储。

    最后如果有这个key的监听器的话,就会接着往下走,没有的话就返回,我们在初始化service 的时候是注册了2个监听器的,往上翻翻就可以看到。接着就是调用notifier添加任务。

    先看下这个datastore:

    1. private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024);
    2. public void put(String key, Datum value) {
    3. dataMap.put(key, value);
    4. }
    5. 复制代码

    这个特别简单,就是个map,然后往map里面塞东西,接着看下这个notifier.addTask(key, DataOperation.CHANGE);

    1. // 添加通知任务到队列中
    2. public void addTask(String datumKey, DataOperation action) {
    3. // 如果已经存在,并且是change事件
    4. if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
    5. return;
    6. }
    7. if (action == DataOperation.CHANGE) {
    8. // 往services缓存中放
    9. services.put(datumKey, StringUtils.EMPTY);
    10. }
    11. // 加入任务队列中
    12. tasks.offer(Pair.with(datumKey, action));
    13. }
    14. private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
    15. 复制代码

    这里其实就是往任务队列中添加了一个任务。到这按理说我们服务注册就该结束了,但是,我们发现生成了新的instance集合并没有更新到service对象里面去,所以还得继续往下看,看看这个通知任务是怎么回事。 其实DistroConsistencyServiceImpl 这个类在初始化的时候,然后提交了一个任务:

    1. // 初始化操作
    2. @PostConstruct
    3. public void init() {
    4. GlobalExecutor.submitDistroNotifyTask(notifier);
    5. }
    6. 复制代码

    我们看下这个Notifier 的run方法

    1. @Override
    2. public void run() {
    3. Loggers.DISTRO.info("distro notifier started");
    4. for (; ; ) {
    5. try {
    6. // 取出任务
    7. Pair<String, DataOperation> pair = tasks.take();
    8. // todo 任务处理
    9. handle(pair);
    10. } catch (Throwable e) {
    11. Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    12. }
    13. }
    14. }
    15. 复制代码

    就是取出任务队列中的任务,调用handle方法进行处理:

    1. private void handle(Pair<String, DataOperation> pair) {
    2. try {
    3. String datumKey = pair.getValue0();
    4. DataOperation action = pair.getValue1();
    5. // 从services缓存中移除
    6. services.remove(datumKey);
    7. int count = 0;
    8. if (!listeners.containsKey(datumKey)) {
    9. return;
    10. }
    11. for (RecordListener listener : listeners.get(datumKey)) {
    12. count++;
    13. try {
    14. // 通知数据已经改变
    15. if (action == DataOperation.CHANGE) {
    16. // todo 将key对应的实例列表传过去
    17. listener.onChange(datumKey, dataStore.get(datumKey).value);
    18. continue;
    19. }
    20. // 通知数据已经删除
    21. if (action == DataOperation.DELETE) {
    22. listener.onDelete(datumKey);
    23. continue;
    24. }
    25. } catch (Throwable e) {
    26. Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
    27. }
    28. }
    29. if (Loggers.DISTRO.isDebugEnabled()) {
    30. Loggers.DISTRO
    31. .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
    32. datumKey, count, action.name());
    33. }
    34. } catch (Throwable e) {
    35. Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    36. }
    37. }
    38. }
    39. 复制代码

    这里直接通知,调用listeneronChangeonDelete执行相关的工作。我们那个时候是将service作为listener注册进来了,所以我们看下service的onChange方法:

    1. public void onChange(String key, Instances value) throws Exception {
    2. Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    3. for (Instance instance : value.getInstanceList()) {
    4. if (instance == null) {
    5. // Reject this abnormal instance list:
    6. throw new RuntimeException("got null instance " + key);
    7. }
    8. // 处理权重问题
    9. if (instance.getWeight() > 10000.0D) {
    10. instance.setWeight(10000.0D);
    11. }
    12. if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
    13. instance.setWeight(0.01D);
    14. }
    15. }
    16. // todo 更新instance
    17. updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    18. recalculateChecksum();
    19. }
    20. 复制代码

    核心的在updateIPs这个方法方法中,参数1是新的instance集合,参数2是是否是临时节点:

    1. public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    2. Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    3. for (String clusterName : clusterMap.keySet()) {
    4. ipMap.put(clusterName, new ArrayList<>());
    5. }
    6. for (Instance instance : instances) {
    7. try {
    8. if (instance == null) {
    9. Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
    10. continue;
    11. }
    12. // cluster为null 就设置成默认DEFAULT
    13. if (StringUtils.isEmpty(instance.getClusterName())) {
    14. instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
    15. }
    16. // cluster不存在就创建对应的cluster
    17. if (!clusterMap.containsKey(instance.getClusterName())) {
    18. Loggers.SRV_LOG
    19. .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
    20. instance.getClusterName(), instance.toJson());
    21. Cluster cluster = new Cluster(instance.getClusterName(), this);
    22. cluster.init();
    23. // 加入map中
    24. getClusterMap().put(instance.getClusterName(), cluster);
    25. }
    26. List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
    27. if (clusterIPs == null) {
    28. clusterIPs = new LinkedList<>();
    29. ipMap.put(instance.getClusterName(), clusterIPs);
    30. }
    31. clusterIPs.add(instance);
    32. } catch (Exception e) {
    33. Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
    34. }
    35. }
    36. for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
    37. //make every ip mine
    38. List<Instance> entryIPs = entry.getValue();
    39. // 某个集群,更新下面的instance
    40. clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    41. }
    42. // 设置 最近的一次修改时间
    43. setLastModifiedMillis(System.currentTimeMillis());
    44. // 获取pushService 然后服务改变,通知改变
    45. getPushService().serviceChanged(this);
    46. StringBuilder stringBuilder = new StringBuilder();
    47. for (Instance instance : allIPs()) {
    48. stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    49. }
    50. Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
    51. stringBuilder.toString());
    52. }
    53. 复制代码

    这个方法其实就是遍历instance集合,然后更新clusterMap 这个里面的内容,这个clusterMap 其实就是clusterNamecluster的对应关系,从代码上可以看到实现弄出所有的cluster,然后遍历instance集合,如果没有某个instance没有cluster,就设置成默认DEFAULT_CLUSTER_NAME,如果某个cluster没有的话就创建。然后塞到一个cluster与instance集合对应关系的map中。

    接着就是遍历clusterMap更新下instance列表,这个更新instance列表代码很多,我就不贴出来了,主要思想还是比对新老的,然后找出新的instance,与挂了的instance,注意这一步是更新 cluster对象里面的集合,其实就是2个set,一个存临时节点的,一个是存永久节点的。

    1. // 持久实例集合
    2. @JsonIgnore
    3. private Set<Instance> persistentInstances = new HashSet<>();
    4. // 临时实例集合
    5. @JsonIgnore
    6. private Set<Instance> ephemeralInstances = new HashSet<>();
    7. 复制代码

    好了,到这我们的服务注册就算是完事了,最后这个service.onChange方法中还有一行代码我们需要注意 getPushService().serviceChanged(this);这个就是service服务列表发生变化,然后进行通知的。

    4. 流程图

    4.1 调用关系图

    4.2 服务注册流程图

    4.3 异步事件通知修改注册信息

    4.4 图解namespace/group/service/cluster/instance的关系

    代码设计上关系:


    个人对namespace/group/service/cluster/instance之间关系的理解

  • 相关阅读:
    python学习之【文件读写】
    C语言实现图的拓扑排序算法
    C++容器string的运用和注意
    2023年上半年软考网工选择题易错总结
    决策树算法:原理与python实现案例
    王坚院士:云计算与 GPT 的关系,就是电和电动机的关系
    认识异步复位寄存器同时学习timing_arc约束
    山东大学2024深度学习期末考试回忆
    音频链接抓取技术在Lua中的实现
    PLC串口通讯和通讯接口知识汇总
  • 原文地址:https://blog.csdn.net/BASK2311/article/details/127685276