在高并发系统的场景下,经常会听到几个概念:限流, 限速, 限并发。该如何理解它们呢?本文是以小白视角去理解学习其中的思想。
笔者认为服务限制可分为资源限制和频率限制两种。资源同时提供 n 个,用完后再申请就只能等释放/归还,如数据库连接池等。并发就是指这种限制,即某个 uid 只能同时有 n 个下载/上传的连接,或者 serverA 对 serviceB 只能同时有 n 个请求的连接;资源在周期内只能被访问 n 次,没有归还的概念,例如 qps, tps。文章源自玩技e族-https://www.playezu.com/239663.html
限速和限流指就是指这种限制, 一般是指某个 uid 对某个 api 只能在每秒访问 n 次以内,某个 uid 下载只能在每秒 n 字节以内。文章源自玩技e族-https://www.playezu.com/239663.html
实际生活中,也有很多场景,如双 11,618,各种秒杀活动等。通常限流思路应该是这样文章源自玩技e族-https://www.playezu.com/239663.html

与用户打交道的服务经常面临的场景文章源自玩技e族-https://www.playezu.com/239663.html

那么除了限流,还有哪些方案呢?系统保护的三大利器:限流、降级、熔断。服务降级是在服务器压力陡增的情况下,利用现有的资源,关闭一些服务接口或者页面,以此来释放服务器资源保证核心任务的正常运行。是降低了部分服务的处理能力,增加另一部分服务处理能力,访问量不变。文章源自玩技e族-https://www.playezu.com/239663.html
而限流器有四种实现思路:计数器、滑动窗口、漏桶、令牌桶。后两者在 nginx 等开源项目中很常用。文章源自玩技e族-https://www.playezu.com/239663.html
1. 固定窗口

思路文章源自玩技e族-https://www.playezu.com/239663.html
- 将时间划分为固定的窗口大小,例如 10s
- 在窗口时间段内,每来一个请求,对计数器加 1。
- 当计数器达到设定限制后,触发限流策略该窗口时间内的之后的请求都被丢弃处理。
- 该窗口时间结束后,计数器清零,从新开始计数。
缺点:可能会存在临界问题,双倍突发请求避免不了,流量能突破限制
关键代码
type limit struct {
beginTime time.Time
counter int
mu sync.Mutex
}
func (limit *limit) apiLimit() bool {
limit.mu.Lock()
defer limit.mu.Unlock()
nowTime := time.Now()
if nowTime.Sub(limit.beginTime) >= WINDOWTIME {
limit.beginTime = nowTime
limit.counter = 0
}
if limit.counter > MAXREQUEST {
return false
}
limit.counter++
fmt.Println("counter: ", limit.counter)
return true
}
2. 滑动窗口
顾名思义,滑动窗口就是时间窗口在随着时间推移不停地移动。

思路
- 时间划分为细粒度的区间,每个区间维持一个计数器,每进入一个请求则将计数器加一。
- 个区间组成一个时间窗口,每流逝一个区间时间后,则抛弃最老的一个区间,纳入新区间。如图中示例的窗口 T1 变为 窗口 T2
- 当前窗口的区间计数器总和超过设定的限制数量,则本窗口内的后续请求都被丢弃。
滑动窗口是固定窗口的优化,将统计周期进一步细分为 N 个小周期,限流统计时将所有小周期内统计时加和比较;滑动窗口的格子划分的越多,那么滑动窗口的滚动就越平滑,限流的统计就会越精确。
缺点:
- 要更多的存储空间,且小窗口越多,耗内存越多
- 丢请求
关键代码
type SlidingWindow struct {
smallWindowTime int64 // 小窗口时间大小
smallWindowNum int64 // 小窗口总数
smallWindowCap int // 小窗口请求容量
counters map[int64]int // 小窗口计数器
mu sync.Mutex // 锁
}
func NewSlidingWindow(smallWindowTime time.Duration) (*SlidingWindow, error) {
num := int64(WINDOWTIME / smallWindowTime) // 小窗口总数
return &SlidingWindow{
smallWindowTime: int64(smallWindowTime),
smallWindowNum: num,
smallWindowCap: MAXREQUEST / int(num),
counters: make(map[int64]int),
}, nil
}
func (sw *SlidingWindow) ReqLimit() bool {
sw.mu.Lock()
defer sw.mu.Unlock()
// 获取当前小格子窗口所在的时间值
curSmallWindowTime := time.Now().Unix()
// 计算当前小格子窗口起始时间
beginTime := curSmallWindowTime - sw.smallWindowTime*(sw.smallWindowNum-1)
// 计算当前小格子窗口请求总数
var count int
for sWindowTime, counter := range sw.counters { // 遍历计数器
if sWindowTime < beginTime { // 判断不是当前小格子
delete(sw.counters, sWindowTime)
} else {
count += counter // 当前小格子窗口计数器累加
}
}
// 当前小格子请求到达请求限制,请求失败,返回 false
if count >= sw.smallWindowCap {
return false
}
// 没有到达请求上限,当前小格子窗口计数器+1,请求成功
sw.counters[curSmallWindowTime]++
return true
}
3. 漏桶

