• Nacos源码分析专题(二)-服务注册


    1.引言

    服务注册到Nacos以后,会保存在一个本地注册表中,其结构如下:
    在这里插入图片描述
    首先最外层是一个Map,结构为:Map>

    • key:是namespace_id,起到环境隔离的作用。namespace下可以有多个group

    • value:又是一个Map,代表分组及组内的服务。一个组内可以有多个服务

      • key:代表group分组,不过作为key时格式是group_name:service_name

      • value:分组下的某一个服务,例如userservice,用户服务。类型为Service,内部也包含一个Map,一个服务下可以有多个集群

        • key:集群名称
        • valueCluster类型,包含集群的具体信息。一个集群中可能包含多个实例,也就是具体的节点信息,其中包含一个Set,就是该集群下的实例的集合
          - Instance:实例信息,包含实例的IPPort、健康状态、权重等等信息

    每一个服务去注册到Nacos时,就会把信息组织并存入这个Map中。

    2.服务注册接口

    Nacos提供了服务注册的API接口,客户端只需要向该接口发送请求,即可实现服务注册。

    接口说明:注册一个实例到Nacos服务。

    请求类型POST

    请求路径/nacos/v1/ns/instance

    请求参数

    名称类型是否必选描述
    ip字符串服务实例IP
    portint服务实例port
    namespaceId字符串命名空间ID
    weightdouble权重
    enabledboolean是否上线
    healthyboolean是否健康
    metadata字符串扩展信息
    clusterName字符串集群名
    serviceName字符串服务名
    groupName字符串分组名
    ephemeralboolean是否临时实例

    错误编码:

    错误代码描述语义
    400Bad Request客户端请求中的语法错误
    403Forbidden没有权限
    404Not Found无法找到资源
    500Internal Server Error服务器内部错误
    200OK正常

    3.客户端

    首先,我们需要找到服务注册的入口。

    3.1.NacosServiceRegistryAutoConfiguration

    因为Nacos的客户端是基于SpringBoot的自动装配实现的,我们可以在nacos-discovery依赖:

    spring-cloud-starter-alibaba-nacos-discovery-2.2.6.RELEASE.jar
    
    • 1

    这个包中找到Nacos自动装配信息:
    在这里插入图片描述
    可以看到,有很多个自动配置类被加载了,其中跟服务注册有关的就是NacosServiceRegistryAutoConfiguration这个类,我们跟入其中。

    可以看到,在NacosServiceRegistryAutoConfiguration这个类中,包含一个跟自动注册有关的Bean:

    在这里插入图片描述

    3.2.NacosAutoServiceRegistration

    NacosAutoServiceRegistration源码如图:
    在这里插入图片描述
    可以看到在初始化时,其父类AbstractAutoServiceRegistration也被初始化了。

    AbstractAutoServiceRegistration如图:
    在这里插入图片描述
    可以看到它实现了ApplicationListener接口,监听Spring容器启动过程中的事件。

    在监听到WebServerInitializedEvent(web服务初始化完成)的事件后,执行了bind 方法。
    在这里插入图片描述
    其中的bind方法如下:

    public void bind(WebServerInitializedEvent event) {
        // 获取 ApplicationContext
        ApplicationContext context = event.getApplicationContext();
        // 判断服务的 namespace,一般都是null
        if (context instanceof ConfigurableWebServerApplicationContext) {
            if ("management".equals(((ConfigurableWebServerApplicationContext) context)
                                    .getServerNamespace())) {
                return;
            }
        }
        // 记录当前 web 服务的端口
        this.port.compareAndSet(0, event.getWebServer().getPort());
        // 启动当前服务注册流程
        this.start();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    其中的start方法流程:

    public void start() {
    		if (!isEnabled()) {
    			if (logger.isDebugEnabled()) {
    				logger.debug("Discovery Lifecycle disabled. Not starting");
    			}
    			return;
    		}
    
    		// 当前服务处于未运行状态时,才进行初始化
    		if (!this.running.get()) {
                // 发布服务开始注册的事件
    			this.context.publishEvent(
    					new InstancePreRegisteredEvent(this, getRegistration()));
                // ☆☆☆☆开始注册☆☆☆☆
    			register();
    			if (shouldRegisterManagement()) {
    				registerManagement();
    			}
                // 发布注册完成事件
    			this.context.publishEvent(
    					new InstanceRegisteredEvent<>(this, getConfiguration()));
                // 服务状态设置为运行状态,基于AtomicBoolean
    			this.running.compareAndSet(false, true);
    		}
    
    	}
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    其中最关键的register()方法就是完成服务注册的关键,代码如下:

    protected void register() {
        this.serviceRegistry.register(getRegistration());
    }
    
    
    • 1
    • 2
    • 3
    • 4

    此处的this.serviceRegistry就是NacosServiceRegistry
    在这里插入图片描述

    3.3.NacosServiceRegistry

    NacosServiceRegistry是Spring的ServiceRegistry接口的实现类,而ServiceRegistry接口是服务注册、发现的规约接口,定义了registerderegister等方法的声明。

    NacosServiceRegistryregister的实现如下:

    @Override
    public void register(Registration registration) {
    	// 判断serviceId是否为空,也就是spring.application.name不能为空
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        // 获取Nacos的命名服务,其实就是注册中心服务
        NamingService namingService = namingService();
        // 获取 serviceId 和 Group
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();
    	// 封装服务实例的基本信息,如 cluster-name、是否为临时实例、权重、IP、端口等
        Instance instance = getNacosInstanceFromRegistration(registration);
    
        try {
            // 开始注册服务
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
                     instance.getIp(), instance.getPort());
        }
        catch (Exception e) {
            if (nacosDiscoveryProperties.isFailFast()) {
                log.error("nacos registry, {} register failed...{},", serviceId,
                          registration.toString(), e);
                rethrowRuntimeException(e);
            }
            else {
                log.warn("Failfast is false. {} register failed...{},", serviceId,
                         registration.toString(), e);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    可以看到方法中最终是调用NamingServiceregisterInstance方法实现注册的。

    NamingService接口的默认实现就是NacosNamingService

    3.4.NacosNamingService

    NacosNamingService提供了服务注册、订阅等功能。

    其中registerInstance就是注册服务实例,源码如下:

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // 检查超时参数是否异常。心跳超时时间(默认15秒)必须大于心跳周期(默认5秒)
        NamingUtils.checkInstanceIsLegal(instance);
        // 拼接得到新的服务名,格式为:groupName@@serviceId
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        // 判断是否为临时实例,默认为 true。
        if (instance.isEphemeral()) {
            // 如果是临时实例,需要定时向 Nacos 服务发送心跳
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        // 发送注册服务实例的请求
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    最终,由NacosProxyregisterService方法,完成服务注册。

    代码如下:

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    
        NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName,
                           instance);
    	// 组织请求参数
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
    	// 通过POST请求将上述参数,发送到 /nacos/v1/ns/instance
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    这里提交的信息就是Nacos服务注册接口需要的完整参数,核心参数有:

    • namespace_id:环境
    • service_name:服务名称
    • group_name:组名称
    • cluster_name:集群名称
    • ip: 当前实例的ip地址
    • port: 当前实例的端口

    而在NacosNamingServiceregisterInstance方法中,有一段是与服务心跳有关的代码,我们在后续会继续学习。
    在这里插入图片描述

    4.客户端注册的流程图

    如图:
    在这里插入图片描述

    5.服务端

    nacos-console的模块中,会引入nacos-naming这个模块

    在这里插入图片描述

    模块结构如下:
    在这里插入图片描述
    其中的com.alibaba.nacos.naming.controllers包下就有服务注册、发现等相关的各种接口,其中的服务注册是在InstanceController类中:

    5.1.InstanceController

    进入InstanceController类,可以看到一个register方法,就是服务注册的方法了:

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
    	// 尝试获取namespaceId
        final String namespaceId = WebUtils
            .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        // 尝试获取serviceName,其格式为 group_name@@service_name
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
    	// 解析出实例信息,封装为Instance对象
        final Instance instance = parseInstance(request);
    	// 注册实例
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这里,进入到了serviceManager.registerInstance()方法中。

    5.2.ServiceManager

    ServiceManager就是Nacos中管理服务、实例信息的核心API,其中就包含Nacos的服务注册表:
    在这里插入图片描述
    而其中的registerInstance方法就是注册服务实例的方法:

    /**
         * Register an instance to a service in AP mode.
         *
         * 

    This method creates service or cluster silently if they don't exist. * * @param namespaceId id of namespace * @param serviceName service name * @param instance instance to register * @throws Exception any error occurred in the process */ public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { // 创建一个空的service(如果是第一次来注册实例,要先创建一个空service出来,放入注册表) // 此时不包含实例信息 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); // 拿到创建好的service Service service = getService(namespaceId, serviceName); // 拿不到则抛异常 if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } // 添加要注册的实例到service中 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    创建好了服务,接下来就要添加实例到服务中:

    /**
         * Add instance to service.
         *
         * @param namespaceId namespace
         * @param serviceName service name
         * @param ephemeral   whether instance is ephemeral
         * @param ips         instances
         * @throws NacosException nacos exception
         */
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    	// 监听服务列表用到的key,服务唯一标识,例如:com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@order-service
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        // 获取服务
        Service service = getService(namespaceId, serviceName);
        // 同步锁,避免并发修改的安全问题
        synchronized (service) {
            // 1)获取要更新的实例列表
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
    		// 2)封装实例列表到Instances对象
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
    		// 3)完成 注册表更新 以及 Nacos集群的数据同步
            consistencyService.put(key, instances);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    该方法中对修改服务列表的动作加锁处理,确保线程安全。而在同步代码块中,包含下面几步:

    1)先获取要更新的实例列表,addIpAddresses(service, ephemeral, ips);
    2)然后将更新后的数据封装到Instances对象中,后面更新注册表时使用
    3)最后,调用consistencyService.put()方法完成Nacos集群的数据同步,保证集群一致性。

    注意:在第1步的addIPAddress中,会拷贝旧的实例列表,添加新实例到列表中。在第3步中,完成对实例状态更新后,则会用新列表直接覆盖旧实例列表。而在更新过程中,旧实例列表不受影响,用户依然可以读取。
    这样在更新列表状态过程中,无需阻塞用户的读操作,也不会导致用户读取到脏数据,性能比较好。这种方案称为CopyOnWrite方案。

    5.2.1.更服务列表

    我们来看看实例列表的更新,对应的方法是addIpAddresses(service, ephemeral, ips);

    private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
    }
    
    
    • 1
    • 2
    • 3
    • 4

    继续进入updateIpAddresses方法:

    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    	// 根据namespaceId、serviceName获取当前服务的实例列表,返回值是Datum
        // 第一次来,肯定是null
        Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    	// 得到服务中现有的实例列表
        List<Instance> currentIPs = service.allIPs(ephemeral);
        // 创建map,保存实例列表,key为ip地址,value是Instance对象
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        // 创建Set集合,保存实例的instanceId
        Set<String> currentInstanceIds = Sets.newHashSet();
    	// 遍历要现有的实例列表
        for (Instance instance : currentIPs) {
            // 添加到map中
            currentInstances.put(instance.toIpAddr(), instance);
            // 添加instanceId到set中
            currentInstanceIds.add(instance.getInstanceId());
        }
    	
        // 创建map,用来保存更新后的实例列表
        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            // 如果服务中已经有旧的数据,则先保存旧的实例列表
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            // 如果没有旧数据,则直接创建新的map
            instanceMap = new HashMap<>(ips.length);
        }
    	// 遍历实例列表
        for (Instance instance : ips) {
            // 判断服务中是否包含要注册的实例的cluster信息
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                // 如果不包含,创建新的cluster
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                // 将集群放入service的注册表
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                          instance.getClusterName(), instance.toJson());
            }
    		// 删除实例 or 新增实例 ?
            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                instanceMap.remove(instance.getDatumKey());
            } else {
                // 新增实例,instance生成全新的instanceId
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                // 放入instance列表
                instanceMap.put(instance.getDatumKey(), instance);
            }
    
        }
    
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                .toJson(instanceMap.values()));
        }
    	// 将instanceMap中的所有实例转为List返回
        return new ArrayList<>(instanceMap.values());
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68

    简单来讲,就是先获取旧的实例列表,然后把新的实例信息与旧的做对比,新的实例就添加,老的实例同步ID。然后返回最新的实例列表。

    5.2.2.Nacos集群一致性

    在完成本地服务列表更新后,Nacos又实现了集群一致性更新,调用的是:

    consistencyService.put(key, instances);
    
    • 1

    这里的ConsistencyService接口,代表集群一致性的接口,有很多中不同实现:
    在这里插入图片描述
    我们进入DelegateConsistencyServiceImpl来看:

    @Override
    public void put(String key, Record value) throws NacosException {
        // 根据实例是否是临时实例,判断委托对象
        mapConsistencyService(key).put(key, value);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    其中的mapConsistencyService(key)方法就是选择委托方式的:

    private ConsistencyService mapConsistencyService(String key) {
        // 判断是否是临时实例:
        // 是,选择 ephemeralConsistencyService,也就是 DistroConsistencyServiceImpl类
        // 否,选择 persistentConsistencyService,也就是PersistentConsistencyServiceDelegateImpl
        return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    默认情况下,所有实例都是临时实例,我们关注DistroConsistencyServiceImpl即可。

    5.2.3.DistroConsistencyServiceImpl

    我们来看临时实例的一致性实现:DistroConsistencyServiceImpl类的put方法:

    public void put(String key, Record value) throws NacosException {
        // 先将要更新的实例信息写入本地实例列表
        onPut(key, value);
        // 开始集群同步
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
                            globalConfig.getTaskDispatchPeriod() / 2);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    这里方法只有两行:

    • onPut(key, value):其中value就是Instances,要更新的服务信息。这行主要是基于线程池方式,异步的将Service信息写入注册表中(就是那个多重Map)
    • distroProtocol.sync():就是通过Distro协议将数据同步给集群中的其它Nacos节点

    我们先看onPut方法

    5.2.4.更新本地实例列表

    1)放入阻塞队列

    onPut方法如下:

    public void onPut(String key, Record value) {
    	// 判断是否是临时实例
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            // 封装 Instances 信息到 数据集:Datum
            Datum datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            // 放入DataStore
            dataStore.put(key, datum);
        }
    
        if (!listeners.containsKey(key)) {
            return;
        }
    	// 放入阻塞队列,这里的 notifier维护了一个阻塞队列,并且基于线程池异步执行队列中的任务
        notifier.addTask(key, DataOperation.CHANGE);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    notifier的类型就是DistroConsistencyServiceImpl.Notifier,内部维护了一个阻塞队列,存放服务列表变更的事件:

    在这里插入图片描述
    addTask时,将任务加入该阻塞队列:

    // DistroConsistencyServiceImpl.Notifier类的 addTask 方法:
    public void addTask(String datumKey, DataOperation action) {
    
        if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
            return;
        }
        if (action == DataOperation.CHANGE) {
            services.put(datumKey, StringUtils.EMPTY);
        }
        // 任务放入阻塞队列
        tasks.offer(Pair.with(datumKey, action));
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2)Notifier异步更新
    同时,notifier还是一个Runnable,通过一个单线程的线程池来不断从阻塞队列中获取任务,执行服务列表的更新。来看下其中的run方法:

    // DistroConsistencyServiceImpl.Notifier类的run方法:
    @Override
    public void run() {
        Loggers.DISTRO.info("distro notifier started");
    	// 死循环,不断执行任务。因为是阻塞队列,不会导致CPU负载过高
        for (; ; ) {
            try {
                // 从阻塞队列中获取任务
                Pair<String, DataOperation> pair = tasks.take();
                // 处理任务,更新服务列表
                handle(pair);
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    来看看handle方法:

    // DistroConsistencyServiceImpl.Notifier类的 handle 方法:
    private void handle(Pair<String, DataOperation> pair) {
        try {
            String datumKey = pair.getValue0();
            DataOperation action = pair.getValue1();
    
            services.remove(datumKey);
    
            int count = 0;
    
            if (!listeners.containsKey(datumKey)) {
                return;
            }
    		// 遍历,找到变化的service,这里的 RecordListener就是 Service
            for (RecordListener listener : listeners.get(datumKey)) {
    
                count++;
    
                try {
                    // 服务的实例列表CHANGE事件
                    if (action == DataOperation.CHANGE) {
                        // 更新服务列表
                        listener.onChange(datumKey, dataStore.get(datumKey).value);
                        continue;
                    }
    				// 服务的实例列表 DELETE 事件
                    if (action == DataOperation.DELETE) {
                        listener.onDelete(datumKey);
                        continue;
                    }
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                }
            }
    
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                           datumKey, count, action.name());
            }
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    3)覆盖实例列表
    而在ServiceonChange方法中,就可以看到更新实例列表的逻辑了:

    @Override
    public void onChange(String key, Instances value) throws Exception {
    
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    
    	// 更新实例列表
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    
        recalculateChecksum();
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    updateIPs方法:

    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        // 准备一个Map,key是cluster,值是集群下的Instance集合
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        // 获取服务的所有cluster名称
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        // 遍历要更新的实例
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
    			// 判断实例是否包含clusterName,没有的话用默认cluster
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
    			// 判断cluster是否存在,不存在则创建新的cluster
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                              instance.getClusterName(), instance.toJson());
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
    			// 获取当前cluster实例的集合,不存在则创建新的
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
    			// 添加新的实例到 Instance 集合
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }
    
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //make every ip mine
            List<Instance> entryIPs = entry.getValue();
            // 将实例集合更新到 clusterMap(注册表)
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
    
        setLastModifiedMillis(System.currentTimeMillis());
        // 发布服务变更的通知消息
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
    
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
    
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
                             stringBuilder.toString());
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    在第45行的代码中:clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);

    就是在更新注册表:

    public void updateIps(List<Instance> ips, boolean ephemeral) {
        // 获取旧实例列表
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
    
    	// 检查新加入实例的状态
        List<Instance> newIPs = subtract(ips, oldIpMap.values());
        if (newIPs.size() > 0) {
            Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                      getName(), newIPs.size(), newIPs.toString());
    
            for (Instance ip : newIPs) {
                HealthCheckStatus.reset(ip);
            }
        }
    	// 移除要删除的实例
        List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
    
        if (deadIPs.size() > 0) {
            Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                      getName(), deadIPs.size(), deadIPs.toString());
    
            for (Instance ip : deadIPs) {
                HealthCheckStatus.remv(ip);
            }
        }
    
        toUpdateInstances = new HashSet<>(ips);
    	// 直接覆盖旧实例列表
        if (ephemeral) {
            ephemeralInstances = toUpdateInstances;
        } else {
            persistentInstances = toUpdateInstances;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    5.2.5.集群数据同步

    DistroConsistencyServiceImpl的put方法中分为两步:
    在这里插入图片描述
    其中的onPut方法已经分析过了。

    下面的distroProtocol.sync()就是集群同步的逻辑了。

    DistroProtocol类的sync方法如下:

    public void sync(DistroKey distroKey, DataOperation action, long delay) {
        // 遍历 Nacos 集群中除自己以外的其它节点
        for (Member each : memberManager.allMembersWithoutSelf()) {
            DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(),
                                                          each.getAddress());
            // 定义一个Distro的同步任务
            DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
            // 交给线程池去执行
            distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
            if (Loggers.DISTRO.isDebugEnabled()) {
                Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress());
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    其中同步的任务封装为一个DistroDelayTask对象。

    交给了distroTaskEngineHolder.getDelayTaskExecuteEngine()执行,这行代码的返回值是:

    NacosDelayTaskExecuteEngine,这个类维护了一个线程池,并且接收任务,执行任务。

    执行任务的方法为processTasks()方法:

    protected void processTasks() {
        Collection<Object> keys = getAllTaskKeys();
        for (Object taskKey : keys) {
            AbstractDelayTask task = removeTask(taskKey);
            if (null == task) {
                continue;
            }
            NacosTaskProcessor processor = getProcessor(taskKey);
            if (null == processor) {
                getEngineLog().error("processor not found for task, so discarded. " + task);
                continue;
            }
            try {
                // 尝试执行同步任务,如果失败会重试
                if (!processor.process(task)) {
                    retryFailedTask(taskKey, task);
                }
            } catch (Throwable e) {
                getEngineLog().error("Nacos task execute error : " + e.toString(), e);
                retryFailedTask(taskKey, task);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    可以看出来基于Distro模式的同步是异步进行的,并且失败时会将任务重新入队并充实,因此不保证同步结果的强一致性,属于AP模式的一致性策略。

    5.3.服务端流程图

    在这里插入图片描述

    6.总结

    6.1.Nacos的注册表结构是什么样的?

    答:Nacos是多级存储模型,最外层通过namespace来实现环境隔离,然后是group分组,分组下就是服务,一个服务有可以分为不同的集群,集群中包含多个实例。因此其注册表结构为一个Map,类型是:

    Map>

    外层keynamespace_id,内层keygroup+serviceName.

    Service内部维护一个Map,结构是:MapkeyclusterName,值是集群信息

    Cluster内部维护一个Set集合,元素是Instance类型,代表集群中的多个实例。

    6.2.Nacos如何保证并发写的安全性?

    答:首先,在注册实例时,会对service加锁,不同service之间本身就不存在并发写问题,互不影响。相同service时通过锁来互斥。并且,在更新实例列表时,是基于异步的线程池来完成,而线程池的线程数量为1.

    6.3.Nacos如何避免并发读写的冲突?

    答:Nacos在更新实例列表时,会采用CopyOnWrite技术,首先将Old实例列表拷贝一份,然后更新拷贝的实例列表,再用更新后的实例列表来覆盖旧的实例列表。

    6.4.Nacos如何应对阿里内部数十万服务的并发写请求?

    答:Nacos内部会将服务注册的任务放入阻塞队列,采用线程池异步来完成实例更新,从而提高并发写能力。

  • 相关阅读:
    Java基于SpringBoot的闲一品交易平台
    101 - The Blocks Problem (UVA)
    【开源】SpringBoot框架开发用户画像活动推荐系统
    目标检测YOLO实战应用案例100讲-基于无人机的轻量化目标检测系统设计
    redux和Vuex的使用示例
    Windows之应用安装程序 —— winget
    Redis学习之路(四)--常用数据类型String字符串
    如何使用chorme版本对应的ChromeDriver(不用更改Chrome版本)
    Net 高级调试之十二:垃圾回收机制以及终结器队列、对象固定
    git 基本使用
  • 原文地址:https://blog.csdn.net/BruceLiu_code/article/details/126088029