限流技术的常见算法及其实现

绝境长城the wall

通常限流主要是限制并发数以及QPS,从而避免异常流量对系统的冲击;

并发数和QPS是紧密相关的,可以参考Little’s Law(律特法则):L = λW (proven 1961)

一个排队系统在稳定状态下,在系统里面的个体的数量的平均值 L, 等于
平均个体到达率λ (单位是λ个每单位时间)乘以 个体的平均逗留时间W

数学定理(严格的数学推理,非经验公式)
排队论的理论

限流算法

最粗暴的实现方式是每执行一次delay一定时间,从而达到限制QPS的效果

比如,我们想以最大的QPS为10去处理1000个业务逻辑,那么代码很可能这么写

1
2
3
4
for (int i = 0; i < 1000; i++) {
doSomething();
Thread.sleep(100);
}

这种写法有什么弊端?

  • delay的预估时间不精确

    为什么是100ms呢?其实QPS还依赖于doSomething的执行时间;

    如果doSomething的执行时间短,比如是1ms,那么QPS最接近10;

    如果doSomething的执行时间长,比如是500,那么QPS还不到2

    通常情况下,doSomething的执行时间是非常不确定的,所以我们很难给出一个相对精确的delay时间

  • 多线程下如何控制呢?

    显然,由于doSomething的执行时间不确定,会导致没法在较短的时间内处理完1000个业务逻辑

    所以必须借助线程池,通过并行去处理,但是QPS也必须控制在10以内

    WorkThread的代码很有可能这么写,然后启多少个线程合适呢?

1
2
3
4
5
6
7
8
9
@Override
public void run() {
doSomething();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

因此,采用delay这种粗暴的实现方式很难将QPS稳定地控制在10

要想解决这个问题,就必须做到

  • 固定速率

    在1秒这个时间窗口内,将执行的次数控制在10,这样速率是固定的

  • 不允许突发情况
    在任何1秒时间窗口内,不能超过10个

    所以不能出现以下突发情况:比如QPS=2,0.0s到1.0s和1.0s到2.0s分别指允许执行2次,

    但是在0.08秒、0.09秒、1.01秒、1.02秒执行了4次,所以在0.5秒到1.5秒这期间的1秒,

    QPS是4并且超过了2

Fixed_Sliding_Window

Leaky bucket(漏桶算法)正好解决了这些问题

Token bucket(令牌桶算法)和它一样,都是最常见的两种限流算法

Leaky bucket漏桶算法

算法实现

Leaky bucket

不断的往桶里面注水,无论注水的速度是大还是小,水都是按固定的速率往外漏水;

如果桶满了,水会溢出;

特点

  • 漏水的速率是固定的
  • 即使存在注水burst(突然注水量变大)的情况,漏水的速率也是固定的

Token bucket令牌桶算法

算法实现

Token bucket

令牌发送:每秒往桶里面发送r个令牌(token)

桶的容量:桶中最多可以存放b个token;当放入的token数量超过b时,新放入的token会被丢弃

请求访问:每次请求访问时先check桶中有没有剩余的令牌

  • 如果桶中有足够的令牌,那么允许这次业务请求,并将这次从桶中取出的令牌删除

  • 如果桶中令牌不足,那么这次业务请求不允许操作,会以如下方式处理:

    • 直接被丢弃

    • 放到队列中,当桶中有足够多令牌时再允许该业务请求

由于每秒会不断地往桶中放r个token,所以当无业务请求需处理时,桶中的token数量会不断增加,止到达到桶的容量b为止

特点

  • 令牌可以积累

    桶中最大的令牌数是b,也是可以积累的最大令牌数

  • 允许突发流量

    桶中token可以积累到n(b<=n<=0),此时如果有n个突发请求同时到达,这n个请求是可以同时允许处理的

Leakly bucket VS Token bucket

对比项 Leakly bucket Token bucket Token bucket的备注
依赖token
立即执行 有足够的token才能执行
堆积token
速率恒定 可以大于设定的QPS

具体实现

Guava的RateLimiter实现

在Guava中RateLimiter的实现有两种:SmoothBurstySmoothWarmUp

补充类图

SmoothBursty

  • 基于Token bucket算法的实现
1
2
3
RateLimiter rateLimiter = RateLimiter.create(permitsPerSecond);//创建一个SmoothBursty实例
rateLimiter.acquire();//获取1个permit;可能会被阻塞止到获取到为止

以下场景,调用acquire()时何时有返回值?

QPS=1,4个线程在以下时间点依次调用acquire()方法

1
2
3
4
5
6
7
T0 at 0 seconds --> 0 excute
T1 at 1.05 seconds --> 1.05
T2 at 2 seconds --> 2.05(=1.05+1)
T3 at 3 seconds --> 3.05(=2.05+1)
  • 存储token,具备处理突发请求的能力

    当RateLimiter空闲时(无请求需处理),可以积累一定时间内的permits(token)

    比如,当QPS=2,maxBurstSeconds=10时,也就意味着如果RateLimiter空闲,那么可以积累

    10秒的permits,也就是10*2=20个(maxPermits = maxBurstSeconds *permitsPerSecond),

    当下次有请求过来时,可以立即取走这20个permits,从而可以达到突发请求的效果

注意:SmoothBursty中maxBurstSeconds的默认值是1,并且不可以修改,所以SmoothBursty最多只能积累permitsPerSecond个permits

SmoothWarmingUp

  • 基于Leaky bucket算法实现
  • QPS是固定的
  • 不支持burst
  • 使用于需要预热时间的使用场景
    RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit)
