• nacos服务注册详解


    一,nacos的源码剖析

    1,nacos主启动类

    可以看出这个nacos其实就是一个基于springboot的大型的web服务应用

    @SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
    @ServletComponentScan
    @EnableScheduling
    public class Nacos {
        public static void main(String[] args) {
            SpringApplication.run(Nacos.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    2,将服务注册到服务中心

    在这个NacosNamingService类里面,有这个将服务注册到这个注册中心的具体逻辑

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //将这个服务注册到这个nacos的服务中心
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    注册方法的具体实现如下,主要是通过这个registerService方法实现,最后通过这个reqApi的方法,让这个客户端和这个服务端发起http请求,会将一些ip,端口号等一些全部加载到这个服务注册中心里面。

    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(16);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.GROUP_NAME, groupName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST);  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    客户端和服务端发起http请求

    HttpRestResult<String> restResult = 
    nacosRestTemplate.exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class);
    
    • 1
    • 2

    3,服务发现

    主要是通过这个nacosAutoServiceRegistration类来实现这个服务的发现

    @Bean
    @ConditionalOnBean({AutoServiceRegistrationProperties.class})
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    register注册方法的具体实现如下

    @Override
    public void register(Registration registration) {
    	if (StringUtils.isEmpty(registration.getServiceId())) {
    		return;
    	}
        //服务注册
    	NamingService namingService = namingService();
    	String serviceId = registration.getServiceId();
    	String group = nacosDiscoveryProperties.getGroup();
    	//构建客户端的ip,端口号等,完成最终的初始化
    	Instance instance = getNacosInstanceFromRegistration(registration);
    	try {
            //在这个注册中心里面注册这个服务的id,组等
    		namingService.registerInstance(serviceId, group, instance);
    	}
    	catch (Exception e) {
    		rethrowRuntimeException(e);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    4,实例注册的具体实现

    由于整个nacos就是一个springboot的应用,因此只需要找到具体的controller就可以找到这个实例注册的的具体实现,因此可以查看这个InstanceController类。该类的主要核心方法如下

    4.1,实例的注册

    public String register(HttpServletRequest request) throws Exception {
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        //将request转化为实例
        final Instance instance = parseInstance(request);
        //服务管理将服务进行注册
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    进行服务注册的具体流程,主要是在这个registerInstance类下面。

    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //创建一个空的实例服务,里面会对这个服务进行一个初始化
    	createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    	Service service = getService(namespaceId, serviceName);
    	if (service == null) {
        	throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " +serviceName);
            }
        //添加服务的具体实例
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    在这个创建空的实例createEmptyService方法里面,调用了一个putServiceAndInit的方法,会将这个服务加入到一个注册表里面,并且会对这个服务进行一个初始化

    private void putServiceAndInit(Service service) throws NacosException {
        putService(service);
        service = getService(service.getNamespaceId(), service.getName());
        service.init();
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(),service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    服务初始化init方法的具体操作如下

    public void init() {
        //会定时的向服务端发起心跳
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    4.2,添加服务addInstance方法

    里面会有一个重要的key,由这个字段ephemeral和其他字段组成,如果为true,那就是将数据加入到内存,false就是将数据加入到这个磁盘,用于区分是ap架构还是cp架构。

    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        //生成一个字符串,由各种前缀组成
        //ephemeral:是否是临时实例,默认为true
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        synchronized (service) {
            //注册实例到一个map里面,最后将全部map以一个list返回
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); 
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            //最后会将这个key和全部的实例形成一个映射
            consistencyService.put(key, instances);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    其put的的具体流程如下,里面主要通过这个onput方法对这个服务进行上传到这个注册列表里面

    @Override
    public void put(String key, Record value) throws NacosException {
        onPut(key, value);
        distroProtocol.sync(
            new DistroKey(key,
                          KeyBuilder.INSTANCE_LIST_KEY_PREFIX),
            			  DataOperation.CHANGE,
                          globalConfig.getTaskDispatchPeriod() / 2);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    其核心的方法onPut,主要是将这个实例添加的具体操作。将客户端传的参数封装到这个Pair里面,并通过一个任务的形式加入到这个内存队列里面,这个内存队列就是一个阻塞队列。

    tasks.offer(Pair.with(datumKey, action));
    
    • 1

    在加入到这个内存队列之后,会有线程以异步的方式来处理这个内存队列里面的任务。

    Pair<String, DataOperation> pair = tasks.take();
    handle(pair);
    
    • 1
    • 2

    这个handle方法就是具体的对这个线程执行的方法,其核心内容如下。主要是用于添加或者删除这个服务。

    DataOperation action = pair.getValue1();
    if (action == DataOperation.CHANGE) {
        listener.onChange(datumKey, dataStore.get(datumKey).value);
        continue;
    }
    if (action == DataOperation.DELETE) {
        listener.onDelete(datumKey);
        continue;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    添加服务的流程如下

    @Override
    public void onChange(String key, Instances value) throws Exception {
    	updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        recalculateChecksum();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    最后在这个updateIPs的方法里面,可以看到将这个instance实例注册到这注册表里面。

    //注册表架构,是ap架构还是cp架构
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    4.3,copyOnWrite实现实例的添加

    如果存在多个实例同时的添加,主要通过这个写时复制,读写分离的方式来保证这个并发的问题。就是在添加实例的时候,会添加一个副本来记录原来的实例和新加入的实例,新的实例添加成功之后就会将这个副本作为新的注册表,将原来的替换掉。

    其源码在依旧在上面的这个updateIPs方法里面

    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    //创建一个副本Map
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    //将原来的注册表的信息加入到这个副本里面
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    //将现在的和原来的注册表实例进行一个对比,其就是一个更新和添加操作,如修改了配置文件等
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    //将某些服务进行一个剔除的操作,如某些实例超时,不健康等
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
    toUpdateInstances = new HashSet<>(ips);
    //最后将这个副本作为新的注册表
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    5,实例注册总结

    客户端向这个服务端发起http请求,服务端接收到请求之后,会将这个instance实例加入到这个阻塞队列里面,然后后端会异步的开启一个线程来读取这个实例以及实例的各个参数,并且通过写时复制的方法,将这个实例注册到一个双重注册表里面的一个set集合里面去

  • 相关阅读:
    SATA系列专题之三:3.2 Transport Layer传输层FIS Retry机制解析
    【21天算法挑战赛】排序算法——直接插入排序
    淘宝等电商平台API接口评论,item_review-获得商品评论
    逍遥自在学C语言 | 逻辑运算符
    Joplin Typora 粘贴图片 | 当使用Typora作为Joplin编辑器时,如何粘贴图片并上传到Joplin服务器,替换链接
    PHP代码审计--百家CMS4.1.4项目实战(下)
    MYSQL经典面试题
    naive-ui-admin跨域失败
    『现学现忘』Git后悔药 — 30、版本回退git reset --hard命令说明
    ES查询数据的时报错:circuit_breaking_exception[[parent] Data too large
  • 原文地址:https://blog.csdn.net/zhenghuishengq/article/details/126357908