go实现各类限流算法

介绍

限流是一种编程中常用的保护服务正常运行的一种方案;为了应对激增的流量,防止服务被击垮,经常使用某种算法来对流量进行某种限流,以保证后端服务的正常运行。

本文介绍 计数器、令牌桶算法和楼桶算法及它们之间的优缺点。

计数器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package main

import (
"log"
"sync"
"time"
)

// Counter Counter
type Counter struct {
rate int
begin time.Time
cycle time.Duration
count int
lock sync.Mutex
}

// Set Set
func (c *Counter) Set(r int, cycle time.Duration) {
c.rate = r
c.begin = time.Now()
c.cycle = cycle
c.count = 0
}

// Reset Reset
func (c *Counter) Reset(t time.Time) {
c.begin = t
c.count = 0
}

// Allow Allow
func (c *Counter) Allow() bool {
c.lock.Lock()
defer c.lock.Unlock()

if c.count == c.rate {
now := time.Now()
if now.Sub(c.begin) >= c.cycle {
c.Reset(now)
return true
} else {
return false
}
} else {
c.count++
return true
}

}

func main() {
var wg sync.WaitGroup
lr := new(Counter)
lr.Set(3, time.Second)
for i := 0; i < 10; i++ {
wg.Add(1)
log.Println("创建请求:", i)
go func(i int) {
if lr.Allow() {
log.Println("request allw: ", i)
}
wg.Done()
}(i)
time.Sleep(200 * time.Millisecond)
}
wg.Wait()
}

这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:
counter
从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。

令牌桶算法

是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下:

  • 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌。
  • 桶中最多存放 b 个令牌,当桶满时,新添加的令牌被丢弃或拒绝。
  • 当一个 n 个字节大小的数据包到达,将从桶中删除n 个令牌,接着数据包被发送到网络上。
  • 如果桶中的令牌不足 n 个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。

官方实现:https://pkg.go.dev/golang.org/x/time/rate

通过 time/rate 包实现

1
2
3
4
5
6
7
8
9
// 构建限流对象
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}

Allow/AllowN 判断是否允许通过

1
2
3
4
5
6
7
8
9
10
11
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}

具体细节,请看官方实现。

自己手动实现

demo示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"math"
"sync"
"time"
)

type tokenBucket struct {
rate int64 // 速率
capacity int64 // 桶容量
tokens int64 // 桶内token 个数
lastTime time.Time
lock sync.Mutex
}

func (t *tokenBucket) allow() bool {
t.lock.Lock()
defer t.lock.Unlock()
now := time.Now()
t.tokens = t.tokens + int64(now.Sub(t.lastTime).Seconds() * float64(t.rate))
t.tokens = int64(math.Min(float64(t.tokens), float64(t.capacity)))
if t.tokens > 0 {
t.lastTime = now
t.tokens--
return true
}
return false
}

func NewTokenBucket(r, c int64) *tokenBucket {
return &tokenBucket{
rate: r,
capacity: c,
tokens: 0,
lastTime: time.Now(),
}
}

测试,此时每5个请求,一个请求成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
func main() {
t := NewTokenBucket(2, 5)
for i := 0; i < 20; i++ {
time.Sleep(time.Millisecond * 100)
if ok := t.allow(); ok {
fmt.Println("request allow!")
} else {
fmt.Println("request deny!")
}
}
}

request deny!
request deny!
request deny!
request deny!
request allow!
...

漏桶算法

作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:

  • 一个固定容量的漏桶,按照常量固定速率流出水滴。
  • 如果桶是空的,则不需流出水滴。
  • 可以以任意速率流入水滴到漏桶。
  • 如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。

leaky-bucket rate limit algorithm:
https://pkg.go.dev/go.uber.org/ratelimit

uber ratelimit 官方实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import (
"fmt"
"time"

"go.uber.org/ratelimit"
)

func main() {
rl := ratelimit.New(100) // per second

prev := time.Now()
for i := 0; i < 10; i++ {
// 此处会被阻塞,直到请求允许通过
// Take should block to make sure that the RPS is met.
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}

// Output:
// 0 0
// 1 10ms
// 2 10ms
// 3 10ms
// 4 10ms
// 5 10ms
// 6 10ms
// 7 10ms
// 8 10ms
// 9 10ms
}

自己实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package main

import (
"math"
"sync"
"time"
)

type LeakBucket struct {
rate float64 // 固定每秒出水率
capacity float64 // 桶容量
water float64 // 桶中当前水量
lastLeakMs int64 // 上次漏水时间戳
lock sync.Mutex
}

func (l *LeakBucket) Allow() bool {
l.lock.Lock()
defer l.lock.Unlock()

now := time.Now().UnixNano() / 1e6
eclipse := float64(now-l.lastLeakMs) * l.rate / 1000
l.water = l.water - eclipse
l.water = math.Max(0, l.water)
if (l.water + 1) < l.capacity {
l.lastLeakMs = now
l.water++
return true
} else {
return false
}
}

func newBucket(r float64, c float64) *LeakBucket {
return &LeakBucket{
rate: r,
capacity: c,
water: 0,
lastLeakMs: time.Now().UnixNano() / 1e6,
}
}

计数器
漏桶算法 流出的速率是固定的,与令牌桶算法不同的是,令牌桶算法是放入的速率固定,取出的速率不固定,所以令牌桶算法支持流量的爆发。

Search by:GoogleBingBaidu