思路
- 每个请求看作 “水滴” 一样,放入漏桶中暂存。
- 漏桶以固定的速率,漏出请求来执行;如果漏桶空了,则停止漏水。
- 如果漏桶满了,则新来的请求会被丢弃。
很明显,漏桶算法在实现的数据结构会选择有容量限制的队列,请求执行者定期定点从队列取出请求来执行,新来的请求会被暂存在队列中或者被丢弃。漏桶算法的缺点是,不论当前系统的负载压力如何,所有请求都得进行排队,即便此时服务器负载处于低位。
关键代码
type LeakyBucket struct {
capacity int // 桶的容量 - 最高水位
currentReqNum int // 桶中当前请求数量 - 当前水位
lastTime time.Time // 桶中上次请求时间 - 上次放水时间
rate int // 桶中流出请求的速率,每秒流出多少请求,水流速度/秒
mu sync.Mutex // 锁
}
func NewLeakyBucket(rate int) *LeakyBucket {
return &LeakyBucket{
capacity: MAXREQUEST, //容量
lastTime: time.Now(),
rate: rate,
}
}
func (lb *LeakyBucket) ReqLimit() bool {
lb.mu.Lock()
defer lb.mu.Unlock()
now := time.Now()
// 计算距离上次放水时间间隔
gap := now.Sub(lb.lastTime)
fmt.Println("gap:", gap)
fmt.Println("lb.currentReqNum is:",lb.currentReqNum)
if gap >= time.Second {
// gap 这段时间流出的请求数=gap时间 * 每秒流出速率
out := int(gap/time.Second) * lb.rate
// 计算当前桶中请求数
lb.currentReqNum = maxInt(0, lb.currentReqNum-out)
lb.lastTime = now
}
// 桶中的当前请求数大于桶容量,请求失败
if lb.currentReqNum >= lb.capacity {
return false
}
// 若没超过桶容量,桶中请求量+1,返回true
lb.currentReqNum++
fmt.Println("curReqNum:", lb.currentReqNum)
return true
}
4. 令牌桶

思路
- 令牌按固定速率发放,生成的令牌放入令牌桶中
- 令牌桶有容量限制,当桶满时,新生成的令牌会被丢弃。
- 请求到来时,先从令牌桶中获取令牌,如果取得,则执行请求;如果令牌桶为空,则丢弃该请求。
和漏桶算法相比,令牌桶算法最大的优势在于应对突增的流量。漏桶算法只能匀速地消费,突增的流量只能抛弃,而令牌桶只要桶中的令牌足够即可,且令牌生产的速度本身可控可调。
关键代码
func NewTokenBucket(rate int) *TokenBucket {
return &TokenBucket{
capacity: MAXREQUEST,
lastTime: time.Now(),
rate: rate,
}
}
func (tb *TokenBucket) ReqLimit() bool {
tb.mu.Lock()
defer tb.mu.Unlock()
now := time.Now()
// 距离上次发放令牌间隔时间
gap := now.Sub(tb.lastTime)
fmt.Println("gap: ", gap)
if gap >= time.Second {
// 这段时间发放的token数 = gap时间(秒为单位) * 每秒发放token速率
in := int(gap/time.Second) * tb.rate
// 这段时间桶中的token总量
// tb.currentTokenNum+in : 当前已有的token总量+发放的token树
// 当前桶中的token数量当然不能超过桶的最大容量
tb.currentTokenNum = minInt(tb.capacity, tb.currentTokenNum+in)
tb.lastTime = now
}
// 没有token,请求失败
if tb.currentTokenNum <= 0 {
return false
}
// 如果有token,当前 token 数量-1,也就是发放token,请求成功
tb.currentTokenNum--
fmt.Println("curTokenNum:", tb.currentTokenNum)
return true
}
其中,Golang 自带限流器(golang.org/x/time/rate)就是基于令牌桶算法来实现的。
主要包含以下几种方法:
- 限流器构造 func NewLimiter(r Limit, b int) *Limiter
- Wait/WaitN func WaitN(c context.Context, n int) (err error)
- Allow/AllowN func AllowN(now time.Time, n int) bool
- Reserve/ReserveN func ReserveN(n time.Time, n int) *Reservation
实验: 起服务,模拟 5s 内不断请求 API,超限则拒绝!
func main() {
go server()
time.Sleep(time.Second)
// 请求持续5秒
e := time.Now().UnixNano() + 5*time.Second.Nanoseconds()
for {
if time.Now().UnixNano() > e {
break
}
do()
}
}
func server() {
// 每1毫秒投放一次令牌,桶容量大小为 10
r := rate.Every(1 * time.Millisecond)
limit := rate.NewLimiter(r, 10)
http.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
if limit.Allow() {
fmt.Printf("请求成功,当前时间:%sn", time.Now().Format("2006-01-02 15:04:05"))
} else {
fmt.Printf("被限流了。。。n")
}
})
err := http.ListenAndServe(":38888", nil)
if err != nil {
panic(err)
}
}
func do() {
api := "http://localhost:38888/"
res, err := http.Get(api)
if err != nil {
panic(err)
}
defer res.Body.Close()
if res.StatusCode == http.StatusOK {
fmt.Printf("200 ")
}
}
综上讲解,希望对大家理解限流有些许帮助。当然,限流需要与多种技术结合使用,才能更好提供用户体验。
引用
https://blog.csdn.net/weixin_41846320/article/details/95941361
https://blog.csdn.net/weixin_41846320/article/details/95941361
https://zhuanlan.zhihu.com/p/90206074