• sentinel架构底层原理剖析详解


    一,sentinel源码分析

    首先查看这个引入的依赖,

    <!-- sentinel依赖-->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
        <version>2021.1</version>
    </dependency>
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接下来看这个外部库里面对应的jar包,然后主要看里面的这个spring.factories这个文件
    在这里插入图片描述
    这个factories的文件里面的内容如下,因此主要分析这个带有autoConfiguration的这个类。

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    com.alibaba.cloud.sentinel.SentinelWebAutoConfiguration,\
    com.alibaba.cloud.sentinel.SentinelWebFluxAutoConfiguration,\
    com.alibaba.cloud.sentinel.endpoint.SentinelEndpointAutoConfiguration,\
    com.alibaba.cloud.sentinel.custom.SentinelAutoConfiguration,\
    com.alibaba.cloud.sentinel.feign.SentinelFeignAutoConfiguration
    org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
    com.alibaba.cloud.sentinel.custom.SentinelCircuitBreakerConfiguration
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    二,SentinelAutoConfiguration

    1,aop的底层实现

    aop依赖与ioc,在生产bean并进行实例化之前,先通过bean的第一个后置处理器找到所有在类上面加@AspectJ这个注解的所有类,并在这个类的里面找到所有的befeore,after等注解的方法,每一个before,after等都会生成一个对应的advisor,每个advisor包括advise和pointcut,advise主要是用来作为一个增强器的使用,pointcut是为了进行匹配,匹配成功才进行最终的动态代理的生成。最后获取到所有的advisors,由于可能有大量的advisor,因此在bean的最后一个后置处理器才对这些所有的advisor进行处理,即在bean进行初始化之后才进行处理。最后会去循环遍历这些advisors,通过advisors里面封装的pointcut和生成的advisor进行比较,如果匹配成功,则说明bean需要创建动态代理。主要是通过责任链的方式实现

    2,SentinelAutoConfiguration源码分析

    其主类如下,有一个@ConditionalOnProperty这个注解,这个主要是在springboot在整个容器启动的时候会进行一个自动加载。

    @Configuration(proxyBeanMethods = false)
    @ConditionalOnProperty(name = "spring.cloud.sentinel.enabled", matchIfMissing = true)
    @EnableConfigurationProperties(SentinelProperties.class)
    public class SentinelAutoConfiguration {
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这个类里面,有一个@Bean的一个注解,主要是为了向这个容器里面注入一个SentinelResourceAspect的这个实例。

    @Bean
    @ConditionalOnMissingBean
    public SentinelResourceAspect sentinelResourceAspect() {
    	return new SentinelResourceAspect();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    SentinelResourceAspect这个类如下,在这个方法上有一个**@AspectJ**的这个注解,因此可以知道这个类使用了这个aop来实现,

    @Aspect
    public class SentinelResourceAspect extends AbstractSentinelAspectSupport{
        
    }
    
    • 1
    • 2
    • 3
    • 4

    在这个类里面,有两个方法,上面有一些@Around,@Pointcut的注解,主要是为了生成一些advisor

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {
        
    }
    
    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable{
        try {
            //sentinel的核心代码
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            Object result = pjp.proceed();
            return result;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    通过这个entry方法最终可以进入到一个entryWithPriority方法里面

    private Entry entryWithPriority(){
        //责任链模式
        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
        //通过这个链路调用链路上的某一个槽点
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    接下来主要查看这个lookProcessChain的这个方法,里面主要是初始化了一个链条

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper){
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        if (chain == null){
            chain = SlotChainProvider.newSlotChain();
        }
        return chain;
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    接下来主要看这个newSlotChain方法,主要是看这个实例化链路的具体过程。

    public static ProcessorSlotChain newSlotChain(){
         slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class); 
        return slotChainBuilder.build();
    }
    
    • 1
    • 2
    • 3
    • 4

    然后进入这个build方法,看里面的具体build的构建流程

    public ProcessorSlotChain build(){
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        //通过这个spi机制进行一个解耦
        //将一些类进行一个加载
        List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
        for (ProcessorSlot slot : sortedSlotList) {
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                continue;
            }
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }
        return chain;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这个链路的添加方法如下,主要是将这些slot槽点加入到这个链条里面,主要是通过这个责任链的模式实现。

    //头插法
    @Override
    public void addFirst(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        protocolProcessor.setNext(first.getNext());
        first.setNext(protocolProcessor);
        if (end == first) {
            end = protocolProcessor;
        }
    }
    //尾插法
    @Override
    public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) {
        end.setNext(protocolProcessor);
        end = protocolProcessor;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    再链路构建好了之后,就会去调用这个链路里面的slot槽点
    在这里插入图片描述
    接下来主要查看这个DefaultProcessorSlotChain默认类里面的槽点的entry方法

     @Override
    public void entry(){
        throws Throwable {
        first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后在这个transformEntry方法里面,又有一个entry方法,可以发现来里面可以调用多个槽点

     void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
        throws Throwable {
        T t = (T)o;
        entry(context, resourceWrapper, t, count, prioritized, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述
    然后可以进入这个NodeSelectorSlot这个类里面,主要是构建一个资源的请求路径,然后通过这个fireEntry的方法,来实现各个slot的结点的遍历

    public void entry throws Throwable {
    	DefaultNode node = map.get(context.getName());
    	if (node == null) {
            synchronized (this) {
                node = map.get(context.getName());
                if (node == null) {
                    node = new DefaultNode(resourceWrapper, null);
                    HashMap cacheMap = new HashMap(map.size());
                    cacheMap.putAll(map);
                    cacheMap.put(context.getName(), node);
                    map = cacheMap;
                    // Build invocation tree
                    ((DefaultNode) context.getLastNode()).addChild(node);
                }
    
            }
        }
        context.setCurNode(node);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    3,FlowSlot流控规则

    通过上面的源码分析可知,这个sentinel主要是通过这个责任链的方式实现,这个链路上面主要会有大量的slot槽点,通过会通过这个fireEntry方法按顺序来依次遍历一下的槽点,接下来主要分析一下这个FlowSlot的这个限流的这个槽点,每一个槽点就是对应的一个规则
    在这里插入图片描述
    在进入这个规则之后,首先会有一个流量的校验规则,通过这个checkFlorw的方法实现。

    @Override
    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,boolean prioritized, Object... args) throws Throwable {
        //校验流量
        checkFlow(resourceWrapper, context, node, count, prioritized);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    可以查看这个checkFlow的这个规则,会获取所有的这个流控规则,如一些阈值的设置,流控模式,流控效果等,如果校验不通过,那么就会直接抛异常

    public void checkFlow(){
         Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    那么主要是通过这个canPassCheck方法来判断校验是否通过

    public boolean canPassCheck(){
        //返回一个本地的检测
        return passLocalCheck(rule, context, node, acquireCount, prioritized)
    }
    
    • 1
    • 2
    • 3
    • 4

    在这个passLocalCheck检测方法里面,会有一个具体的一个规则的判断,会通过这个用户在界面上的选项来进行一个判断,如流控模式是选的直连,关联还是链路

    private static boolean passLocalCheck() {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    然后在获取了所有的用户的值之后,会进行一个最终的判断,如判断这个qps或者并发数是否超过这个阈值,其逻辑主要是在这个canPass这个方法里面实现。如果不能通过,那么会直接返回一个false,并且直接会抛一个异常,链路直接断掉。并且这个异常可以进入一个HandlerBlockExecption。

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) {
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    如果没有出现异常,那么就会继续执行这个以下的业务流程。

    4,DegradeSlot熔断规则

    类似于这种电路的熔断器一样,主要有三种状态,分别是打开状态,半开状态和关闭状态。

    @SpiOrder(-1000)
    public class DegradeSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
    
    }
    
    • 1
    • 2
    • 3
    • 4

    在这个方法里面,有一个entry方法,就是加入这个槽点的一个方法

    @Override
    public void entry() throws Throwable {
        performChecking(context, resourceWrapper);
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    然后这个performChecking方法就是这个熔断器的核心方法,会去获取所有的已经配置的熔断规则,然后去校验每一个断路规则。

    void performChecking(Context context, ResourceWrapper r) throws BlockException {
        //获取全部的配置的熔断规则
    	List<CircuitBreaker> cir = DegradeRuleManager.getCircuitBreakers(r.getName());
        if (cir == null || cir.isEmpty()) {
            return;
        }
        for (CircuitBreaker cb : cir) {
            if (!cb.tryPass(context)) {
                throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    然后主要查看这个tryPass的这个方法,如果没有触发这个断路器,那么直接返回true

    @Override
    public boolean tryPass(Context context) {
        // Template implementation.
        if (currentState.get() == State.CLOSED) {
            return true;
        }
        if (currentState.get() == State.OPEN) {
            // For half-open state we allow a request for probing.
            return retryTimeoutArrived() && fromOpenToHalfOpen(context);
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    如果出现这个熔断,那么就会进入这个retryTimeoutArrived方法,如果当前时间大于这个熔断的时间,就会进入一个半开状态

    protected boolean retryTimeoutArrived() {
        return TimeUtil.currentTimeMillis() >= nextRetryTimestamp;
    }
    
    protected void updateNextRetryTimestamp() {
        this.nextRetryTimestamp = TimeUtil.currentTimeMillis() + recoveryTimeoutMs;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    半开状态,主要是通过这个fromOpenToHalfOpen实现,会给这个状态做一个标记,作为一个半开状态。就是允许这个客户端发起一次这个请求,看执行结果是否通过,如果通过,则恢复成关闭状态,否则回到之前的全开状态。

    protected boolean fromOpenToHalfOpen(Context context) {
        if (currentState.compareAndSet(State.OPEN, State.HALF_OPEN)) {
            notifyObservers(State.OPEN, State.HALF_OPEN, null);
            Entry entry = context.getCurEntry();
            entry.whenTerminate(new BiConsumer<Context, Entry>() {
                @Override
                public void accept(Context context, Entry entry) {
                    if (entry.getBlockError() != null) { 
                        currentState.compareAndSet(State.HALF_OPEN, State.OPEN);
                        notifyObservers(State.HALF_OPEN, State.OPEN, 1.0d);
                    }
                }
            });
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    在关闭时有一个这个exit的这个方法,这里面实现了这个状态的切换

    public void exit(){
        if (curEntry.getBlockError() == null) {
        	for (CircuitBreaker circuitBreaker : circuitBreakers) {
        	    circuitBreaker.onRequestComplete(context);
        	}
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    然后在这个onRequestComplete方里面有一个核心方法handleStateChangeWhenThresholdExceeded

    public void onRequestComplete(Context context){
        handleStateChangeWhenThresholdExceeded(rt);
    }
    
    • 1
    • 2
    • 3

    然后进入这个handleStateChangeWhenThresholdExceeded的这个方法,里面有具体的状态切换。就是在进入这个半开状态之后,会再发起一次请求,如果请求时间超过这个阈值,那么又会重新进入这个全开状态,如果没有超过这个阈值,那么就会进入这个关闭状态。

    private void handleStateChangeWhenThresholdExceeded(long rt) {
        if (currentState.get() == State.OPEN) {
            return;
        }
        
        if (currentState.get() == State.HALF_OPEN) {
            if (rt > maxAllowedRt) {
                fromHalfOpenToOpen(1.0d);
            } else {
                fromHalfOpenToClose();
            }
            return;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    这个半开到全开状态的转换,主要是通过这个cas比较与交换的这个算法实现,并且会重新更新这个滑动窗口的这个时间

    protected boolean fromHalfOpenToOpen(double snapshotValue) {
        if (currentState.compareAndSet(State.HALF_OPEN, State.OPEN)) {
            updateNextRetryTimestamp();
            notifyObservers(State.HALF_OPEN, State.OPEN, snapshotValue);
            return true;
        }
        return false;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    三,总结

    首先会在这个服务端里面配置一些规则,然后会将这些规则推送给客户端,客户端将这些规则存在一个内存的一个链条里面,在调用所有的处理器的时候,都会经过这个链条。在经过这个链条的过程中,如果有异常抛出,会根据异常的种类,调用对应的降级方法。

  • 相关阅读:
    nacos配置启动
    快速掌握Gulp自动化构建工具
    一文学会vim基本操作
    黑马程序员 MySQL数据库入门到精通——进阶篇(2)
    学习笔记-SET
    janus-gateway的videoroom插件的RTP包录制功能源码详解
    Day02—Python的变量类型、变量命名、变量类型检查、变量类型转换、运算符
    二维码生成器
    【介绍下LeetCode的使用方法】
    GStreamer Plugin之Gst-nvdewarper
  • 原文地址:https://blog.csdn.net/zhenghuishengq/article/details/126574945