• Eureka 服务注册源码探秘——图解、源码级解析


    🍊 Java学习社区快速通道

    🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

    🍊 绝对不一样的职场干货:大厂最佳实践经验指南


    📆 最近更新:2023年5月2日


    🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


    引言

    服务注册是为了解决各个微服务的“你是谁”这个问题,即获取所有服务节点的身份信息和服务名称,站在注册中心的角度来看,有以下两种比较直观的解决方案:

    1. 由注册中心主动访问网络节点中所有机器
    2. 注册中心等待服务节点主动进行注册

    在这里插入图片描述

    目前主流的注册中心(Nacos、Eureka)都选择了第二种方案,主要原因是第一种方案有很多弊端:

    • 模型复杂: 网络结点构成了一张复杂的网,结点与结点之间的关系错综复杂,轮询每个节点的做法通常是注册中心发局域网广播,客户端响应的方式。现实中对于跨局域网的分布式系统来说,响应模型会更加复杂。

    • 网络开销大: 整个网络环境里会掺杂大量非服务节点,这些节点无需对送达的广播请求做出响应,这种广播的模式无疑增加了网络通信成本。

    • 服务端压力增加: 不仅要求注册中心向网络中所有节点主动发送广播请求,还需要对客户端的应答做出响应。考虑到注册中心的节点数远远少于服务节点,所以要尽可能地减轻服务中心承载的业务。


    一一对照着看,第二种实现方案就有如下优点:

    1. 注册中心压力小: 网络中其它非服务节点不会产生任何无效请求,也就不用做额外的判断
    2. 效率高: 省去了广播环节的时间,使注册效率大大提高
    3. 节省成本: 节省了大量网络请求的开销

    下面就来探索一下经典注册中心微服务 Eureka 服务注册源码。


    Eureka 服务注册源码

    寻找配置类

    要使用Eureka,就需要在SpringBoot的启动类上添加 @EnableDiscoveryClient 注解,所以我们的源码解析,从启动类上的 @EnableDiscoveryClient 注解开始:
    请添加图片描述

    在Eureka已经启动的状态下,以debug模式启动EurekaClientApplication,会来到这里面的断点:

    请添加图片描述
    其中metadatamain函数里挂的注解:

    请添加图片描述
    attributes是会获得@EnableDiscoveryClientEnableDiscoveryClient注解,接下来读取注解里面的autoRegister属性,如果是true的话,会发现之后导入了一个配置类:

    请添加图片描述

    寻找服务注册的元数据

    进入到该配置类:

    请添加图片描述
    继续进入到AutoServiceRegistrationProperties类里:
    请添加图片描述
    这些个属性一定会在某些配置项加载的流程中应用到,大家尝试找一下哪些类会引用它。

    其中你会找到AbstractAutoServiceRegistration,发现其在初始化的流程里使用到:

    protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {
        this.serviceRegistry = serviceRegistry;
        this.properties = properties;
    }
    
    • 1
    • 2
    • 3
    • 4

    同时还发现了一个服务注册属性serviceRegistry

    private final ServiceRegistry<R> serviceRegistry;
    
    • 1

    进入到ServiceRegistry的实现类EurekaServiceRegistry

    请添加图片描述
    进入到第一行的方法maybeInitializeClient里:

    private void maybeInitializeClient(EurekaRegistration reg) {
        reg.getApplicationInfoManager().getInfo();
        reg.getEurekaClient().getApplications();
    }
    
    • 1
    • 2
    • 3
    • 4

    继续进入到getInfo

    请添加图片描述
    请添加图片描述

    发现这里面的信息其实就是我们要向服务中心注册的东西。


    register方法

    接下来继续执行EurekaServiceRegistryregister方法:

    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    
    • 1

    首先设置了instance的状态,这里reg.getInstanceConfig().getInitialStatus()是UP


    这里的register并没有发起服务调用请求,所以还要通过调用栈来继续寻找。来到上一层EurekaAutoServiceRegistrationstart方法里:

    请添加图片描述
    停留在的这一行往上下文中发布了一个事件InstanceRegisteredEvent,但此时我们会发现服务其实并没有注册
    请添加图片描述
    说明在event发布前后肯定发生了什么事,让eureka服务提供者向注册中心发送了请求,既然event发布之后running的状态变为了true,那确实是运行起来了。


    下一个流程

    下一个流程在DiscoveryClient里,它封装了我们服务的client和注册中心之间的各种交互,里面有一个register方法

    请添加图片描述
    跟着断点继续往下走:
    请添加图片描述
    发现这里用的是SessionedEurekaHttpClient,接下来去找它的源码:

    请添加图片描述
    register方法在其父类EurekaHttpClientDecorator

    请添加图片描述
    通过一个子类实现的execute方法,参数是由父类传入的一个代理delegateexecute是由SessionedEurekaHttpClient子类实现的

    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        long now = System.currentTimeMillis();
        long delay = now - this.lastReconnectTimeStamp;
        if (delay >= this.currentSessionDurationMs) {
            logger.debug("Ending a session and starting anew");
            this.lastReconnectTimeStamp = now;
            this.currentSessionDurationMs = this.randomizeSessionDuration(this.sessionDurationMs);
            TransportUtils.shutdown((EurekaHttpClient)this.eurekaHttpClientRef.getAndSet((Object)null));
        }
    
        EurekaHttpClient eurekaHttpClient = (EurekaHttpClient)this.eurekaHttpClientRef.get();
        if (eurekaHttpClient == null) {
            eurekaHttpClient = TransportUtils.getOrSetAnotherClient(this.eurekaHttpClientRef, this.clientFactory.newClient());
        }
    
        return requestExecutor.execute(eurekaHttpClient);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    这一段代码尝试从HttpClient里拿实例,如果实例为空则会调用一个工具类的方法getOrSetAnotherClient去获取一个新的实例,但这里我们会发现其实并不为空:

    请添加图片描述
    这里调用了另一个httpclient。其中SessionedEurekaHttpClient用到了装饰器模式,主要装饰的功能是delay时间过长时重新启动一个session


    进入到下一层RetryableEurekaHttpClient,这一层装饰的功能是可以重试,默认最大重试次数为3:

    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        List<EurekaEndpoint> candidateHosts = null;
        int endpointIdx = 0;
    
        for(int retry = 0; retry < this.numberOfRetries; ++retry) {
            EurekaHttpClient currentHttpClient = (EurekaHttpClient)this.delegate.get();
            EurekaEndpoint currentEndpoint = null;
            if (currentHttpClient == null) {
                if (candidateHosts == null) {
                    candidateHosts = this.getHostCandidates();
                    if (candidateHosts.isEmpty()) {
                        throw new TransportException("There is no known eureka server; cluster server list is empty");
                    }
                }
    
                if (endpointIdx >= candidateHosts.size()) {
                    throw new TransportException("Cannot execute request on any known server");
                }
    
                currentEndpoint = (EurekaEndpoint)candidateHosts.get(endpointIdx++);
                currentHttpClient = this.clientFactory.newClient(currentEndpoint);
            }
    
            try {
                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                if (this.serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                    this.delegate.set(currentHttpClient);
                    if (retry > 0) {
                        logger.info("Request execution succeeded on retry #{}", retry);
                    }
    
                    return response;
                }
    
                logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
            } catch (Exception var8) {
                logger.warn("Request execution failed with message: {}", var8.getMessage());
            }
    
            this.delegate.compareAndSet(currentHttpClient, (Object)null);
            if (currentEndpoint != null) {
                this.quarantineSet.add(currentEndpoint);
            }
        }
    
        throw new TransportException("Retry limit reached; giving up on completing the request");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    其中this.getHostCandidates();获取的是注册中心:

    private List<EurekaEndpoint> getHostCandidates() {
        List<EurekaEndpoint> candidateHosts = this.clusterResolver.getClusterEndpoints();
        this.quarantineSet.retainAll((Collection)candidateHosts);
        int threshold = (int)((double)((List)candidateHosts).size() * this.transportConfig.getRetryableClientQuarantineRefreshPercentage());
        if (threshold > ((List)candidateHosts).size()) {
            threshold = ((List)candidateHosts).size();
        }
    
        if (!this.quarantineSet.isEmpty()) {
            if (this.quarantineSet.size() >= threshold) {
                logger.debug("Clearing quarantined list of size {}", this.quarantineSet.size());
                this.quarantineSet.clear();
            } else {
                List<EurekaEndpoint> remainingHosts = new ArrayList(((List)candidateHosts).size());
                Iterator var4 = ((List)candidateHosts).iterator();
    
                while(var4.hasNext()) {
                    EurekaEndpoint endpoint = (EurekaEndpoint)var4.next();
                    if (!this.quarantineSet.contains(endpoint)) {
                        remainingHosts.add(endpoint);
                    }
                }
    
                candidateHosts = remainingHosts;
            }
        }
    
        return (List)candidateHosts;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    如果坏注册中心节点的数量超过了阈值(66%),则要重启。quarantineSet存储的是失败的注册中心,remainingHosts存储的是成功的注册中心。


    继续execute

    回到上面的execute方法里,如果重试的索引大于候选注册中心的size时,就表示已知的所有注册中心都不能处理注册请求,此时会抛一个异常出来:

    if (endpointIdx >= candidateHosts.size()) {
        throw new TransportException("Cannot execute request on any known server");
    }
    
    currentEndpoint = (EurekaEndpoint)candidateHosts.get(endpointIdx++);
    currentHttpClient = this.clientFactory.newClient(currentEndpoint);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果在某一层execute成功了,则会将deligate设置为当前的client,如果不成功则会通过CAS操作将currentHttpClient设置为空,然后放置到失效的EurekaEndpoint加入到quarantineSet,下次不用了
    请添加图片描述
    此时还有好多层装饰器,这里直接快进跳到最后一层AbstractJerseyEurekaHttpClient中的register方法:

    public EurekaHttpResponse<Void> register(InstanceInfo info) {
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
    
        EurekaHttpResponse var5;
        try {
            Builder resourceBuilder = this.jerseyClient.resource(this.serviceUrl).path(urlPath).getRequestBuilder();
            this.addExtraHeaders(resourceBuilder);
            response = (ClientResponse)((Builder)((Builder)((Builder)resourceBuilder.header("Accept-Encoding", "gzip")).type(MediaType.APPLICATION_JSON_TYPE)).accept(new String[]{"application/json"})).post(ClientResponse.class, info);
            var5 = EurekaHttpResponse.anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", new Object[]{this.serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()});
            }
    
            if (response != null) {
                response.close();
            }
        }
        return var5;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    在这里发送了http请求,info里面存的是当前服务的所有信息

    请添加图片描述
    这一步结束之后就注册成功了

    请添加图片描述

  • 相关阅读:
    【uniapp】查看协议文件
    [源码解析] TensorFlow 分布式 DistributedStrategy 之基础篇
    Offset explorer连接Kerberos认证的Kafka
    Java版本+企业电子招投标系统源代码+支持二开+招投标系统+中小型企业采购供应商招投标平台
    LeetCode 367. 有效的完全平方数
    LeetCode --- 1512. Number of Good Pairs 解题报告
    程序的编译和链接
    【电源专题】案例:异常样机为什么只在40%以下电量时与其他样机显示电量差异10%,40%以上电量差异却都在5%以内。
    学习笔记|ADC|NTC原理|测温程序|STC32G单片机视频开发教程(冲哥)|第十九集:ADC应用之NTC
    鲲鹏生态下,长沙“计算”产业再登新高度
  • 原文地址:https://blog.csdn.net/HNU_Csee_wjw/article/details/124244398