go语言中的限流漏桶和令牌桶库怎么使用
导读:本文共8567字符,通常情况下阅读需要29分钟。同时您也可以点击右侧朗读,来听本文内容。按键盘←(左) →(右) 方向键可以翻页。
摘要: 为什么需要限流中间件?在大数据量高并发访问时,经常会出现服务或接口面对大量的请求而导致数据库崩溃的情况,甚至引发连锁反映导致整个系统崩溃。或者有人恶意攻击网站,大量的无用请求出现会导致缓存穿透的情况出现。使用限流中间件可以在短时间内对请求进行限制数量... ...
目录
(为您整理了一些要点),点击可以直达。
为什么需要限流中间件?
在大数据量高并发访问时,经常会出现服务或接口面对大量的请求而导致数据库崩溃的情况,甚至引发连锁反映导致整个系统崩溃。或者有人恶意攻击网站,大量的无用请求出现会导致缓存穿透的情况出现。使用限流中间件可以在短时间内对请求进行限制数量,起到降级的作用,从而保障了网站的安全性。
应对大量并发请求的策略?
使用消息中间件进行统一限制(降速)
使用限流方案将多余请求返回(限流)
升级服务器
缓存(但仍然有缓存穿透等危险)
等等
可以看出在代码已经无法提升的情况下,只能去提升硬件水平。或者改动架构再加一层!也可以使用消息中间件统一处理。而结合看来,限流方案是一种既不需要大幅改动也不需要高额开销的策略。
常见的限流方案
令牌桶算法
漏桶算法
滑动窗口算法
等等
漏桶
引入ratelimit库
go get -u go.uber.org/ratelimit
库函数源代码
//NewreturnsaLimiterthatwilllimittothegivenRPS.funcNew(rateint,opts...Option)Limiter{returnnewAtomicBased(rate,opts...)}//newAtomicBasedreturnsanewatomicbasedlimiter.funcnewAtomicBased(rateint,opts...Option)*atomicLimiter{//TODOconsidermovingconfigbuildingtotheimplementation//independentcode.config:=buildConfig(opts)perRequest:=config.per/time.Duration(rate)l:=&atomicLimiter{perRequest:perRequest,maxSlack:-1*time.Duration(config.slack)*perRequest,clock:config.clock,}initialState:=state{last:time.Time{},sleepFor:0,}atomic.StorePointer(&l.state,unsafe.Pointer(&initialState))returnl}
该函数使用了函数选项模式对多个结构体对象进行初始化
根据传入的值来初始化一个桶结构体 rate
为int
传参 。
初始化过程中包括了
每一滴水需要的时间
perquest = config.per / time.Duration(rate)
maxSlack
宽松度(宽松度为负值)-1 * time.Duration(config.slack) * perRequest
松紧度是用来规范等待时间的
//Clockistheminimumnecessaryinterfacetoinstantiatearatelimiterwith//aclockormockclock,compatiblewithclockscreatedusing//github.com/andres-erbsen/clock.typeClockinterface{Now()time.TimeSleep(time.Duration)}
同时还需要结构体Clock
来记录当前请求的时间now
和此刻的请求所需要花费等待的时间sleep
typestatestruct{lasttime.TimesleepFortime.Duration}
state
主要用来记录上次执行的时间以及当前执行请求需要花费等待的时间(作为中间状态记录)
最重要的Take逻辑
func(t*atomicLimiter)Take()time.Time{var(newStatestatetakenboolintervaltime.Duration)for!taken{now:=t.clock.Now()previousStatePointer:=atomic.LoadPointer(&t.state)oldState:=(*state)(previousStatePointer)newState=state{last:now,sleepFor:oldState.sleepFor,}ifoldState.last.IsZero(){taken=atomic.CompareAndSwapPointer(&t.state,previousStatePointer,unsafe.Pointer(&newState))continue}//计算是否需要进行等待取水操作newState.sleepFor+=t.perRequest(每两滴水之间的间隔时间)-now.Sub(oldState.last)(当前时间与上次取水时间的间隔)//如果等待取水时间特别小,就需要松紧度进行维护ifnewState.sleepFor<t.maxSlack{newState.sleepFor=t.maxSlack}//如果等待时间大于0,就进行更新ifnewState.sleepFor>0{newState.last=newState.last.Add(newState.sleepFor)interval,newState.sleepFor=newState.sleepFor,0}taken=atomic.CompareAndSwapPointer(&t.state,previousStatePointer,unsafe.Pointer(&newState))}t.clock.Sleep(interval)//最后返回需要等待的时间returnnewState.last}
实现一个Take方法
该Take方法会进行原子性操作(可以理解为加锁和解锁),在大量并发请求下仍可以保证正常使用。
记录下当前的时间
now := t.clock.Now()
oldState.last.IsZero()
判断是不是第一次取水,如果是就直接将state
结构体中的值进行返回。而这个结构体中初始化了上次执行时间,如果是第一次取水就作为当前时间直接传参。如果
newState.sleepFor
非常小,就会出现问题,因此需要借助宽松度,一旦这个最小值比宽松度小,就用宽松度对取水时间进行维护。如果
newState.sleepFor > 0
就直接更新结构体中上次执行时间newState.last = newState.last.Add(newState.sleepFor)
并记录需要等待的时间interval, newState.sleepFor = newState.sleepFor, 0
。如果允许取水和等待操作,那就说明没有发生并发竞争的情况,就模拟睡眠时间
t.clock.Sleep(interval)
。然后将取水的目标时间进行返回,由服务端代码来判断是否打回响应或者等待该时间后继续响应。
t.clock.Sleep(interval)
func(c*clock)Sleep(dtime.Duration){time.Sleep(d)}
实际上在一个请求来的时候,限流器就会进行睡眠对应的时间,并在睡眠后将最新取水时间返回。
实际应用(使用Gin框架)
funcratelimit1()func(ctx*gin.Context){r1:=rate1.New(100)returnfunc(ctx*gin.Context){now:=time.Now()//Take返回的是一个time.Duration的时间ifr1.Take().Sub(now)>0{//返回的时间比当前的时间还大,说明需要进行等待//如果需要等待,就time.Sleep(r1.Take().Sub(now()))然后放行//如果不需要等待请求时间,就直接进行Abort然后返回response(ctx,http.StatusRequestTimeout,"rate1limit...")fmt.Println("rate1limit...")ctx.Abort()return}//放行ctx.Next()}}
这里你可以进行选择是否返回。因为Take一定会执行sleep函数,所以当执行take结束后表示当前请求已经接到了水。当前演示使用第一种情况。
如果你的业务要求响应不允许进行等待。那么可以在该请求接完水之后然后,如上例。
如果你的业务允许响应等待,那么该请求等待对应的接水时间后进行下一步。具体代码就是将
if
中的内容直接忽略。(建议使用)
测试代码
这里定义了一个响应函数和一个handler
函数方便测试
funcresponse(c*gin.Context,codeint,infoany){c.JSON(code,info)}funcpingHandler(c*gin.Context){response(c,200,"pingok~")}
执行go test -run=Run -v
先开启一个web服务
funcTestRun(t*testing.T){r:=gin.Default()r.GET("/ping1",ratelimit1(),pingHandler)r.GET("/ping2",ratelimit2(),helloHandler)_=r.Run(":4399")}
使用接口压力测试工具go-wrk
进行测试->tsliwowicz/go-wrk: go-wrk)
在golang引入install版本可以直接通过go install github.com/tsliwowicz/go-wrk@latest
下载
使用帮助
Usage:go-wrk<options><url>Options:-HHeadertoaddtoeachrequest(youcandefinemultiple-Hflags)(Default)-MHTTPmethod(DefaultGET)-TSocket/requesttimeoutinms(Default1000)-bodyrequestbodystringor@filename(Default)-cNumberofgoroutinestouse(concurrentconnections)(Default10)-caCAfiletoverifypeeragainst(SSL/TLS)(Default)-certCAcertificatefiletoverifypeeragainst(SSL/TLS)(Default)-dDurationoftestinseconds(Default10)-fPlaybackfilename(Default<empty>)-helpPrinthelp(Defaultfalse)-hostHostHeader(Default)-httpUseHTTP/2(Defaulttrue)-keyPrivatekeyfilename(SSL/TLS(Default)-no-cDisableCompression-Preventssendingthe"Accept-Encoding:gzip"header(Defaultfalse)-no-kaDisableKeepAlive-preventsre-useofTCPconnectionsbetweendifferentHTTPrequests(Defaultfalse)-no-vrSkipverifyingSSLcertificateoftheserver(Defaultfalse)-redirAllowRedirects(Defaultfalse)-vPrintversiondetails(Defaultfalse)
-t 8个线程 -c 400个连接 -n 模拟100次请求 -d 替换-n 表示连接时间
输入
go-wrk -t=8 -c=400 -n=100 http://127.0.0.1:4399/ping1
可以稍微等待一下水流积攒(压测速度过快)。
可以看出,89
个请求全部返回。也就是说在一段请求高峰期,不会有请求进行响应。因此我认为既然内部已经睡眠,那么就也就应该对请求放行处理。
令牌桶
引入ratelimit
库
go get -u github.com/juju/ratelimit
初始化
//NewBucketreturnsanewtokenbucketthatfillsatthe//rateofonetokeneveryfillInterval,uptothegiven//maximumcapacity.Bothargumentsmustbe//positive.Thebucketisinitiallyfull.funcNewBucket(fillIntervaltime.Duration,capacityint64)*Bucket{returnNewBucketWithClock(fillInterval,capacity,nil)}//NewBucketWithClockisidenticaltoNewBucketbutinjectsatestableclock//interface.funcNewBucketWithClock(fillIntervaltime.Duration,capacityint64,clockClock)*Bucket{returnNewBucketWithQuantumAndClock(fillInterval,capacity,1,clock)}
进行Bucket
桶的初始化。
funcNewBucketWithQuantumAndClock(fillIntervaltime.Duration,capacity,quantumint64,clockClock)*Bucket{ifclock==nil{clock=realClock{}}//填充速率iffillInterval<=0{panic("tokenbucketfillintervalisnot>0")}//最大令牌容量ifcapacity<=0{panic("tokenbucketcapacityisnot>0")}//单次令牌生成量ifquantum<=0{panic("tokenbucketquantumisnot>0")}return&Bucket{clock:clock,startTime:clock.Now(),latestTick:0,fillInterval:fillInterval,capacity:capacity,quantum:quantum,availableTokens:capacity,}}
令牌桶初始化过程,初始化结构体 fillInterval
(填充速率) cap
(最大令牌量) quannum
(每次令牌生成量)。
如果三个变量有一个小于或者等于0的话直接进行报错返回。在最开始就将当前令牌数初始化为最大容量。
调用
//TakeAvailabletakesuptocountimmediatelyavailabletokensfromthe//bucket.Itreturnsthenumberoftokensremoved,orzeroifthereare//noavailabletokens.Itdoesnotblock.func(tb*Bucket)TakeAvailable(countint64)int64{tb.mu.Lock()defertb.mu.Unlock()returntb.takeAvailable(tb.clock.Now(),count)}
调用TakeAvailable
函数,传入参数为需要取出的令牌数量,返回参数是实际能够取出的令牌数量。
内部实现
func(tb*Bucket)takeAvailable(nowtime.Time,countint64)int64{//如果需要取出的令牌数小于等于零,那么就返回0个令牌ifcount<=0{return0}//根据时间对当前桶中令牌数进行计算tb.adjustavailableTokens(tb.currentTick(now))//计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌iftb.availableTokens<=0{return0}//如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数ifcount>tb.availableTokens{count=tb.availableTokens}//调整令牌数tb.availableTokens-=countreturncount}
如果需要取出的令牌数小于等于零,那么就返回0个令牌
根据时间对当前桶中令牌数进行计算
计算之后的令牌总数小于等于0,说明当前令牌不足取出,那么就直接返回0个令牌
如果当前存储的令牌数量多于请求数量,那么就返回取出令牌数
调整令牌数
调整令牌
func(tb*Bucket)adjustavailableTokens(tickint64){lastTick:=tb.latestTicktb.latestTick=tick//如果当前令牌数大于最大等于容量,直接返回最大容量iftb.availableTokens>=tb.capacity{return}//当前令牌数+=(当前时间-上次取出令牌数的时间)*quannum(每次生成令牌量)tb.availableTokens+=(tick-lastTick)*tb.quantum//如果当前令牌数大于最大等于容量,将当前令牌数=最大容量然后返回当前令牌数iftb.availableTokens>tb.capacity{tb.availableTokens=tb.capacity}return}
如果当前令牌数大于最大等于容量,直接返回最大容量
当前令牌数 += (当前时间 - 上次取出令牌数的时间) * quannum(每次生成令牌量)
如果当前令牌数大于最大等于容量, 将当前令牌数 = 最大容量 然后返回 当前令牌数
实现原理
加锁
defer
解锁判断count(想要取出的令牌数) 是否小于等于 0,如果是直接返回 0
调用函数
adjustTokens
获取可用的令牌数量如果当前可以取出的令牌数小于等于0 直接返回 0
如果当前可以取出的令牌数小于当前想要取出的令牌数(count) count = 当前可以取出的令牌数
当前的令牌数 -= 取出的令牌数 (count)
返回 count(可以取出的令牌数)
额外介绍
take
函数,能够返回等待时间和布尔值,允许欠账,没有令牌也可以取出。
func (tb *Bucket) Take(count int64) time.Duration
takeMaxDuration
函数,可以根据最大等待时间来进行判断。
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
因为他们内部的实现都基于令牌调整,我这里不做过多介绍,如果感兴趣可以自行研究一下。
测试
funcratelimit2()func(ctx*gin.Context){//生成速率最大容量r2:=rate2.NewBucket(time.Second,200)returnfunc(ctx*gin.Context){//r2.Take()//允许欠账,令牌不够也可以接收请求ifr2.TakeAvailable(1)==1{//如果想要取出1个令牌并且能够取出,就放行ctx.Next()return}response(ctx,http.StatusRequestTimeout,"rate2limit...")ctx.Abort()return}}
压测速度过于快速,在实际过程中可以根据调整令牌生成速率来进行具体限流!
</div> <div class="zixun-tj-product adv-bottom"></div> </div> </div> <div class="prve-next-news">
go语言中的限流漏桶和令牌桶库怎么使用的详细内容,希望对您有所帮助,信息来源于网络。