• 常见限流算法(固定or滑动窗口、漏桶、令牌桶)


    202208-常见限流算法(固定or滑动窗口、漏桶、令牌桶)

    1. 为什么需要限流

    限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。

    限流的对象:

    • 系统自身:保护本系统,防止上游突发流量将本系统击穿。
    • 下游系统:例如第三方系统性能不可控,即使本系统能处理突发流量,下游由于性能限制,也无法处理。

    2. 固定窗口算法

    计数器法是限流算法里最简单的一种算法。

    定义,对于A接口来说,1分钟的访问次数不能超过100个。设置一个计数器counter,效时间为1分钟(即每分钟计数器会被重置为0),每当一个请求过来,counter就加1,如果counter的值大于100,则说明请求数过多,限制后续请求访问;

    劣势:临界时间点产生突发流量,统计数量不准确。

    假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 n-1 (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 2n - 1 个。

    设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值。

    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Counter {
        /**
         * 最大访问数量
         */
        private final int limit = 10;
        /**
         * 访问时间差
         */
        private final long timeout = 1000;
        /**
         * 请求时间
         */
        private long time;
        /**
         * 当前计数器
         */
        private AtomicInteger reqCount = new AtomicInteger(0);
    
        public boolean limit() {
            long now = System.currentTimeMillis();
            if (now < time + timeout) {
                // 单位时间内
                reqCount.addAndGet(1);
                return reqCount.get() <= limit;
            } else {
                // 超出单位时间
                time = now;
                reqCount = new AtomicInteger(0);
                return true;
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3. 滑动窗口算法

    滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位,把一分钟分成若干等分(6 份,每份 10 秒),在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1。当等分数量越大限流统计就越详细。

    
        /** 队列id和队列的映射关系,队列里面存储的是每一次通过时候的时间戳,这样可以使得程序里有多个限流队列 */
        private volatile static Map<String, List<Long>> MAP = new ConcurrentHashMap<>();
    
        private SlideWindow() {}
    
        public static void main(String[] args) throws InterruptedException {
            while (true) {
                // 任意10秒内,只允许2次通过
                System.out.println(LocalTime.now().toString() + SlideWindow.isGo("ListId", 2, 10000L));
                // 睡眠0-10秒
                Thread.sleep(1000 * new Random().nextInt(10));
            }
        }
    
        /**
         * 滑动时间窗口限流算法
         * 在指定时间窗口,指定限制次数内,是否允许通过
         *
         * @param listId     队列id
         * @param count      限制次数
         * @param timeWindow 时间窗口大小
         * @return 是否允许通过
         */
        public static synchronized boolean isGo(String listId, int count, long timeWindow) {
            // 获取当前时间
            long nowTime = System.currentTimeMillis();
            // 根据队列id,取出对应的限流队列,若没有则创建
            List<Long> list = MAP.computeIfAbsent(listId, k -> new LinkedList<>());
            // 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置
            if (list.size() < count) {
                list.add(0, nowTime);
                return true;
            }
    
            // 队列已满(达到限制次数),则获取队列中最早添加的时间戳
            Long farTime = list.get(count - 1);
            // 用当前时间戳 减去 最早添加的时间戳
            if (nowTime - farTime <= timeWindow) {
                // 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count
                // 不允许通过
                return false;
            } else {
                // 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count
                // 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置
                list.remove(count - 1);
                list.add(0, nowTime);
                return true;
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    4. 漏桶算法

    漏桶(Leaky Bucket)算法思路:

    规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。

    • 流入:以任意速率往桶中放入水滴。
    • 流出:以固定速率从桶中流出水滴。

    用白话具体说明:假设漏斗总支持并发100个最大请求,如果当前处理速率超过100,那么拒绝超出的请求

    • 优点:保护服务,服务的处理能力可控
    • 缺点:最大处理速度固定,针对突发特性的流量请求,无法过载处理,缺乏效率。

    在Nginx限流中,有使用该算法的配置。

    示例代码:
    可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(total),另一个是水桶漏洞的大小(rate),伪代码如下:

    
    import lombok.extern.slf4j.Slf4j;
    import org.junit.Test;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    // 漏桶 限流
    @Slf4j
    public class LeakBucketLimiter {
    
        // 计算的起始时间
        private static long lastOutTime = System.currentTimeMillis();
        // 流出速率 每秒 2 次
        private static int leakRate = 2;
    
        // 桶的容量
        private static int capacity = 2;
    
        //剩余的水量
        private static AtomicInteger water = new AtomicInteger(0);
    
        //返回值说明:
        // false 没有被限制到
        // true 被限流
        public static synchronized boolean isLimit(long taskId, int turn) {
            // 如果是空桶,就当前时间作为漏出的时间
            if (water.get() == 0) {
                lastOutTime = System.currentTimeMillis();
                water.addAndGet(1);
                return false;
            }
            // 执行漏水
            int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
            // 计算剩余水量
            int waterLeft = water.get() - waterLeaked;
            water.set(Math.max(0, waterLeft));
            // 重新更新leakTimeStamp
            lastOutTime = System.currentTimeMillis();
            // 尝试加水,并且水还未满 ,放行
            if ((water.get()) < capacity) {
                water.addAndGet(1);
                return false;
            } else {
                // 水满,拒绝加水, 限流
                return true;
            }
    
        }
    
    
        //线程池,用于多线程模拟测试
        private ExecutorService pool = Executors.newFixedThreadPool(10);
    
        @Test
        public void testLimit() {
    
            // 被限制的次数
            AtomicInteger limited = new AtomicInteger(0);
            // 线程数
            final int threads = 2;
            // 每条线程的执行轮数  只测试一秒
            final int turns = 5;
            // 线程同步器
            CountDownLatch countDownLatch = new CountDownLatch(threads);
            long start = System.currentTimeMillis();
            for (int i = 0; i < threads; i++) {
                pool.submit(() ->
                {
                    try {
    
                        for (int j = 0; j < turns; j++) {
    
                            long taskId = Thread.currentThread().getId();
                            boolean intercepted = isLimit(taskId, j);
                            if (intercepted) {
                                // 被限制的次数累积
                                limited.getAndIncrement();
                            }
                            Thread.sleep(200);
                        }
    
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    //等待所有线程结束
                    countDownLatch.countDown();
    
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            float time = (System.currentTimeMillis() - start) / 1000F;
            //输出统计结果
    
            log.info("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));
            log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
            log.info("运行的时长为:" + time);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106

    5. 令牌桶算法

    令牌桶(Token Bucket)算法思路:

    • 规定固定容量的桶,按恒定时间间隔往桶里加入Token,如果桶未满,令牌可以积累。如果桶已经满了,令牌则不再积累。(间隔:1/QPS,如果QPS=100,则间隔是10ms)
    • 新请求处理前时,尝试从桶中获取1个Token,如果拿出token,则处理请求;如果没有Token可拿,就阻塞或者拒绝服务。

    漏桶和令牌桶的比较:

    • 令牌桶算法,放在服务端,用来保护服务端(自己),主要用来对调用者频率进行限流,为的是不让自己被压垮。所以如果自己本身有处理能力的时候,如果流量突发(实际消费能力强于配置的流量限制=桶大小),那么实际处理速率可以超过配置的限制(桶大小)。
    • 而漏桶算法,放在调用方,这是用来保护他人,也就是保护他所调用的系统。主要场景是,当调用的第三方系统本身没有保护机制,或者有流量限制的时候,我们的调用速度不能超过他的限制,由于我们不能更改第三方系统,所以只能在主调方控制。即使流量突发也必须舍弃。因为消费能力是第三方决定的。
    • 令牌桶的另外一个好处是可以方便的改变速度。 一旦需要提高速率,则按需提高放入桶中的令牌的速率。 一般会定时(比如100毫秒)往桶中增加一定数量的令牌, 有些变种算法则实时的计算应该增加的令牌的数量。

    开源实现场景:
    Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流,RateLimiter 是单机(单进程)的限流,是JVM级别的的限流,所有的令牌生成与消费都是在内存中,

    示例代码:

    // 令牌桶 限速
    @Slf4j
    public class TokenBucketLimiter {
        // 上一次令牌发放时间
        public long lastTime = System.currentTimeMillis();
        // 桶的容量
        public int capacity = 2;
        // 令牌生成速度 /s
        public int rate = 2;
        // 当前令牌数量
        public AtomicInteger tokens = new AtomicInteger(0);
        ;
    
        //返回值说明:
        // false 没有被限制到
        // true 被限流
        public synchronized boolean isLimited(long taskId, int applyCount) {
            long now = System.currentTimeMillis();
            //时间间隔,单位为 ms
            long gap = now - lastTime;
    
            //计算时间段内的令牌数
            int reverse_permits = (int) (gap * rate / 1000);
            int all_permits = tokens.get() + reverse_permits;
            // 当前令牌数
            tokens.set(Math.min(capacity, all_permits));
            log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
    
            if (tokens.get() < applyCount) {
                // 若拿不到令牌,则拒绝
                // log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
                return true;
            } else {
                // 还有令牌,领取令牌
                tokens.getAndAdd( - applyCount);
                lastTime = now;
    
                // log.info("剩余令牌.." + tokens);
                return false;
            }
    
        }
    
        //线程池,用于多线程模拟测试
        private ExecutorService pool = Executors.newFixedThreadPool(10);
    
        @Test
        public void testLimit() {
    
            // 被限制的次数
            AtomicInteger limited = new AtomicInteger(0);
            // 线程数
            final int threads = 2;
            // 每条线程的执行轮数
            final int turns = 20;
    
    
            // 同步器
            CountDownLatch countDownLatch = new CountDownLatch(threads);
            long start = System.currentTimeMillis();
            for (int i = 0; i < threads; i++) {
                pool.submit(() ->
                {
                    try {
                        for (int j = 0; j < turns; j++) {
    
                            long taskId = Thread.currentThread().getId();
                            boolean intercepted = isLimited(taskId, 1);
                            if (intercepted) {
                                // 被限制的次数累积
                                limited.getAndIncrement();
                            }
                            Thread.sleep(200);
                        }
    
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    //等待所有线程结束
                    countDownLatch.countDown();
    
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            float time = (System.currentTimeMillis() - start) / 1000F;
            //输出统计结果
    
            log.info("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));
            log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
            log.info("运行的时长为:" + time);
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100

    6. Guava包下的RateLimiter–令牌桶算法的完善版

    Guava的RateLimiter是一个基于令牌桶算法实现的限流器,常用于控制网站的QPS。与Semaphore不同,Semaphore控制的是某一时刻的访问量,RateLimiter控制的是某一时间间隔的访问量。

    • 支持系统预热
    • 支持令牌透支

    代码解析:基于guava-31.1-jre版本

    • RateLimiter是一个抽象类。
    • SmoothRateLimiter是RateLimiter的子类,也是一个抽象类。
    • 平滑突发限流(SmoothBursty)和平滑预热限流(SmoothWarmingUp)是定义在SmoothRateLimiter里的两个静态内部类,是SmoothRateLimiter的真正实现类。。

    测试demo

        public void testLimit2() {
            RateLimiter rateLimiter = RateLimiter.create(5);
    
            // 被限制的次数
            AtomicInteger limited = new AtomicInteger(0);
            // 线程数
            final int threads = 5;
            // 每条线程的执行轮数
            final int turns = 200;
            // 线程同步器
            CountDownLatch countDownLatch = new CountDownLatch(threads);
            long start = System.currentTimeMillis();
            for (int i = 0; i < threads; i++) {
                pool.submit(() -> {
                    try {
                        for (int j = 0; j < turns; j++) {
    
                            long taskId = Thread.currentThread().getId();
                            boolean isAcquire = rateLimiter.tryAcquire();
                            if (isAcquire) {
                                log.info("可以运行:taskId={} j={}", taskId, j);
                            } else {
                                log.info("被拒绝:taskId={} j={}", taskId, j);
                                // 被限制的次数累积
                                limited.getAndIncrement();
                            }
                            Thread.sleep(200);
                        }
    
    
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    //等待所有线程结束
                    countDownLatch.countDown();
    
                });
            }
            try {
                countDownLatch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            float time = (System.currentTimeMillis() - start) / 1000F;
            //输出统计结果
    
            log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));
            log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
            log.info("运行的时长为:" + time);
        }
    
    
    // 运行结果
    [main] INFO com.conpany.project.junittest.SimpleTest - 限制的次数为:796,通过的次数为:204
    [main] INFO com.conpany.project.junittest.SimpleTest - 限制的比例为:0.796
    [main] INFO com.conpany.project.junittest.SimpleTest - 运行的时长为:40.864
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    参考

    https://www.cnblogs.com/duanxz/p/4123068.html 几种限流算法
    https://www.cnblogs.com/crazymakercircle/p/15187184.html

  • 相关阅读:
    Elasticsearch搭建
    Minecraft 1.16.5模组开发(五十四) 方块探测器(Detector)
    元老职员离职申请书怎么写模板,共计10篇
    spring框架漏洞整理(Spring Framework漏洞)
    通过stream流实现分页、模糊搜索、按列过滤功能
    网关gateway - 自定义实现动态路由信息存储记载
    zookeeper应用之分布式队列
    jQuery过滤器:筛选jquery对象数组中的DOM对象
    AI Earth ——开发者模式案例4:浙江省森林区域植被生长分析
    Neo4j数据库(二)
  • 原文地址:https://blog.csdn.net/zzxx1994617/article/details/126297359