• Nacos源码分析专题(四)-服务发现


    1.客户端

    1.1.定时更新服务列表

    1.1.1.NacosNamingService

    在前面我们讲到一个类NacosNamingService,这个类不仅仅提供了服务注册功能,同样提供了服务发现的功能。
    在这里插入图片描述
    多个重载的方法最终都会进入一个方法:

    @Override
    public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters,
                                          boolean subscribe) throws NacosException {
    
        ServiceInfo serviceInfo;
        // 1.判断是否需要订阅服务信息(默认为 true)
        if (subscribe) {
            // 1.1.订阅服务信息
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
                                                     StringUtils.join(clusters, ","));
        } else {
            // 1.2.直接去nacos拉取服务信息
            serviceInfo = hostReactor
                .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName),
                                                  StringUtils.join(clusters, ","));
        }
        // 2.从服务信息中获取实例列表并返回
        List<Instance> list;
        if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
            return new ArrayList<Instance>();
        }
        return list;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    1.1.2.HostReactor

    进入订阅服务消息,这里是由HostReactor类的getServiceInfo()方法来实现的:

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        // 由 服务名@@集群名拼接 key
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // 读取本地服务列表的缓存,缓存是一个Map,格式:Map
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        // 判断缓存是否存在
        if (null == serviceObj) {
            // 不存在,创建空ServiceInfo
            serviceObj = new ServiceInfo(serviceName, clusters);
            // 放入缓存
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            // 放入待更新的服务列表(updatingMap)中
            updatingMap.put(serviceName, new Object());
            // 立即更新服务列表
            updateServiceNow(serviceName, clusters);
            // 从待更新列表中移除
            updatingMap.remove(serviceName);
    
        } else if (updatingMap.containsKey(serviceName)) {
            // 缓存中有,但是需要更新
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish 等待5秒中,待更新完成
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                            .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // 开启定时更新服务列表的功能
        scheduleUpdateIfAbsent(serviceName, clusters);
        // 返回缓存中的服务信息
        return serviceInfoMap.get(serviceObj.getKey());
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    基本逻辑就是先从本地缓存读,根据结果来选择:

    • 如果本地缓存没有,立即去nacos读取,updateServiceNow(serviceName, clusters)
      在这里插入图片描述
    • 如果本地缓存有,则开启定时更新功能,并返回缓存结果:
      • scheduleUpdateIfAbsent(serviceName, clusters)
        在这里插入图片描述
        UpdateTask中,最终还是调用updateService方法:
        在这里插入图片描述
        不管是立即更新服务列表,还是定时更新服务列表,最终都会执行HostReactor中的updateService()方法:
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
    		// 基于ServerProxy发起远程调用,查询服务列表
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
    
            if (StringUtils.isNotEmpty(result)) {
                // 处理查询结果
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    1.1.3.ServerProxy

    ServerProxyqueryList方法如下:

    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly)
        throws NacosException {
    	// 准备请求参数
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
    	// 发起请求,地址与API接口一致
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    1.2.处理服务变更通知

    除了定时更新服务列表的功能外,Nacos还支持服务列表变更时的主动推送功能。

    HostReactor类的构造函数中,有非常重要的几个步骤:
    在这里插入图片描述
    基本思路是:

    • 通过PushReceiver监听服务端推送的变更数据
    • 解析数据后,通过NotifyCenter发布服务变更的事件
    • InstanceChangeNotifier监听变更事件,完成对服务列表的更新

    1.2.1.PushReceiver

    我们先看PushReceiver,这个类会以UDP方式接收Nacos服务端推送的服务变更数据。

    先看构造函数:

    public PushReceiver(HostReactor hostReactor) {
        try {
            this.hostReactor = hostReactor;
            // 创建 UDP客户端
            String udpPort = getPushReceiverUdpPort();
            if (StringUtils.isEmpty(udpPort)) {
                this.udpSocket = new DatagramSocket();
            } else {
                this.udpSocket = new DatagramSocket(new InetSocketAddress(Integer.parseInt(udpPort)));
            }
            // 准备线程池
            this.executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.naming.push.receiver");
                    return thread;
                }
            });
    		// 开启线程任务,准备接收变更数据
            this.executorService.execute(this);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] init udp socket failed", e);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    PushReceiver构造函数中基于线程池来运行任务。这是因为PushReceiver本身也是一个Runnable,其中的run方法业务逻辑如下:

    @Override
    public void run() {
        while (!closed) {
            try {
                // byte[] is initialized with 0 full filled by default
                byte[] buffer = new byte[UDP_MSS];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
    			// 接收推送数据
                udpSocket.receive(packet);
    			// 解析为json字符串
                String json = new String(IoUtils.tryDecompress(packet.getData()), UTF_8).trim();
                NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());
    			// 反序列化为对象
                PushPacket pushPacket = JacksonUtils.toObj(json, PushPacket.class);
                String ack;
                if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                    // 交给 HostReactor去处理
                    hostReactor.processServiceJson(pushPacket.data);
    
                    // send ack to server 发送ACK回执,略。。
            } catch (Exception e) {
                if (closed) {
                    return;
                }
                NAMING_LOGGER.error("[NA] error while receiving push data", e);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    1.2.2.HostReactor

    通知数据的处理由交给了HostReactorprocessServiceJson方法:

    public ServiceInfo processServiceJson(String json) {
        // 解析出ServiceInfo信息
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        String serviceKey = serviceInfo.getKey();
        if (serviceKey == null) {
            return null;
        }
        // 查询缓存中的 ServiceInfo
        ServiceInfo oldService = serviceInfoMap.get(serviceKey);
    
        // 如果缓存存在,则需要校验哪些数据要更新
        boolean changed = false;
        if (oldService != null) {
    		// 拉取的数据是否已经过期
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                                   + serviceInfo.getLastRefTime());
            }
            // 放入缓存
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
    		
            // 中间是缓存与新数据的对比,得到newHosts:新增的实例;remvHosts:待移除的实例;
            // modHosts:需要修改的实例
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                // 发布实例变更的事件
                NotifyCenter.publishEvent(new InstancesChangeEvent(
                    serviceInfo.getName(), serviceInfo.getGroupName(),
                    serviceInfo.getClusters(), serviceInfo.getHosts()));
                DiskCache.write(serviceInfo, cacheDir);
            }
    
        } else {
            // 本地缓存不存在
            changed = true;
            // 放入缓存
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            // 直接发布实例变更的事件
            NotifyCenter.publishEvent(new InstancesChangeEvent(
                serviceInfo.getName(), serviceInfo.getGroupName(),
                serviceInfo.getClusters(), serviceInfo.getHosts()));
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }
    	// 。。。
        return serviceInfo;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    2.服务端

    2.1.拉取服务列表接口

    在介绍的InstanceController中,提供了拉取服务列表的接口:

    /**
         * Get all instance of input service.
         *
         * @param request http request
         * @return list of instance
         * @throws Exception any error during list
         */
    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
        // 从request中获取namespaceId和serviceName
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
    
        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        // 获取客户端的 UDP端口
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    
        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    
        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
        // 获取服务列表
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
                         healthyOnly);
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    进入doSrvIpxt()方法来获取服务列表:

    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent,
                                String clusters, String clientIP,
                                int udpPort, String env, boolean isCheck,
                                String app, String tid, boolean healthyOnly) throws Exception {
        ClientInfo clientInfo = new ClientInfo(agent);
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        // 获取服务列表信息
        Service service = serviceManager.getService(namespaceId, serviceName);
        long cacheMillis = switchDomain.getDefaultCacheMillis();
    
        // now try to enable the push
        try {
            if (udpPort > 0 && pushService.canEnablePush(agent)) {
    			// 添加当前客户端 IP、UDP端口到 PushService 中
                pushService
                    .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                               pushDataSource, tid, app);
                cacheMillis = switchDomain.getPushCacheMillis(serviceName);
            }
        } catch (Exception e) {
            Loggers.SRV_LOG
                .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
            cacheMillis = switchDomain.getDefaultCacheMillis();
        }
    
        if (service == null) {
            // 如果没找到,返回空
            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
            }
            result.put("name", serviceName);
            result.put("clusters", clusters);
            result.put("cacheMillis", cacheMillis);
            result.replace("hosts", JacksonUtils.createEmptyArrayNode());
            return result;
        }
    	// 结果的检测,异常实例的剔除等逻辑省略
        // 最终封装结果并返回 。。。
    
        result.replace("hosts", hosts);
        if (clientInfo.type == ClientInfo.ClientType.JAVA
            && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57

    2.2.发布服务变更的UDP通知

    在上一节中,InstanceController中的doSrvIpxt()方法中,有这样一行代码:

    pushService.addClient(namespaceId, serviceName, clusters, agent,
                          new InetSocketAddress(clientIP, udpPort),
                               pushDataSource, tid, app);
    
    
    • 1
    • 2
    • 3
    • 4

    其实是把消费者的UDP端口、IP等信息封装为一个PushClient对象,存储PushService中。方便以后服务变更后推送消息。

    PushService类本身实现了ApplicationListener接口:
    在这里插入图片描述
    这个是事件监听器接口,监听的是ServiceChangeEvent(服务变更事件)。

    当服务列表变化时,就会通知我们:
    在这里插入图片描述

    3.总结

    Nacos的服务发现分为两种模式:

    模式一:主动拉取模式,消费者定期主动从Nacos拉取服务列表并缓存起来,再服务调用时优先读取本地缓存中的服务列表。

    模式二:订阅模式,消费者订阅Nacos中的服务列表,并基于UDP协议来接收服务变更通知。当Nacos中的服务列表更新时,会发送UDP广播给所有订阅者。

    与Eureka相比,Nacos的订阅模式服务状态更新更及时,消费者更容易及时发现服务列表的变化,剔除故障服务。

  • 相关阅读:
    服务端Skynet(三)——启动lua服务
    Java File.list具有什么功能呢?
    个人C语言问题记录
    一次性解决office部署问题(即点即用等)
    Socket网络编程练习题二:客户端发送一条数据,接收服务端反馈的消息并打印;服务端接收数据并打印,再给客户端反馈消息
    【SQL语法基础】如何使用Python操作MySQL?
    PostgreSQL索引篇 | BTree
    OneFlow v0.8.0正式发布
    合肥工业大学自然语言处理实验报告
    数据库之关系运算和完整性约束(软考中级)
  • 原文地址:https://blog.csdn.net/BruceLiu_code/article/details/126168633