限流是一种编程中常用的保护服务正常运行的一种方案;为了应对激增的流量,防止服务被击垮,经常使用某种算法来对流量进行某种限流,以保证后端服务的正常运行。
本文介绍 计数器、令牌桶算法和楼桶算法及它们之间的优缺点。
           
      1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 package main import ( 	"log" 	"sync" 	"time" ) // Counter Counter type Counter struct { 	rate  int 	begin time.Time 	cycle time.Duration 	count int 	lock  sync.Mutex } // Set Set func (c *Counter) Set(r int, cycle time.Duration) { 	c.rate = r 	c.begin = time.Now() 	c.cycle = cycle 	c.count = 0 } // Reset Reset func (c *Counter) Reset(t time.Time) { 	c.begin = t 	c.count = 0 } // Allow Allow func (c *Counter) Allow() bool { 	c.lock.Lock() 	defer c.lock.Unlock() 	if c.count == c.rate { 		now := time.Now() 		if now.Sub(c.begin) >= c.cycle { 			c.Reset(now) 			return true 		} else { 			return false 		} 	} else { 		c.count++ 		return true 	} } func main() { 	var wg sync.WaitGroup 	lr := new(Counter) 	lr.Set(3, time.Second) 	for i := 0; i < 10; i++ { 		wg.Add(1) 		log.Println("创建请求:", i) 		go func(i int) { 			if lr.Allow() { 				log.Println("request allw: ", i) 			} 			wg.Done() 		}(i) 		time.Sleep(200 * time.Millisecond) 	} 	wg.Wait() } 
这个算法虽然简单,但是有一个十分致命的问题,那就是临界问题,我们看下图:
        
           
      是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。令牌桶算法的描述如下:
假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌。 
桶中最多存放 b 个令牌,当桶满时,新添加的令牌被丢弃或拒绝。 
当一个 n 个字节大小的数据包到达,将从桶中删除n 个令牌,接着数据包被发送到网络上。 
如果桶中的令牌不足 n 个,则不会删除令牌,且该数据包将被限流(要么丢弃,要么缓冲区等待)。 
 
官方实现:https://pkg.go.dev/golang.org/x/time/rate 
        
           
      1 2 3 4 5 6 7 8 9 // 构建限流对象 // NewLimiter returns a new Limiter that allows events up to rate r and permits // bursts of at most b tokens. func NewLimiter(r Limit, b int) *Limiter {     return &Limiter{         limit: r,         burst: b,     } } 
Allow/AllowN 判断是否允许通过
1 2 3 4 5 6 7 8 9 10 11 // Allow is shorthand for AllowN(time.Now(), 1). func (lim *Limiter) Allow() bool {     return lim.AllowN(time.Now(), 1) } // AllowN reports whether n events may happen at time now. // Use this method if you intend to drop / skip events that exceed the rate limit. // Otherwise use Reserve or Wait. func (lim *Limiter) AllowN(now time.Time, n int) bool {     return lim.reserveN(now, n, 0).ok } 
具体细节,请看官方实现。
        
           
      demo示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 package main import ( 	"math" 	"sync" 	"time" ) type tokenBucket struct { 	rate     int64 // 速率 	capacity int64 // 桶容量 	tokens   int64 // 桶内token 个数 	lastTime time.Time 	lock sync.Mutex } func (t *tokenBucket) allow() bool { 	t.lock.Lock() 	defer t.lock.Unlock() 	now := time.Now() 	t.tokens = t.tokens + int64(now.Sub(t.lastTime).Seconds() * float64(t.rate)) 	t.tokens = int64(math.Min(float64(t.tokens), float64(t.capacity))) 	if t.tokens > 0 { 		t.lastTime = now 		t.tokens-- 		return true 	} 	return false } func NewTokenBucket(r, c int64) *tokenBucket { 	return &tokenBucket{ 		rate:     r, 		capacity: c, 		tokens:   0, 		lastTime: time.Now(), 	} } 
测试,此时每5个请求,一个请求成功
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 func main() { 	t := NewTokenBucket(2, 5) 	for i := 0; i < 20; i++ { 		time.Sleep(time.Millisecond * 100) 		if ok := t.allow(); ok { 			fmt.Println("request allow!") 		} else { 			fmt.Println("request deny!") 		} 	} } request deny! request deny! request deny! request deny! request allow! ... 
           
      作为计量工具(The Leaky Bucket Algorithm as a Meter)时,可以用于流量整形(Traffic Shaping)和流量控制(TrafficPolicing),漏桶算法的描述如下:
一个固定容量的漏桶,按照常量固定速率流出水滴。 
如果桶是空的,则不需流出水滴。 
可以以任意速率流入水滴到漏桶。 
如果流入水滴超出了桶的容量,则流入的水滴溢出了(被丢弃),而漏桶容量是不变的。 
 
leaky-bucket rate limit algorithm:https://pkg.go.dev/go.uber.org/ratelimit 
        
           
      1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 import ( 	"fmt" 	"time" 	"go.uber.org/ratelimit" ) func main() {     rl := ratelimit.New(100) // per second     prev := time.Now()     for i := 0; i < 10; i++ {         // 此处会被阻塞,直到请求允许通过         // Take should block to make sure that the RPS is met.         now := rl.Take()         fmt.Println(i, now.Sub(prev))         prev = now     }     // Output:     // 0 0     // 1 10ms     // 2 10ms     // 3 10ms     // 4 10ms     // 5 10ms     // 6 10ms     // 7 10ms     // 8 10ms     // 9 10ms } 
           
      1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package main import ( 	"math" 	"sync" 	"time" ) type LeakBucket struct { 	rate       float64 // 固定每秒出水率 	capacity   float64 // 桶容量 	water      float64 // 桶中当前水量 	lastLeakMs int64   // 上次漏水时间戳 	lock       sync.Mutex } func (l *LeakBucket) Allow() bool { 	l.lock.Lock() 	defer l.lock.Unlock() 	now := time.Now().UnixNano() / 1e6 	eclipse := float64(now-l.lastLeakMs) * l.rate / 1000 	l.water = l.water - eclipse 	l.water = math.Max(0, l.water) 	if (l.water + 1) < l.capacity { 		l.lastLeakMs = now 		l.water++ 		return true 	} else { 		return false 	} } func newBucket(r float64, c float64) *LeakBucket { 	return &LeakBucket{ 		rate:       r, 		capacity:   c, 		water:      0, 		lastLeakMs: time.Now().UnixNano() / 1e6, 	} } 
计数器