限流可以认为服务降级的一种,限流就是限制系统的输入和输出流量已达到保护系统的目的。一般来说系统的吞吐量是可以被测算的,为了保证系统的稳定运行,一旦达到的需要限制的阈值,就需要限制流量并采取一些措施以完成限制流量的目的。比如:延迟处理,拒绝处理,或者部分拒绝处理等等。
限流的对象:
计数器法是限流算法里最简单的一种算法。
定义,对于A接口来说,1分钟的访问次数不能超过100个。设置一个计数器counter,效时间为1分钟(即每分钟计数器会被重置为0),每当一个请求过来,counter就加1,如果counter的值大于100,则说明请求数过多,限制后续请求访问;
劣势:临界时间点产生突发流量,统计数量不准确。
假设在 00:01 时发生一个请求,在 00:01-00:58 之间不在发送请求,在 00:59 时发送剩下的所有请求 n-1 (n 为限流请求数量),在下一分钟的 00:01 发送 n 个请求,这样在 2 秒钟内请求到达了 2n - 1 个。
设每分钟请求数量为 60 个,每秒可以处理 1 个请求,用户在 00:59 发送 60 个请求,在 01:00 发送 60 个请求 此时 2 秒钟有 120 个请求(每秒 60 个请求),远远大于了每秒钟处理数量的阈值。
import java.util.concurrent.atomic.AtomicInteger;
public class Counter {
/**
* 最大访问数量
*/
private final int limit = 10;
/**
* 访问时间差
*/
private final long timeout = 1000;
/**
* 请求时间
*/
private long time;
/**
* 当前计数器
*/
private AtomicInteger reqCount = new AtomicInteger(0);
public boolean limit() {
long now = System.currentTimeMillis();
if (now < time + timeout) {
// 单位时间内
reqCount.addAndGet(1);
return reqCount.get() <= limit;
} else {
// 超出单位时间
time = now;
reqCount = new AtomicInteger(0);
return true;
}
}
}
滑动窗口是对计数器方式的改进,增加一个时间粒度的度量单位,把一分钟分成若干等分(6 份,每份 10 秒),在每一份上设置独立计数器,在 00:00-00:09 之间发生请求计数器累加 1。当等分数量越大限流统计就越详细。
/** 队列id和队列的映射关系,队列里面存储的是每一次通过时候的时间戳,这样可以使得程序里有多个限流队列 */
private volatile static Map<String, List<Long>> MAP = new ConcurrentHashMap<>();
private SlideWindow() {}
public static void main(String[] args) throws InterruptedException {
while (true) {
// 任意10秒内,只允许2次通过
System.out.println(LocalTime.now().toString() + SlideWindow.isGo("ListId", 2, 10000L));
// 睡眠0-10秒
Thread.sleep(1000 * new Random().nextInt(10));
}
}
/**
* 滑动时间窗口限流算法
* 在指定时间窗口,指定限制次数内,是否允许通过
*
* @param listId 队列id
* @param count 限制次数
* @param timeWindow 时间窗口大小
* @return 是否允许通过
*/
public static synchronized boolean isGo(String listId, int count, long timeWindow) {
// 获取当前时间
long nowTime = System.currentTimeMillis();
// 根据队列id,取出对应的限流队列,若没有则创建
List<Long> list = MAP.computeIfAbsent(listId, k -> new LinkedList<>());
// 如果队列还没满,则允许通过,并添加当前时间戳到队列开始位置
if (list.size() < count) {
list.add(0, nowTime);
return true;
}
// 队列已满(达到限制次数),则获取队列中最早添加的时间戳
Long farTime = list.get(count - 1);
// 用当前时间戳 减去 最早添加的时间戳
if (nowTime - farTime <= timeWindow) {
// 若结果小于等于timeWindow,则说明在timeWindow内,通过的次数大于count
// 不允许通过
return false;
} else {
// 若结果大于timeWindow,则说明在timeWindow内,通过的次数小于等于count
// 允许通过,并删除最早添加的时间戳,将当前时间添加到队列开始位置
list.remove(count - 1);
list.add(0, nowTime);
return true;
}
}
漏桶(Leaky Bucket)算法思路:
规定固定容量的桶,有水进入,有水流出。对于流进的水我们无法估计进来的数量、速度,对于流出的水我们可以控制速度。
用白话具体说明:假设漏斗总支持并发100个最大请求,如果当前处理速率超过100,那么拒绝超出的请求
在Nginx限流中,有使用该算法的配置。
示例代码:
可见这里有两个变量,一个是桶的大小,支持流量突发增多时可以存多少的水(total),另一个是水桶漏洞的大小(rate),伪代码如下:
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
// 漏桶 限流
@Slf4j
public class LeakBucketLimiter {
// 计算的起始时间
private static long lastOutTime = System.currentTimeMillis();
// 流出速率 每秒 2 次
private static int leakRate = 2;
// 桶的容量
private static int capacity = 2;
//剩余的水量
private static AtomicInteger water = new AtomicInteger(0);
//返回值说明:
// false 没有被限制到
// true 被限流
public static synchronized boolean isLimit(long taskId, int turn) {
// 如果是空桶,就当前时间作为漏出的时间
if (water.get() == 0) {
lastOutTime = System.currentTimeMillis();
water.addAndGet(1);
return false;
}
// 执行漏水
int waterLeaked = ((int) ((System.currentTimeMillis() - lastOutTime) / 1000)) * leakRate;
// 计算剩余水量
int waterLeft = water.get() - waterLeaked;
water.set(Math.max(0, waterLeft));
// 重新更新leakTimeStamp
lastOutTime = System.currentTimeMillis();
// 尝试加水,并且水还未满 ,放行
if ((water.get()) < capacity) {
water.addAndGet(1);
return false;
} else {
// 水满,拒绝加水, 限流
return true;
}
}
//线程池,用于多线程模拟测试
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLimit() {
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
final int threads = 2;
// 每条线程的执行轮数 只测试一秒
final int turns = 5;
// 线程同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
boolean intercepted = isLimit(taskId, j);
if (intercepted) {
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
log.info("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));
log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
log.info("运行的时长为:" + time);
}
}
令牌桶(Token Bucket)算法思路:
漏桶和令牌桶的比较:
开源实现场景:
Guava提供了限流工具类RateLimiter,该类基于令牌桶算法来完成限流,RateLimiter 是单机(单进程)的限流,是JVM级别的的限流,所有的令牌生成与消费都是在内存中,
示例代码:
// 令牌桶 限速
@Slf4j
public class TokenBucketLimiter {
// 上一次令牌发放时间
public long lastTime = System.currentTimeMillis();
// 桶的容量
public int capacity = 2;
// 令牌生成速度 /s
public int rate = 2;
// 当前令牌数量
public AtomicInteger tokens = new AtomicInteger(0);
;
//返回值说明:
// false 没有被限制到
// true 被限流
public synchronized boolean isLimited(long taskId, int applyCount) {
long now = System.currentTimeMillis();
//时间间隔,单位为 ms
long gap = now - lastTime;
//计算时间段内的令牌数
int reverse_permits = (int) (gap * rate / 1000);
int all_permits = tokens.get() + reverse_permits;
// 当前令牌数
tokens.set(Math.min(capacity, all_permits));
log.info("tokens {} capacity {} gap {} ", tokens, capacity, gap);
if (tokens.get() < applyCount) {
// 若拿不到令牌,则拒绝
// log.info("被限流了.." + taskId + ", applyCount: " + applyCount);
return true;
} else {
// 还有令牌,领取令牌
tokens.getAndAdd( - applyCount);
lastTime = now;
// log.info("剩余令牌.." + tokens);
return false;
}
}
//线程池,用于多线程模拟测试
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLimit() {
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
final int threads = 2;
// 每条线程的执行轮数
final int turns = 20;
// 同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
boolean intercepted = isLimited(taskId, 1);
if (intercepted) {
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
log.info("限制的次数为:" + limited.get() +",通过的次数为:" + (threads * turns - limited.get()));
log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
log.info("运行的时长为:" + time);
}
}
Guava的RateLimiter是一个基于令牌桶算法实现的限流器,常用于控制网站的QPS。与Semaphore不同,Semaphore控制的是某一时刻的访问量,RateLimiter控制的是某一时间间隔的访问量。
代码解析:基于guava-31.1-jre版本
测试demo
public void testLimit2() {
RateLimiter rateLimiter = RateLimiter.create(5);
// 被限制的次数
AtomicInteger limited = new AtomicInteger(0);
// 线程数
final int threads = 5;
// 每条线程的执行轮数
final int turns = 200;
// 线程同步器
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() -> {
try {
for (int j = 0; j < turns; j++) {
long taskId = Thread.currentThread().getId();
boolean isAcquire = rateLimiter.tryAcquire();
if (isAcquire) {
log.info("可以运行:taskId={} j={}", taskId, j);
} else {
log.info("被拒绝:taskId={} j={}", taskId, j);
// 被限制的次数累积
limited.getAndIncrement();
}
Thread.sleep(200);
}
} catch (Exception e) {
e.printStackTrace();
}
//等待所有线程结束
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
float time = (System.currentTimeMillis() - start) / 1000F;
//输出统计结果
log.info("限制的次数为:" + limited.get() + ",通过的次数为:" + (threads * turns - limited.get()));
log.info("限制的比例为:" + (float) limited.get() / (float) (threads * turns));
log.info("运行的时长为:" + time);
}
// 运行结果
[main] INFO com.conpany.project.junittest.SimpleTest - 限制的次数为:796,通过的次数为:204
[main] INFO com.conpany.project.junittest.SimpleTest - 限制的比例为:0.796
[main] INFO com.conpany.project.junittest.SimpleTest - 运行的时长为:40.864
https://www.cnblogs.com/duanxz/p/4123068.html 几种限流算法
https://www.cnblogs.com/crazymakercircle/p/15187184.html