• Eureka架构篇 - 服务续约与自我保护机制


    前言

    服务续约

    默认情况下,客户端的服务实例每隔30秒向Eureka服务端发送一次心跳。如果90秒之内Eureka服务端没有收到服务实例的心跳,该服务实例会被执行剔除任务的线程(每隔60秒执行一次)从注册的服务实例列表中剔除。

    自我保护机制

    如果15分钟之内,心跳发送失败的比例低于85%,就会触发Eureka服务端的自我保护机制。Eureka不会剔除通信不正常的服务实例,并且仍然接收客户端的服务的注册与服务的查询。但是不会与其它Eureka服务端节点进行同步。自我保护机制是一种针对网络异常波动的安全保护措施,可以使Eureka集群更加的健壮、稳定的运行。

    Peer to peer 架构

    Eureka对于多个副本之间的复制方式并没有采用主从复制,而是选择了对等复制,即peer to peer的模式。副本之间没有主从,每个副本都可以处理读写请求,通过彼此之间的数据复制使数据进行同步更新。但是数据同步可能会出现冲突。Eureka对此的解决方案是校验lastDirtyTimestamp

    校验lastDirtyTimestamp:在peer节点之间的复制请求中,如果其它peer节点传递的服务实例的lastDirtyTimestamp大于当前服务端本地存储的服务实例的lastDirtyTimestamp,则返回404,要求服务实例重新注册;如果小于,则返回409,要求其同步最新的数据信息。

    源码分析

    接下来对Eureka的服务续约的分析分为客户端逻辑和服务端逻辑两个部分。

    服务续约(客户端逻辑)

    一、DiscoveryClient

    在 DiscoveryClient 构造器方法中的 initScheduledTasks 方法,有如下内容:

    // 获取客户端需要向Eureka服务端发送心跳的时间间隔,默认30秒
    // 可以在配置文件中指定 eureka.instance.leaseRenewalIntervalInSeconds 来修改默认值
    int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
    
    // 获取心跳任务的指数补偿的相关属性,默认10
    // 可以在配置文件中指定 eureka.client.heartbeatExecutorExponentialBackOffBound 来修改默认值
    int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
    
    logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
    
    // 创建心跳任务
    heartbeatTask = new TimedSupervisorTask(
            "heartbeat",
            scheduler,
            heartbeatExecutor,
            renewalIntervalInSecs,
            TimeUnit.SECONDS,
            expBackOffBound,
            new HeartbeatThread()
    );
    // 初始延迟30秒执行心跳任务,之后每隔30秒重复执行一次
    scheduler.schedule(
            heartbeatTask,
            renewalIntervalInSecs, TimeUnit.SECONDS);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    二、TimedSupervisorTask

    @Override
    public void run() {
       
        Future<?> future = null;
        try {
       
          	// 执行 HeartbeatThread#run 方法
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            future.get(timeoutMillis, TimeUnit.MILLISECONDS); 
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
       
            logger.warn("task supervisor timed out", e);
            timeoutCounter.increment();
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            delay.compareAndSet(currentDelay, newDelay);
        } catch (RejectedExecutionException e) {
       
            if (executor.isShutdown() || scheduler.isShutdown()) {
       
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
       
                logger.warn("task supervisor rejected the task", e);
            }
            rejectedCounter.increment();
        } catch (Throwable e) {
       
            if (executor.isShutdown() || scheduler.isShutdown()) {
       
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
       
                logger.warn("task supervisor threw an exception", e);
            }
            throwableCounter.increment();
        } finally {
       
            if (future != null) {
       
                future.cancel(true);
            }
            if (!scheduler.isShutdown()) {
       
              	// 每隔30秒重复执行一次
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53

    三、HeartbeatThread

    private class HeartbeatThread implements Runnable {
       
    
        public void run() {
       
          	// 如果续约成功
            if (renew()) {
       
              	// 更新lastSuccessfulHeartbeatTimestamp属性为当前时间
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    boolean renew() {
       
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
       
          	// 向服务端发起服务续约
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
          	// 如果响应的状态码是404,则需要重新注册服务
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
       
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    面向对象的封装、继承、多态
    logback--基础--02--配置--configuration
    zookeeper源码(10)node增删改查及监听
    linux:vi和vim的使用
    ISO7421D代替品 π122M31 双通道数字隔离器 更快更稳更安全
    原型模式(Prototype Pattern)
    virtualbox7 虚拟机与宿主机互传文件、共享u盘
    扩容LVM卷导致lvm元数据丢失的恢复过程
    关于<dependencyManagement>和<dependencies>
    无语,程序在main方法执行和在junit单元测试结果居然不一致
  • 原文地址:https://blog.csdn.net/qq_34561892/article/details/127926860