介绍
限流是一种编程中常用的保护服务正常运行的一种方案;为了应对激增的流量,防止服务被击垮,经常使用某种算法来对流量进行某种限流,以保证后端服务的正常运行。
本文介绍 计数器、令牌桶算法和楼桶算法及它们之间的优缺点。
计数器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 package main import ( "log" "sync" "time" ) // Counter Counter type Counter struct { rate int begin time.Time cycle time.Duration count int lock sync.Mutex } // Set Set func (c *Counter) Set(r int, cycle time.Duration) { c.rate = r c.begin = time.Now() c.cycle = cycle c.count = 0 } // Reset Reset func (c *Counter) Reset(t time.Time) { c.begin = t c.count = 0 } // Allow Allow func (c *Counter) Allow() bool { c.lock.Lock() defer c.lock.Unlock() if c.count == c.rate { now := time.Now() if now.Sub(c.begin) >= c.cycle { c.Reset(now) return true } else { return false } } else { c.count++ return true } } func main() { var wg sync.WaitGroup lr := new(Counter) lr.Set(3, time.Second) for i := 0; i < 10; i++ { wg.Add(1) log.Println("创建请求:", i) go func(i int) { if lr.Allow() { log.Println("request allw: ", i) } wg.Done() }(i) time.Sleep(200 * time.Millisecond) } wg.Wait() }
这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图: 从上图中我们可以看到,假设有一个恶意用户,他在0:59时,瞬间发送了100个请求,并且1:00又瞬间发送了100个请求,那么其实这个用户在 1秒里面,瞬间发送了200个请求。我们刚才规定的是1分钟最多100个请求,也就是每秒钟最多1.7个请求,用户通过在时间窗口的重置节点处突发请求, 可以瞬间超过我们的速率限制。用户有可能通过算法的这个漏洞,瞬间压垮我们的应用。
令牌桶算法
是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下:
假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌。
桶中最多存放 b 个令牌,当桶满时,新添加的令牌被丢弃或拒绝。
当一个 n 个字节大小的数据包到达,将从桶中删除n 个令牌,接着数据包被发送到网络上。
如果桶中的令牌不足 n 个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。
官方实现:https://pkg.go.dev/golang.org/x/time/rate
通过 time/rate 包实现
1 2 3 4 5 6 7 8 9 // 构建限流对象 // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. func NewLimiter(r Limit, b int) *Limiter { return &Limiter{ limit: r, burst: b, } }
Allow/AllowN 判断是否允许通过
1 2 3 4 5 6 7 8 9 10 11 // Allow is shorthand for AllowN(time.Now(), 1). func (lim *Limiter) Allow() bool { return lim.AllowN(time.Now(), 1) } // AllowN reports whether n events may happen at time now. // Use this method if you intend to drop / skip events that exceed the rate limit. // Otherwise use Reserve or Wait. func (lim *Limiter) AllowN(now time.Time, n int) bool { return lim.reserveN(now, n, 0).ok }
具体细节,请看官方实现。
自己手动实现
demo示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package main import ( "math" "sync" "time" ) type tokenBucket struct { rate int64 // 速率 capacity int64 // 桶容量 tokens int64 // 桶内token 个数 lastTime time.Time lock sync.Mutex } func (t *tokenBucket) allow() bool { t.lock.Lock() defer t.lock.Unlock() now := time.Now() t.tokens = t.tokens + int64(now.Sub(t.lastTime).Seconds() * float64(t.rate)) t.tokens = int64(math.Min(float64(t.tokens), float64(t.capacity))) if t.tokens > 0 { t.lastTime = now t.tokens-- return true } return false } func NewTokenBucket(r, c int64) *tokenBucket { return &tokenBucket{ rate: r, capacity: c, tokens: 0, lastTime: time.Now(), } }
测试,此时每5个请求,一个请求成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func main() { t := NewTokenBucket(2, 5) for i := 0; i < 20; i++ { time.Sleep(time.Millisecond * 100) if ok := t.allow(); ok { fmt.Println("request allow!") } else { fmt.Println("request deny!") } } } request deny! request deny! request deny! request deny! request allow! ...
漏桶算法
作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
一个固定容量的漏桶,按照常量固定速率流出水滴。
如果桶是空的,则不需流出水滴。
可以以任意速率流入水滴到漏桶。
如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。
leaky-bucket rate limit algorithm:https://pkg.go.dev/go.uber.org/ratelimit
uber ratelimit 官方实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import ( "fmt" "time" "go.uber.org/ratelimit" ) func main() { rl := ratelimit.New(100) // per second prev := time.Now() for i := 0; i < 10; i++ { // 此处会被阻塞,直到请求允许通过 // Take should block to make sure that the RPS is met. now := rl.Take() fmt.Println(i, now.Sub(prev)) prev = now } // Output: // 0 0 // 1 10ms // 2 10ms // 3 10ms // 4 10ms // 5 10ms // 6 10ms // 7 10ms // 8 10ms // 9 10ms }
自己实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package main import ( "math" "sync" "time" ) type LeakBucket struct { rate float64 // 固定每秒出水率 capacity float64 // 桶容量 water float64 // 桶中当前水量 lastLeakMs int64 // 上次漏水时间戳 lock sync.Mutex } func (l *LeakBucket) Allow() bool { l.lock.Lock() defer l.lock.Unlock() now := time.Now().UnixNano() / 1e6 eclipse := float64(now-l.lastLeakMs) * l.rate / 1000 l.water = l.water - eclipse l.water = math.Max(0, l.water) if (l.water + 1) < l.capacity { l.lastLeakMs = now l.water++ return true } else { return false } } func newBucket(r float64, c float64) *LeakBucket { return &LeakBucket{ rate: r, capacity: c, water: 0, lastLeakMs: time.Now().UnixNano() / 1e6, } }
计数器 漏桶算法 流出的速率是固定的,与令牌桶算法不同的是,令牌桶算法是放入的速率固定,取出的速率不固定,所以令牌桶算法支持流量的爆发。