1
2
3
4
//创建一个SmoothWarmingUp实例;warmupPeriod是指预热的时间
RateLimiter rateLimiter =
RateLimiter.create(permitsPerSecond,warmupPeriod,timeUnit);
rateLimiter.acquire();//获取1个permit;可能会被阻塞止到获取到为止

预热期间QPS会平滑地逐步加速到最大的速率(也就是QPS)

简单用例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class RateLimiterUseCase {
private Logger logger = LoggerFactory.getLogger(RateLimiterUseCase.class);
private int qps;
private int requestCount;
private RateLimiter rateLimiter;
public RateLimiterUseCase(int qps, int requestCount) {
this.qps = qps;
this.requestCount = requestCount;
}
private void buildRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
private void processRequest(int requestCount) {
logger.info("RateLimiter 类型:{}", rateLimiter.getClass());
long startTimeMillis = System.currentTimeMillis();
for (int i = 0; i < requestCount; i++) {
rateLimiter.acquire();
}
long usedTimeMillis = System.currentTimeMillis() - startTimeMillis;
logger.info("处理请求数:{},耗时:{},限流的qps:{},实际的qps:{}", requestCount, usedTimeMillis, rateLimiter.getRate(), Math.ceil(requestCount / (usedTimeMillis / 1000.00)));
logger.info("");
}
private void sleep(int sleepTimeSeconds) {
logger.info("等待{}秒后,继续处理下一批{}个请求", sleepTimeSeconds, requestCount);
try {
Thread.sleep(sleepTimeSeconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void processWithTokenBucket() {
buildRateLimiter(RateLimiter.create(qps));
doProcess();
}
public void processWithLeakBucket() {
buildRateLimiter(RateLimiter.create(qps, 0, TimeUnit.MILLISECONDS));
doProcess();
}
private void doProcess() {
sleep(0);
processRequest(requestCount);
sleep(1);
processRequest(requestCount);
sleep(5);
processRequest(requestCount);
sleep(10);
processRequest(requestCount);
}
public static void main(String[] args) {
RateLimiterUseCase test = new RateLimiterUseCase(10, 100);
test.processWithLeakBucket();
test.processWithTokenBucket();
test = new RateLimiterUseCase(10, 15);
test.processWithLeakBucket();
test.processWithTokenBucket();
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
09:55:47.662 [main] INFO c.n.guava.RateLimiterUseCase - 等待0秒后,继续处理下一批100个请求
09:55:47.668 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:55:57.573 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:9904,限流的qps:10.0,实际的qps:11.0
09:55:57.574 [main] INFO c.n.guava.RateLimiterUseCase -
09:55:57.574 [main] INFO c.n.guava.RateLimiterUseCase - 等待1秒后,继续处理下一批100个请求
09:55:58.578 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:56:08.481 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:9903,限流的qps:10.0,实际的qps:11.0
09:56:08.481 [main] INFO c.n.guava.RateLimiterUseCase -
09:56:08.481 [main] INFO c.n.guava.RateLimiterUseCase - 等待5秒后,继续处理下一批100个请求
09:56:13.486 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:56:23.388 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:9902,限流的qps:10.0,实际的qps:11.0
09:56:23.388 [main] INFO c.n.guava.RateLimiterUseCase -
09:56:23.388 [main] INFO c.n.guava.RateLimiterUseCase - 等待10秒后,继续处理下一批100个请求
09:56:33.391 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:56:43.293 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:9902,限流的qps:10.0,实际的qps:11.0
09:56:43.294 [main] INFO c.n.guava.RateLimiterUseCase -
09:56:43.294 [main] INFO c.n.guava.RateLimiterUseCase - 等待0秒后,继续处理下一批100个请求
09:56:43.294 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:56:53.195 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:9901,限流的qps:10.0,实际的qps:11.0
09:56:53.196 [main] INFO c.n.guava.RateLimiterUseCase -
09:56:53.196 [main] INFO c.n.guava.RateLimiterUseCase - 等待1秒后,继续处理下一批100个请求
09:56:54.197 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:57:03.194 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:8997,限流的qps:10.0,实际的qps:12.0
09:57:03.194 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:03.194 [main] INFO c.n.guava.RateLimiterUseCase - 等待5秒后,继续处理下一批100个请求
09:57:08.198 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:57:17.102 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:8904,限流的qps:10.0,实际的qps:12.0
09:57:17.103 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:17.103 [main] INFO c.n.guava.RateLimiterUseCase - 等待10秒后,继续处理下一批100个请求
09:57:27.107 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:57:36.012 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:100,耗时:8905,限流的qps:10.0,实际的qps:12.0
09:57:36.012 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:36.012 [main] INFO c.n.guava.RateLimiterUseCase - 等待0秒后,继续处理下一批15个请求
09:57:36.013 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:57:37.416 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:1403,限流的qps:10.0,实际的qps:11.0
09:57:37.416 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:37.416 [main] INFO c.n.guava.RateLimiterUseCase - 等待1秒后,继续处理下一批15个请求
09:57:38.417 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:57:39.820 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:1402,限流的qps:10.0,实际的qps:11.0
09:57:39.821 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:39.821 [main] INFO c.n.guava.RateLimiterUseCase - 等待5秒后,继续处理下一批15个请求
09:57:44.825 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:57:46.228 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:1403,限流的qps:10.0,实际的qps:11.0
09:57:46.229 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:46.229 [main] INFO c.n.guava.RateLimiterUseCase - 等待10秒后,继续处理下一批15个请求
09:57:56.233 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothWarmingUp
09:57:57.636 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:1403,限流的qps:10.0,实际的qps:11.0
09:57:57.636 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:57.636 [main] INFO c.n.guava.RateLimiterUseCase - 等待0秒后,继续处理下一批15个请求
09:57:57.636 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:57:59.037 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:1400,限流的qps:10.0,实际的qps:11.0
09:57:59.037 [main] INFO c.n.guava.RateLimiterUseCase -
09:57:59.037 [main] INFO c.n.guava.RateLimiterUseCase - 等待1秒后,继续处理下一批15个请求
09:58:00.038 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:58:00.539 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:501,限流的qps:10.0,实际的qps:30.0
09:58:00.540 [main] INFO c.n.guava.RateLimiterUseCase -
09:58:00.540 [main] INFO c.n.guava.RateLimiterUseCase - 等待5秒后,继续处理下一批15个请求
09:58:05.542 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:58:05.945 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:403,限流的qps:10.0,实际的qps:38.0
09:58:05.946 [main] INFO c.n.guava.RateLimiterUseCase -
09:58:05.946 [main] INFO c.n.guava.RateLimiterUseCase - 等待10秒后,继续处理下一批15个请求
09:58:15.947 [main] INFO c.n.guava.RateLimiterUseCase - RateLimiter 类型:class com.google.common.util.concurrent.SmoothRateLimiter$SmoothBursty
09:58:16.350 [main] INFO c.n.guava.RateLimiterUseCase - 处理请求数:15,耗时:403,限流的qps:10.0,实际的qps:38.0
09:58:16.350 [main] INFO c.n.guava.RateLimiterUseCase -

RateLimiter的结论

对于SmoothBurst而言,是基于Token bucket算法,因此

速率不是固定的,会出现burst突发情况
设定的QPS越大,需要处理的request越小时,实际的QPS越大;
设定的QPS越小,需要处理的request越大时,实际的QPS越平稳,越接近设定的QPS

对于SmoothWarmingUp而言,是基于Leaky bucket算法,因此

速率是固定的,因此QPS也是固定的,不会出现burst突发情况

参考资料

Token bucket算法图片

Better Rate Limiting in .NET