• 微服务组件--注册中心Spring Cloud Eureka分析


    Eureka核心功能点

    【1】服务注册(register):Eureka Client会通过发送REST请求的方式向Eureka Server注册自己的服务,提供自身的元数据,比如ip地址、端口、运行状况指标的url、主页地址等信息。Eureka Server接收到注册请求后,就会把这些元数据信息存储在一个双层的Map中。

    【2】服务续约(renew):在服务注册后,Eureka Client会维护一个心跳来持续通知Eureka Server,说明服务一直处于可用状态,防止被剔除。Eureka Client在默认的情况下会每隔30秒(eureka.instance.leaseRenewalIntervalInSeconds)发送一次心跳来进行服务续约。

    【3】服务同步(replicate):Eureka Server之间会互相进行注册,构建Eureka Server集群,不同Eureka Server之间会进行服务同步,用来保证服务信息的一致性。

    【4】获取服务(get registry):服务消费者(Eureka Client)在启动的时候,会发送一个REST请求给Eureka Server,获取上面注册的服务清单,并且缓存在Eureka Client本地,默认缓存30秒(eureka.client.registryFetchIntervalSeconds)。同时,为了性能考虑,EurekaServer也会维护一份只读的服务清单缓存,该缓存每隔30秒更新一次。

    【5】服务调用:服务消费者在获取到服务清单后,就可以根据清单中的服务列表信息,查找到其他服务的地址,从而进行远程调用。Eureka有Region和Zone的概念,一个Region可以包含多个Zone,在进行服务调用时,优先访问处于同一个Zone中的服务提供者。

    【6】服务下线(cancel):当Eureka Client需要关闭或重启时,就不希望在这个时间段内再有请求进来,所以,就需要提前先发送REST请求给Eureka Server,告诉Eureka Server自己要下线了,Eureka Server在收到请求后,就会把该服务状态置为下线(DOWN),并把该下线事件传播出去。

    【7】服务剔除(evict):有时候,服务实例可能会因为网络故障等原因导致不能提供服务,而此时该实例也没有发送请求给Eureka Server来进行服务下线,所以,还需要有服务剔除的机制。Eureka Server在启动的时候会创建一个定时任务,每隔一段时间(默认60秒),从当前服务清单中把超时没有续约(默认90秒,eureka.instance.leaseExpirationDurationInSeconds)的服务剔除。180s被剔除

    【8】自我保护:既然Eureka Server会定时剔除超时没有续约的服务,那就有可能出现一种场景,网络一段时间内发生了异常,所有的服务都没能够进行续约,Eureka Server就把所有的服务都剔除了,这样显然不太合理。所以,就有了自我保护机制,当短时间内,统计续约失败的比例,如果达到一定阈值,则会触发自我保护的机制,在该机制下,Eureka Server不会剔除任何的微服务,等到正常后,再退出自我保护机制。自我保护开关(eureka.server.enable-self-preservation: false)

     

    常见的问题

      【1】当eureka服务实例有注册或下线或有实例发生故障,内存注册表虽然会及时更新数据,但是客户端不一定能及时感知到,可能会过30秒才能感知到,因为客户端拉取注册表实例这里面有一个多级缓存机制。【实现的是最终一致性

      【2】还有服务剔除的不是默认90秒没心跳的实例,剔除的是180秒没心跳的实例(eureka的bug导致,注解有说明是因为加了两次过期时间,但是很小的BUG所有不修复了【在Lease结构里说明】)

      【3】分析eureka服务下线的情况

        1)图示

          

     

        2)说明

    复制代码
    1.客户端每个30s会发送心跳到服务端
    2.ReadOnlyCacheMap和ReadWriteCacheMap每30s同步一次
    3.客户端每隔30s同步一次ReadOnlyCacheMap
    4.ribbon缓存每隔30s同步一次【有负载均衡的情况】
    所以正常下线需要120s
    而非正常下线,外加上服务剔除的180s+60s的定时任务,也就是360s【6min】
    
    如果出现时间太长容易出现问题
    1.修改 ribbon 同步缓存的时间为 3 秒:ribbon.ServerListRefreshInterval = 3000
    2.修改客户端同步缓存时间为 3 秒 :eureka.client.registry-fetch-interval-seconds = 3
    3.心跳间隔时间修改为 3 秒:eureka.instance.lease-renewal-interval-in-seconds = 3
    4.超时剔除的时间改为 9 秒:eureka.instance.lease-expiration-duration-in-seconds = 9
    5.清理线程定时时间改为 5 秒执行一次:eureka.server.eviction-interval-timer-in-ms = 5000
    6.同步到只读缓存的时间修改为 3 秒一次:eureka.server.response-cache-update-interval-ms = 3000
    只读缓存其实是可以关闭的,通过修改参数eureka.server.use-read-only-response-cache = false可以做到
    正常下线就是 3+3+3+3=12 秒,非正常下线再加 18+5 秒为 35 秒。
    因为本质上服务剔除的是超时过期的,而lease可知过期时间实际上是两倍,也就是18s。考虑极端情况,18s刚好卡在定时任务的最后一刻,则是直接加上5s。
    此外,这里的极端情况,也就是从某一次心跳之后开始不正常的。
    复制代码

     

     

    源码精髓总结

    【1】服务端多级缓存设计思想

      1)在拉取注册表的时候:

        (1)首先从ReadOnlyCacheMap里查缓存的注册表。

        (2)若没有,就找ReadWriteCacheMap里缓存的注册表。

        (3)如果还没有,就从内存中获取实际的注册表数据。

      2)在注册表发生变更的时候:

        (1)会在内存中更新变更的注册表数据,同时过期掉ReadWriteCacheMap。

        (2)此过程不会影响ReadOnlyCacheMap提供人家查询注册表。

        (3)默认每30秒Eureka Server会将ReadWriteCacheMap更新到ReadOnlyCacheMap里

        (4)默认每180秒Eureka Server会将ReadWriteCacheMap里是数据失效

        (5)下次有服务拉取注册表,又会从内存中获取最新的数据了,同时填充 各级缓存

      3)多级缓存机制的优点:

        (1)尽可能保证了内存注册表数据不会出现频繁的读写冲突问题

        (2)并且进一步保证对Eureka Server的大量请求,都是快速从纯内存走,性能极高(可以稍微估计下对于一线互联网公司,内部上千个eureka client实例,每分钟对eureka大几千次的访问,一天就是上千万次的访问)

     

    【2】TimedSupervisorTask定时任务的设计

      1)从整体上看,TimedSupervisorTask是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦新任务不再超时,间隔时间又会自动恢复为初始值,另外还有CAS来控制多线程同步。

     

    【3】增量更新中哈希码检验的设计

    //里面的一致性哈希码,本质上就是校验数据
    //如:服务器上全量块存的是【ABCDEFG】,此时它的哈希码便是全量块存的数据的哈希值,增量块存的是【FG】,
    //而我们客户端是【ABCD】,增量拉下来再合并,则为【ABCDFG】,得到的哈希值便会与全量哈希值不一致,代表了缺失一部分数据
    //故检验不对就会全量拉取

     

    【4】注册表的结构说明(这个仅是记录):

    复制代码
    实例信息存放的map,这是个两层的ConcurrentHashMap>>,外层map的key是appName,也就是服务名,内层map的key是instanceId,也就是实例名
    注册表map数据示例如下:
    {
        MICROSERVICE - PROVIDER - USER = {
            DESKTOP - 1 SLJLB7: microservice - provider - user: 8002 = com.netflix.eureka.lease.Lease @2cd36af6,
            DESKTOP - 1 SLJLB7: microservice - provider - user: 8001 = com.netflix.eureka.lease.Lease @600b7073
        },
        MICROSERVICE - PROVIDER - ORDER = {
            DESKTOP - 1 SLJLB7: microservice - provider - order: 8002 = com.netflix.eureka.lease.Lease @2cd36af6,
            DESKTOP - 1 SLJLB7: microservice - provider - order: 8001 = com.netflix.eureka.lease.Lease @600b7073
        }
    }
    复制代码

     

    Eureka服务端源码分析

    【1】分析注解@EnableEurekaServer是如何开启eurekaServer服务注册功能

    复制代码
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Import(EurekaServerMarkerConfiguration.class)
    public @interface EnableEurekaServer {}
    
    //注释有说:这个注解是为了激活Eureka相关的配置类EurekaServerAutoConfiguration类
    //但是却是导入了EurekaServerMarkerConfiguration类
    复制代码

    【2】分析导入的EurekaServerMarkerConfiguration类

    复制代码
    //注释说明:采用Marker的bean去激活EurekaServerAutoConfiguration类
    //但实际上并没有做什么,直接去EurekaServerAutoConfiguration类看他是怎么处理的
    @Configuration(proxyBeanMethods = false)
    public class EurekaServerMarkerConfiguration {
        @Bean
        public Marker eurekaServerMarkerBean() {
            return new Marker();
        }
    
        class Marker {}
    }
    复制代码

    【3】分析EurekaServerAutoConfiguration类

    复制代码
    @Configuration(proxyBeanMethods = false)
    @Import(EurekaServerInitializerConfiguration.class)
    //当发现了这里,便明白了,这个配置类要生效是必须要有Marker类的存在
    //而且EurekaServerAutoConfiguration类本身是基于SpringBoot的SPI机制,自动导入的
    @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
    @EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
    @PropertySource("classpath:/eureka/server.properties")
    public class EurekaServerAutoConfiguration implements WebMvcConfigurer {...}
    复制代码

    【4】分析EurekaServerAutoConfiguration类中的方法

    复制代码
    //初始化集群节点集合
    @Bean
    @ConditionalOnMissingBean
    public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
        return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
                this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
                replicationClientAdditionalFilters);
    }
    
    //初始化EurekaServer的相关配置
    @Configuration(proxyBeanMethods = false)
    protected static class EurekaServerConfigBeanConfiguration {
        @Bean
        @ConditionalOnMissingBean
        public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
            EurekaServerConfigBean server = new EurekaServerConfigBean();
            if (clientConfig.shouldRegisterWithEureka()) {
                // Set a sensible default if we are supposed to replicate
                server.setRegistrySyncRetries(5);
            }
            return server;
        }
    }
    
    //初始化一些接口,用于获取EurekaServer的信息
    @Bean
    @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
    public EurekaController eurekaController() {
        return new EurekaController(this.applicationInfoManager);
    }
    
    //基于EurekaServer的配置,注册表,集群节点集合,以及服务实例初始化EurekaServer上下文
    @Bean
    public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
        return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
    }
    
    //初始化经过包装的Eureka原生启动类
    @Bean
    public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
        return new EurekaServerBootstrap(this.applicationInfoManager,
                this.eurekaClientConfig, this.eurekaServerConfig, registry,
                serverContext);
    }
    
    //初始化集群注册表
    @Bean
    public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
        this.eurekaClient.getApplications(); // force initialization
        return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
                serverCodecs, this.eurekaClient,
                this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
                this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
    }
    复制代码

    【5】分析EurekaServerAutoConfiguration类导入的EurekaServerInitializerConfiguration

    复制代码
    //因为实现了SmartLifecycle接口,会在初始化完成后根据isAutoStartup()的返回值确认是否调用start()方法
    //故查看EurekaServerInitializerConfiguration类#start()方法
    @Override
    public void start() {
        new Thread(() -> {
            try {
                //初始化EurekaServer,同时启动Eureka Server
                eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
                log.info("Started Eureka Server");
                //发送Eureka注册事件
                publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
                // 设置启动的状态为true
                EurekaServerInitializerConfiguration.this.running = true;
                // 发送Eureka Start事件,其他还有各种事件,我们可以监听这种时间,然后做一些特定的业务需求
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
            }
            catch (Exception ex) {...}
        }).start();
    }
    
    //初始化EurekaServer的运行环境和上下文
    //EurekaServerBootstrap类#contextInitialized方法
    public void contextInitialized(ServletContext context) {
        try {
            //初始化运行环境
            initEurekaEnvironment();
            //初始化上下文
            initEurekaServerContext();
    
            context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
        }
        catch (Throwable e) {
            throw new RuntimeException(...);
        }
    }
    复制代码

    【6】分析初始化上下文initEurekaServerContext方法做了什么【进行了服务同步,服务剔除的启动】

    复制代码
    protected void initEurekaServerContext() throws Exception {
        // For backward compatibility
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
                XStream.PRIORITY_VERY_HIGH);
    
        if (isAws(this.applicationInfoManager.getInfo())) {
            this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                    this.eurekaClientConfig, this.registry, this.applicationInfoManager);
            this.awsBinder.start();
        }
        //初始化eureka server上下文
        EurekaServerContextHolder.initialize(this.serverContext);
    
        log.info("Initialized server context");
    
        // Copy registry from neighboring eureka node
        // 从相邻的eureka节点复制注册表
        int registryCount = this.registry.syncUp();
        // 默认每30秒发送心跳,1分钟就是2次
        // 修改eureka状态为up
        // 同时,这里面会开启一个定时任务,用于清理60秒没有心跳的客户端。自动下线
        // 根据属性值可知是PeerAwareInstanceRegistry类
        this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
        // Register all monitoring statistics.
        EurekaMonitors.registerAllStats();
    }
    
    //返回了一个EurekaServerContextHolder【其实就是将serverContext设置进入到里面当做属性值】
    public static synchronized void initialize(EurekaServerContext serverContext) {
        holder = new EurekaServerContextHolder(serverContext);
    }
    复制代码

    【7】服务同步的逻辑

    复制代码
    //进行服务同步
    @Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
        //从配置文件中拿到注册的节点
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) { break; }
            }
            //调用节点的http请求获取所有的服务实例
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            //将其他节点的实例注册到本节点
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {...}
                }
            }
        }
        return count;
    }
    复制代码

    【8】服务剔除的逻辑

    复制代码
    //进行服务剔除
    @Override
    public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
        // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
        // 计算每分钟最大续约数
        this.expectedNumberOfClientsSendingRenews = count;
        // 每分钟最小续约数
        updateRenewsPerMinThreshold();
        
        this.startupTime = System.currentTimeMillis();
        if (count > 0) {
            this.peerInstancesTransferEmptyOnStartup = false;
        }
        DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
        boolean isAws = Name.Amazon == selfName;
        if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
            primeAwsReplicas(applicationInfoManager);
        }
        logger.info("Changing status to UP");
        // 设置实例的状态为UP
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        // 开启定时任务,默认60秒执行一次,用于清理60秒之内没有续约的实例
        super.postInit();
    }
    
    protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }
    
    protected void postInit() {
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        //服务剔除任务
        //evictionIntervalTimerInMs = 60 * 1000,即每60s执行一次,且延迟60s
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
    }
    //EvictionTask类#run方法
    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            evict(compensationTimeMs);
        } catch (Throwable e) {...}
    }
    
    //剔除逻辑
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
    
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
    
        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
        List> expiredLeases = new ArrayList<>();
        for (Entry>> groupEntry : registry.entrySet()) {
            Map> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry> leaseEntry : leaseMap.entrySet()) {
                    Lease lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
    
        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
    
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);
    
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease lease = expiredLeases.get(i);
    
                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }
    复制代码

     

    Eureka客户端源码分析

    【1】根据SpringBoot自动装配先找出所有会调用的类

    复制代码
    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
    org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
    org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
    org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration
    
    org.springframework.cloud.bootstrap.BootstrapConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
    复制代码

    【2】找到对应的自动装配类EurekaClientAutoConfiguration类

    复制代码
    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @Import(DiscoveryClientOptionalArgsConfiguration.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
            CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
    @AutoConfigureAfter(name = {
            "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
            "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
            "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
    public class EurekaClientAutoConfiguration {
        //初始化EurekaClient的相关配置
        @Bean
        @ConditionalOnMissingBean(value = EurekaClientConfig.class,
                search = SearchStrategy.CURRENT)
        public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
            EurekaClientConfigBean client = new EurekaClientConfigBean();
            if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
                // We don't register during bootstrap by default, but there will be another
                // chance later.
                client.setRegisterWithEureka(false);
            }
            return client;
        }
    
        //Client启动时的自动注册Bean
        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
                ApplicationContext context, EurekaServiceRegistry registry,
                EurekaRegistration registration) {
            return new EurekaAutoServiceRegistration(context, registry, registration);
        }
    
        //EurekaClient配置类
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnMissingRefreshScope
        protected static class EurekaClientConfiguration {
    
            @Autowired
            private ApplicationContext context;
    
            @Autowired
            private AbstractDiscoveryClientOptionalArgs optionalArgs;
    
            @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
                return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
            }
    
            @Bean
            @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
            public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
                InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
                return new ApplicationInfoManager(config, instanceInfo);
            }
    
            @Bean
            @ConditionalOnBean(AutoServiceRegistrationProperties.class)
            @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
            public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider healthCheckHandler) {
                return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
                        .with(eurekaClient).with(healthCheckHandler).build();
            }
    
        }
    ....
    }
    复制代码

    【2.1】分析注解@AutoConfigureAfter导入的EurekaDiscoveryClientConfiguration类做了什么

    复制代码
    @Configuration(proxyBeanMethods = false)
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @ConditionalOnDiscoveryEnabled
    @ConditionalOnBlockingDiscoveryEnabled
    public class EurekaDiscoveryClientConfiguration {
    
        //基于EurekaClientAutoConfiguration的启动标志
        @Deprecated
        @Bean
        public Marker eurekaDiscoverClientMarker() {
            return new Marker();
        }
    
        //将EurekaClient包装成EurekaDiscoveryClient
        @Bean
        @ConditionalOnMissingBean
        public EurekaDiscoveryClient discoveryClient(EurekaClient client,
                EurekaClientConfig clientConfig) {
            return new EurekaDiscoveryClient(client, clientConfig);
        }
    
        //心跳检测的处理配置
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled",matchIfMissing = false)
        protected static class EurekaHealthCheckHandlerConfiguration {
    
            @Autowired(required = false)
            private StatusAggregator statusAggregator = new SimpleStatusAggregator();
    
            @Bean
            @ConditionalOnMissingBean(HealthCheckHandler.class)
            public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
                return new EurekaHealthCheckHandler(this.statusAggregator);
            }
    
        }
    
        @Deprecated
        class Marker {
    
        }
    
        //定义了Client配置重刷的监听器
        @Configuration(proxyBeanMethods = false)
        @ConditionalOnClass(RefreshScopeRefreshedEvent.class)
        protected static class EurekaClientConfigurationRefresher implements ApplicationListener {
            ....
        }
    
    }
    
    //看得出来包装也只是将配置和客户端放在了一起
    public EurekaDiscoveryClient(EurekaClient eurekaClient,
            EurekaClientConfig clientConfig) {
        this.clientConfig = clientConfig;
        this.eurekaClient = eurekaClient;
    }
    复制代码

     

    【3】分析EurekaClient的相关配置EurekaClientConfigBean类

    复制代码
    //仅列举了部分
    @ConfigurationProperties(EurekaClientConfigBean.PREFIX)
    public class EurekaClientConfigBean implements EurekaClientConfig, Ordered {
        //客户端配置前缀
        public static final String PREFIX = "eureka.client";
        //public static final String DEFAULT_PREFIX = "/eureka";
        //默认的注册地址
        public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/";
        //默认域
        public static final String DEFAULT_ZONE = "defaultZone";
    
        private static final int MINUTES = 60;
    
        //多长时间从注册中心服务端拉取一次服务信息,单位秒;这个就是主动拉取注册中心上所有服务的实例信息
        private int registryFetchIntervalSeconds = 30;
        //多长时间复制实例变化到eureka服务端,单位秒;这个配置是复制实例信息到注册中心
        private int instanceInfoReplicationIntervalSeconds = 30;
        //实例初始化复制信息到eureka服务端的间隔时间,所以可以看到,其实实例的初始化阶段不是立即复制实例信息到注册中心的,单位秒
        private int initialInstanceInfoReplicationIntervalSeconds = 40;
        //eureka服务端的变化,多长时间,客户端会获取一次eureka服务的信息
        private int eurekaServiceUrlPollIntervalSeconds = 5 * MINUTES;
        //eureka server的代理端口
        private String proxyPort;
        //eureka server的代理host name
        private String proxyHost;
        //账号
        private String proxyUserName;
        //密码
        private String proxyPassword;
        //从server读取所需的超时时间
        private int eurekaServerReadTimeoutSeconds = 8;
        //连接server的超时时间
        private int eurekaServerConnectTimeoutSeconds = 5;
        //被允许连接到所有server host的总连接数
        private int eurekaServerTotalConnections = 200;
        // 被允许连接到每一个server host的总连接数
        private int eurekaServerTotalConnectionsPerHost = 50;
        //连接到server的http连接的空闲超时时间,超时会被清理掉
        private int eurekaConnectionIdleTimeoutSeconds = 30;
        //heartbeatExecutor 心跳的线程数
        private int heartbeatExecutorThreadPoolSize = 2;
        //客户端初始化阶段强制注册,默认关闭
        private boolean shouldEnforceRegistrationAtInit = false;
    
    ...
    }
    复制代码

     

    【4】分析EurekaClientConfiguration配置类里面生成的EurekaClient的Bean

    复制代码
    //CloudEurekaClient类【继承DiscoveryClient类】#构造方法
    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, ApplicationEventPublisher publisher) {
        super(applicationInfoManager, config, args);
        this.applicationInfoManager = applicationInfoManager;
        this.publisher = publisher;
        this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
                "eurekaTransport");
        ReflectionUtils.makeAccessible(this.eurekaTransportField);
    }
    
    //DiscoveryClient类【继承EurekaClient(原生的EurekaClient)】#构造方法
    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
        this(applicationInfoManager, config, args, ResolverUtils::randomize);
    }
    
    public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
        //主要是这个this指向,毕竟里面的都是方法传参
        this(applicationInfoManager, config, args, new Provider() {
            private volatile BackupRegistry backupRegistryInstance;
    
            @Override
            public synchronized BackupRegistry get() {
                if (backupRegistryInstance == null) {
                    String backupRegistryClassName = config.getBackupRegistryImpl();
                    if (null != backupRegistryClassName) {
                        try {
                            backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                            logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                        } catch (InstantiationException e) {..} catch (IllegalAccessException e) {..} catch (ClassNotFoundException e) {...}
                    }
    
                    if (backupRegistryInstance == null) {
                        logger.warn("Using default backup registry implementation which does not do anything.");
                        backupRegistryInstance = new NotImplementedRegistryImpl();
                    }
                }
    
                return backupRegistryInstance;
            }
        }, randomizer);
    }
    复制代码

     

    【5】分析DiscoveryClient的构造方法

    复制代码
    @Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventListeners.addAll(args.getEventListeners());
            this.preRegistrationHandler = args.preRegistrationHandler;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.preRegistrationHandler = null;
        }
        
        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();
    
        clientConfig = config;
        staticClientConfig = clientConfig;
        transportConfig = config.getTransportConfig();
        instanceInfo = myInfo;
        if (myInfo != null) {
            appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
        } else {...}
    
        this.backupRegistryProvider = backupRegistryProvider;
        this.endpointRandomizer = endpointRandomizer;
        this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
        localRegionApps.set(new Applications());
    
        fetchRegistryGeneration = new AtomicLong(0);
    
        remoteRegionsToFetch = new AtomicReference(clientConfig.fetchRegistryForRemoteRegions());
        remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
    
        if (config.shouldFetchRegistry()) {
            this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        if (config.shouldRegisterWithEureka()) {
            this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        } else {
            this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
        }
    
        //从这里开始初始化Eureka Client
        if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
            logger.info("Client configured to neither register nor query for data.");
            scheduler = null;
            heartbeatExecutor = null;
            cacheRefreshExecutor = null;
            eurekaTransport = null;
            instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
    
            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
    
            return;  // no need to setup up an network tasks and we are done
        }
    
        try {
            // default size of 2 - 1 each for heartbeat and cacheRefresh
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            //心跳的线程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //缓存重刷的线程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
    
            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, args);
    
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }
    
        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }
    
        // call and execute the pre registration handler before all background tasks (inc registration) is started
        if (this.preRegistrationHandler != null) {
            this.preRegistrationHandler.beforeRegistration();
        }
    
        if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
            try {
                if (!register() ) {
                    throw new IllegalStateException("Registration error at startup. Invalid server response.");
                }
            } catch (Throwable th) {...}
        }
    
        // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
        //最核心代码,初始化定时任务
        initScheduledTasks();
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {...}
    
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);
    
        initTimestampMs = System.currentTimeMillis();
    }
    复制代码

     

    【6】核心逻辑initScheduledTasks初始化定时任务,是做了什么

    复制代码
    /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        //获取服务注册列表信息
        if (clientConfig.shouldFetchRegistry()) {
            //服务注册列表更新的周期时间
            //默认是30
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            //定时更新服务注册列表
            //这里的延时任务明显是只调用一次,具体在分析他的任务的run方法
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()   //该线程执行更新的具体逻辑
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
    
        if (clientConfig.shouldRegisterWithEureka()) {
            //服务续约的周期时间
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            //应用启动可见此日志,内容是:Starting heartbeat executor: renew interval is: 30
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
            // Heartbeat timer
            // 服务定时续约
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()  //该线程执行续约的具体逻辑
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
    
            //这个Runable中含有服务注册的逻辑
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
    
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
    
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
    
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            //服务注册
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
    复制代码

     

    【6.1】定时任务TimedSupervisorTask类的设计

    复制代码
    //TimedSupervisorTask类#run方法
    //这里存在一个设计的亮点
    public class TimedSupervisorTask extends TimerTask {
        ...
    
        public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                                   int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
            this.scheduler = scheduler;
            this.executor = executor;
            this.timeoutMillis = timeUnit.toMillis(timeout);
            this.task = task;  //可以看出任务还是需要根据传入来的
            this.delay = new AtomicLong(timeoutMillis);
            this.maxDelay = timeoutMillis * expBackOffBound;
            // Initialize the counters and register.
            successCounter = Monitors.newCounter("success");
            timeoutCounter = Monitors.newCounter("timeouts");
            rejectedCounter = Monitors.newCounter("rejectedExecutions");
            throwableCounter = Monitors.newCounter("throwables");
            threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
            Monitors.registerObject(name, this);
        }
    
        @Override
        public void run() {
            Future future = null;
            try {
                future = executor.submit(task);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                //设置了超时时间
                future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
                //出现任务不超时的情况又会将延迟时间重置(这里主要是配合下面捕捉异常的超时翻倍情况)
                delay.set(timeoutMillis);
                threadPoolLevelGauge.set((long) executor.getActiveCount());
                successCounter.increment();
            } catch (TimeoutException e) {
                logger.warn("task supervisor timed out", e);
                //出现超时的记录
                timeoutCounter.increment();
                //将超时时间翻倍(在最大的任务时间内),主动延迟
                long currentDelay = delay.get();
                long newDelay = Math.min(maxDelay, currentDelay * 2);
                //设置为最新的值,考虑到多线程,所以用了CAS
                delay.compareAndSet(currentDelay, newDelay);
    
            } catch (RejectedExecutionException e) {
                //一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, reject the task", e);
                } else {
                    logger.warn("task supervisor rejected the task", e);
                }
                //被拒绝的次数
                rejectedCounter.increment();
            } catch (Throwable e) {
                if (executor.isShutdown() || scheduler.isShutdown()) {
                    logger.warn("task supervisor shutting down, can't accept the task");
                } else {
                    logger.warn("task supervisor threw an exception", e);
                }
    
                throwableCounter.increment();
            } finally {
                if (future != null) {
                    //这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;
                    future.cancel(true);
                }
                //只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务
                //任务里面又塞入这个任务
                if (!scheduler.isShutdown()) {
                    //假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50秒(构造方法的入参expBackOffBound)
                    //如果最近一次任务没有超时,那么就在30秒后开始新任务,
                    //如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作,乘以二后的60秒超过了最大间隔50秒)
                    scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
                }
            }
        }
    }
    复制代码

     

    【6.2】分析更新服务注册列表任务 CacheRefreshThread【获取服务逻辑】

    复制代码
    //DiscoveryClient类的内置类
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }
    
    //DiscoveryClient类#refreshRegistry方法
    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
    
            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            //不做aws环境的配置这个if逻辑不会执行
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {....}
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }
            //获取注册信息方法
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
    
            if (logger.isDebugEnabled()) {...省略日志内容...}
        } catch (Throwable e) {...}
    }
    
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
        try {
            // 如果增量被禁用,或者是第一次,那么获取所有应用程序
            // 取出本地缓存之前获取的服务列表信息
            Applications applications = getApplications();
            //是否禁用增量更新
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    //是否第一次拉取
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                //全量获取
                getAndStoreFullRegistry();
            } else {
                //增量获取
                getAndUpdateDelta(applications);
            }
            //更新本地缓存
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    
        // Notify about cache refresh before updating the instance remote status
        //将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写
        onCacheRefreshed();
        // Update remote status based on refreshed data held in the cache
        //检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态,
        //当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态,
        //上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新lastRemoteInstanceStatus,并且广播对应的事件
        updateInstanceRemoteStatus();
        // registry was fetched successfully, so return true
        return true;
    }
    
    @Override
    public Applications getApplications() {
        return localRegionApps.get();
    }
    复制代码

     

    【6.2.1】分析全量更新

    复制代码
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        Applications apps = null;
        //由于并没有配置特别关注的region信息,因此会调用eurekaTransport.queryClient.getApplications方法从服务端获取服务列表
        EurekaHttpResponse httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            //返回对象就是服务列表
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
    
        if (apps == null) {...} 
        else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //考虑到多线程同步,只有CAS成功的线程,才会把自己从Eureka server获取的数据来替换本地缓存
            localRegionApps.set(this.filterAndShuffle(apps));
        } else {...}
    }
    
    //EurekaHttpClientDecorator类#getApplications方法
    @Override
    public EurekaHttpResponse getApplications(final String... regions) {
        //这里面涉及到配置是否重试
        return execute(new RequestExecutor() {
            @Override
            public EurekaHttpResponse execute(EurekaHttpClient delegate) {
                //调用AbstractJerseyEurekaHttpClient类
                return delegate.getApplications(regions);
            }
    
            @Override
            public RequestType getRequestType() {
                return RequestType.GetApplications;
            }
        });
    }
    
    @Override
    public EurekaHttpResponse getApplications(String... regions) {
        //取增量数据的path是"apps/delta"
        return getApplicationsInternal("apps/", regions);
    }
    
    //具体的请求响应处理都在此方法中
    private EurekaHttpResponse getApplicationsInternal(String urlPath, String[] regions) {
        ClientResponse response = null;
        String regionsParamValue = null;
        try {
            //jersey、resource这些关键词都预示着这是个restful请求
            WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
            if (regions != null && regions.length > 0) {
                regionsParamValue = StringUtil.join(regions);
                webResource = webResource.queryParam("regions", regionsParamValue);
            }
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            //发起网络请求,将响应封装成ClientResponse实例
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
    
            Applications applications = null;
            if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
                //取得全部应用信息
                applications = response.getEntity(Applications.class);
            }
            return anEurekaHttpResponse(response.getStatus(), Applications.class)
                    .headers(headersOf(response))
                    .entity(applications)
                    .build();
        } finally {
            if (response != null) {
                response.close();
            }
        }
    }
    
    //总结:获取全量数据,是通过jersey-client库的API向Eureka server发起restful请求http://localhost:8761/eureka/apps实现的,并将响应的服务列表数据放在一个成员变量中作为本地缓存
    复制代码

     

    【6.2.2】分析增量更新

    复制代码
    //分析增量更新
    //里面的一致性哈希码,本质上就是校验数据
    //如:服务器上全量块存的是【ABCDEFG】,此时它的哈希码便是全量块存的数据的哈希值,增量块存的是【FG】,
    //而我们客户端是【ABCD】,增量拉下来再合并,则为【ABCDFG】,得到的哈希值便会与全量哈希值不一致,代表了缺失一部分数据
    //故检验不对就会全量拉取
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        Applications delta = null;
        //增量信息是通过eurekaTransport.queryClient.getDelta方法完成的
        EurekaHttpResponse httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            //delta中保存了Eureka server返回的增量更新
            delta = httpResponse.getEntity();
        }
        //如果没有
        if (delta == null) {
            //如果增量信息为空,就直接发起一次全量更新
            getAndStoreFullRegistry();
        } 
        //考虑到多线程同步问题,这里通过CAS来确保请求发起到现在是线程安全的,
        //如果这期间fetchRegistryGeneration变了,就表示其他线程也做了类似操作,因此放弃本次响应的数据
        else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    //用Eureka返回的增量数据和本地数据做合并操作
                    updateDelta(delta);
                    //用合并了增量数据之后的本地数据来生成一致性哈希码
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {...}
            //Eureka server在返回增量更新数据时,也会返回服务端的一致性哈希码,
            //理论上每次本地缓存数据经历了多次增量更新后,计算出的一致性哈希码应该是和服务端一致的,
            //如果发现不一致,就证明本地缓存的服务列表信息和Eureka server不一致了,需要做一次全量更新
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                //一致性哈希码不同,就在reconcileAndLogDifference方法中做全量更新
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {...}
    }
    
    //updateDelta方法将增量更新数据和本地数据做合并
    private void updateDelta(Applications delta) {
        int deltaCount = 0;
        //遍历所有服务
        for (Application app : delta.getRegisteredApplications()) {
            //遍历当前服务的所有实例
            for (InstanceInfo instance : app.getInstances()) {
                //取出缓存的所有服务列表,用于合并
                Applications applications = getApplications();
                String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
                //判断正在处理的实例和当前应用是否在同一个region
                if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                    //如果不是同一个region,接下来合并的数据就换成专门为其他region准备的缓存
                    Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                    if (null == remoteApps) {
                        remoteApps = new Applications();
                        remoteRegionVsApps.put(instanceRegion, remoteApps);
                    }
                    applications = remoteApps;
                }
    
                ++deltaCount;
                 //对新增的实例的处理
                if (ActionType.ADDED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
                } 
                 //对修改实例的处理
                else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp == null) {
                        applications.addApplication(app);
                    }
                    logger.debug("Modified instance {} to the existing apps ", instance.getId());
    
                    applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
    
                } 
                //对删除实例的处理
                else if (ActionType.DELETED.equals(instance.getActionType())) {
                    Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                    if (existingApp != null) {
                        logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                        existingApp.removeInstance(instance);
                        /*
                         * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                         * if instance list is empty, we remove the application.
                         */
                        if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                            applications.removeApplication(existingApp);
                        }
                    }
                }
            }
        }
        logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);
    
        getApplications().setVersion(delta.getVersion());
        //整理数据,使得后续使用过程中,这些应用的实例总是以相同顺序返回
        getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        //和当前应用不在同一个region的应用,其实例数据也要整理
        for (Applications applications : remoteRegionVsApps.values()) {
            applications.setVersion(delta.getVersion());
            applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
        }
    }
    复制代码

     

    【6.3】分析服务定时续约任务 HeartbeatThread(也就是心跳机制)

    复制代码
    private class HeartbeatThread implements Runnable {
    
        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
    boolean renew() {
        EurekaHttpResponse httpResponse;
        try {
            //发送心跳请求
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
    复制代码

     

    【7】分析服务注册的instanceInfoReplicator.start方法

    复制代码
    public void start(int initialDelayMs) {
        if (started.compareAndSet(false, true)) {
            instanceInfo.setIsDirty();  // for initial register
            Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
    
    //InstanceInfoReplicator类#run方法
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();
    
            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                //服务注册
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }
    
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse httpResponse;
        try {
            //发起注册请求
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
    复制代码

     

    【8】Eureka Server服务端Jersey接口部分分析

    【8.1】服务端Jersey接口处理类ApplicationResource

    复制代码
    @Produces({"application/xml", "application/json"})
    public class ApplicationResource {
        ...
        //注册一个实例的信息
        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            // 参数校验,不符合验证规则的,返回400状态码,
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }
    
            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
            // 重点在这里,进行注册
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
    
    }
    复制代码

     

    【8.1.1】注册方法分析

    复制代码
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
    
    //AbstractInstanceRegistry类#register方法
    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            // 上只读锁
            read.lock();
            // 从本地MAP里面获取当前实例的信息
            //注册表的结构
            Map> gMap = registry.get(registrant.getAppName());
            // 增加注册次数到监控信息里面去。
            REGISTER.increment(isReplication);
            if (gMap == null) {
                // 如果第一次进来,那么gMap为空,则创建一个ConcurrentHashMap放入到registry里面去
                final ConcurrentHashMap> gNewMap = new ConcurrentHashMap>();
                // putIfAbsent方法主要是在向ConcurrentHashMap中添加键—值对的时候,它会先判断该键值对是否已经存在。
                // 如果不存在(新的entry),那么会向map中添加该键值对,并返回null。
                // 如果已经存在,那么不会覆盖已有的值,直接返回已经存在的值。
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    // 表明map中确实不存在,则设置gMap为最新创建的那个
                    gMap = gNewMap;
                }
            }
            // 从MAP中查询已经存在的Lease信息 (比如第二次来)
            Lease existingLease = gMap.get(registrant.getId());
            // 当Lease的对象不为空时。
            if (existingLease != null && (existingLease.getHolder() != null)) {
                // 当instance已经存在是,和客户端的instance的信息做比较,时间最新的那个,为有效instance信息
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
    
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    registrant = existingLease.getHolder();
                }
            } else {
                // 这里只有当existinglease不存在时,才会进来。 像那种恢复心跳,信息过期的,都不会进入这里。
                // Eureka‐Server的自我保护机制做的操作,为每分钟最大续约数+2 ,同时重新计算每分钟最小续约数
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
            }
            // 构建一个最新的Lease信息
            Lease lease = new Lease(registrant, leaseDuration);
            if (existingLease != null) {
                // 当原来存在Lease的信息时,设置他的serviceUpTimestamp, 保证服务开启的时间一直是第一次的那个
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // 放入本地Map中
            gMap.put(registrant.getId(), lease);
            // 添加到最近的注册队列里面去,以时间戳作为Key, 名称作为value,主要是为了运维界面的统计数据。
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // 分析instanceStatus
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }
    
            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
    
            // 得到instanceStatus,判断是否是UP状态,
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            // 设置注册类型为添加
            registrant.setActionType(ActionType.ADDED);
            // 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取、
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
             // 清理缓存 ,传入的参数为key
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        } finally {
            read.unlock();
        }
    }
    复制代码

     

    【8.1.1】分析Lease结构

    复制代码
    public class Lease {
    
        enum Action {
            Register, Cancel, Renew
        };
        //租约过期的时间常量,默认90秒,也就说90秒没有心跳过来,那么这边将会自动剔除该节点
        public static final int DEFAULT_DURATION_IN_SECS = 90;
        这个租约是属于谁的, 目前占用这个属性的是
        private T holder;
        //租约是啥时候过期的,当服务下线的时候,会过来更新这个时间戳registrationTimestamp : 租约的注册时间
        private long evictionTimestamp;
        private long registrationTimestamp;
        //服务启动时间 ,当客户端在注册的时候,instanceInfo的status 为UP的时候,则更新这个时间戳
        private long serviceUpTimestamp;
        //最后更新时间,每次续约的时候,都会更新这个时间戳,在判断实例是否过期时,需要用到这个属性。
        private volatile long lastUpdateTimestamp;
        //过期时间,毫秒单位
        private long duration;
    
        public Lease(T r, int durationInSecs) {
            holder = r;
            registrationTimestamp = System.currentTimeMillis();
            lastUpdateTimestamp = registrationTimestamp;
            duration = (durationInSecs * 1000);
    
        }
    
        //更新的时候设置过期时间为当前时间+90S
        public void renew() {
            lastUpdateTimestamp = System.currentTimeMillis() + duration;
    
        }
    
        public void cancel() {
            if (evictionTimestamp <= 0) {
                evictionTimestamp = System.currentTimeMillis();
            }
        }
    
        public void serviceUp() {
            if (serviceUpTimestamp == 0) {
                serviceUpTimestamp = System.currentTimeMillis();
            }
        }
    
        public void setServiceUpTimestamp(long serviceUpTimestamp) {
            this.serviceUpTimestamp = serviceUpTimestamp;
        }
    
        public boolean isExpired() {
            return isExpired(0l);
        }
        //这里面存在的问题是过期时间+90S
        //实际上也就是在更新时候的180s之后才算过期
        public boolean isExpired(long additionalLeaseMs) {
            return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
        }
    
        public long getRegistrationTimestamp() {
            return registrationTimestamp;
        }
    
        public long getLastRenewalTimestamp() {
            return lastUpdateTimestamp;
        }
    
        public long getEvictionTimestamp() {
            return evictionTimestamp;
        }
    
        public long getServiceUpTimestamp() {
            return serviceUpTimestamp;
        }
    
        public T getHolder() {
            return holder;
        }
    
    }
    复制代码

     

    【8.2】客户端Jersey接口处理类ApplicationsResource

    复制代码
    @Path("/{version}/apps")
    @Produces({"application/xml", "application/json"})
    public class ApplicationsResource {
        ...
        private final EurekaServerConfig serverConfig;
        private final PeerAwareInstanceRegistry registry;
        private final ResponseCache responseCache;
    
        @Inject
        ApplicationsResource(EurekaServerContext eurekaServer) {
            this.serverConfig = eurekaServer.getServerConfig();
            this.registry = eurekaServer.getRegistry();
            this.responseCache = registry.getResponseCache();
        }
    
        public ApplicationsResource() {
            this(EurekaServerContextHolder.getInstance().getServerContext());
        }
    
        //获取关于特定{@link com.netflix.discovery.shared.Application}的信息。
        @Path("{appId}")
        public ApplicationResource getApplicationResource(
                @PathParam("version") String version,
                @PathParam("appId") String appId) {
            CurrentRequestVersion.set(Version.toEnum(version));
            return new ApplicationResource(appId, serverConfig, registry);
        }
    
        //获取关于所有{@link com.netflix.discovery.shared.Applications}的信息。
        @GET
        public Response getContainers(@PathParam("version") String version,
                                      @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                      @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                      @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                      @Context UriInfo uriInfo,
                                      @Nullable @QueryParam("regions") String regionsStr) {
    
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL.increment();
            } else {
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
            }
    
            // Check if the server allows the access to the registry. The server can
            // restrict access if it is not
            // ready to serve traffic depending on various reasons.
            if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
                return Response.status(Status.FORBIDDEN).build();
            }
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
            //获取服务实例对应的缓存key
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            Response response;
            //是否压缩
            if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                response = Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                //从缓存里获取服务实例注册信息
                response = Response.ok(responseCache.get(cacheKey))
                        .build();
            }
            return response;
        }
    
        //在{@link com.netflix.discovery.shared.Applications}中获取关于所有增量更改的信息。
        @Path("delta")
        @GET
        public Response getContainerDifferential(
                @PathParam("version") String version,
                @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    
            boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    
            // If the delta flag is disabled in discovery or if the lease expiration
            // has been disabled, redirect clients to get all instances
            if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
                return Response.status(Status.FORBIDDEN).build();
            }
    
            String[] regions = null;
            if (!isRemoteRegionRequested) {
                EurekaMonitors.GET_ALL_DELTA.increment();
            } else {
                regions = regionsStr.toLowerCase().split(",");
                Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
                EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
            }
    
            CurrentRequestVersion.set(Version.toEnum(version));
            KeyType keyType = Key.KeyType.JSON;
            String returnMediaType = MediaType.APPLICATION_JSON;
            if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
                keyType = Key.KeyType.XML;
                returnMediaType = MediaType.APPLICATION_XML;
            }
    
            Key cacheKey = new Key(Key.EntityType.Application,
                    ResponseCacheImpl.ALL_APPS_DELTA,
                    keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
            );
    
            if (acceptEncoding != null
                    && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
                return Response.ok(responseCache.getGZIP(cacheKey))
                        .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                        .header(HEADER_CONTENT_TYPE, returnMediaType)
                        .build();
            } else {
                return Response.ok(responseCache.get(cacheKey))
                        .build();
            }
        }
    }
    复制代码

     

    【8.2.1】ApplicationsResource类的getContainers方法分析

    复制代码
    //获取关于所有{@link com.netflix.discovery.shared.Applications}的信息。
    @GET
    public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
    
        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }
    
        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        //获取服务实例对应的缓存key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );
    
        Response response;
        //是否压缩
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            //从缓存里获取服务实例注册信息,从ResponseCacheImpl类中获取
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }
    
    //分析responseCache.get方法
    //ResponseCacheImpl类#get方法
    public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    }
    
    @VisibleForTesting
    String get(final Key key, boolean useReadOnlyCache) {
        Value payload = getValue(key, useReadOnlyCache);
        if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
            return null;
        } else {
            return payload.getPayload();
        }
    }
    
    //精髓设计的点,利用了读写分离,有种CopyOnWrite的思维
    //private final ConcurrentMap readOnlyCacheMap = new ConcurrentHashMap();
    //private final LoadingCache readWriteCacheMap;
    @VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value payload = null;
        try {
            //只读缓存的开启
            if (useReadOnlyCache) {
                final Value currentPayload = readOnlyCacheMap.get(key);
                //只读缓存拿不到才去读写缓存里面拿
                if (currentPayload != null) {
                    payload = currentPayload;
                } else {
                    payload = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, payload);
                }
            } else {
                payload = readWriteCacheMap.get(key);
            }
        } catch (Throwable t) {...}
        return payload;
    }
    
    //ResponseCacheImpl类#构造方法
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;
    
        long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
        this.readWriteCacheMap =
                CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        //读写缓存默认180秒会自动定时过期
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener() {
                            @Override
                            public void onRemoval(RemovalNotification notification) {
                                Key removedKey = notification.getKey();
                                if (removedKey.hasRegions()) {
                                    Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                    regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                                }
                            }
                        })
                        .build(new CacheLoader() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    Key cloneWithNoRegions = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(cloneWithNoRegions, key);
                                }
                                //从内存注册表中获取
                                Value value = generatePayload(key);
                                return value;
                            }
                        });
    
        if (shouldUseReadOnlyResponseCache) {
            //默认30秒用读写缓存的数据更新只读缓存的数据
            timer.schedule(getCacheUpdateTask(),
                    new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                            + responseCacheUpdateIntervalMs),
                    responseCacheUpdateIntervalMs);
        }
    
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {...}
    }
    复制代码

     

     

    Eureka服务端源码分析图


    Eureka服务端Jersey接口分析图

        

    Eureka客户端源码分析图

     

     

  • 相关阅读:
    Mac 卸载 PyCharm 方法
    java计算机毕业设计内容校园热点新闻推送网站源码+mysql数据库+系统+lw文档+部署
    Docker 搭建 Gitlab(新手村试炼版)
    期货开户市场的交易时间
    Android Kotlin 高阶详解
    【全面速懂】C#使用WSDL服务
    计算机网络:应用层知识点汇总
    # 用acme.sh申请证书(含泛域名)
    PCL 视图变换(OpenGL)
    规则链使用详解
  • 原文地址:https://www.cnblogs.com/chafry/p/16810064.html