diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cda2be2 --- /dev/null +++ b/go.mod @@ -0,0 +1,40 @@ +module github.com/cloudwego-contrib/cwgo-pkg + +go 1.21 + +require ( + github.com/c9s/goprocinfo v0.0.0-20210130143923-c95fcf8c64a8 + github.com/cloudwego/hertz v0.9.2 + github.com/hertz-contrib/limiter v0.0.0-20221008063035-ad27db7cc386 + github.com/stretchr/testify v1.8.2 +) + +require ( + github.com/bytedance/go-tagexpr/v2 v2.9.2 // indirect + github.com/bytedance/gopkg v0.0.0-20240514070511-01b2cbcf35e1 // indirect + github.com/bytedance/sonic v1.11.8 // indirect + github.com/bytedance/sonic/loader v0.1.1 // indirect + github.com/cloudwego/base64x v0.1.4 // indirect + github.com/cloudwego/iasm v0.2.0 // indirect + github.com/cloudwego/netpoll v0.6.3 // indirect + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/fsnotify/fsnotify v1.5.4 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.7 // indirect + github.com/henrylee2cn/ameda v1.4.10 // indirect + github.com/henrylee2cn/goutil v0.0.0-20210127050712-89660552f6f8 // indirect + github.com/klauspost/cpuid/v2 v2.2.4 // indirect + github.com/kr/pretty v0.1.0 // indirect + github.com/nyaruka/phonenumbers v1.0.55 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/tidwall/gjson v1.14.4 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.0 // indirect + github.com/twitchyliquid64/golang-asm v0.15.1 // indirect + golang.org/x/arch v0.2.0 // indirect + golang.org/x/sys v0.13.0 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/limiter/bbr.go b/limiter/bbr.go new file mode 100644 index 0000000..b103621 --- /dev/null +++ b/limiter/bbr.go @@ -0,0 +1,250 @@ +/* + * Copyright 2022 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limiter + +import ( + "errors" + "github.com/cloudwego-contrib/cwgo-pkg/limiter/internal/utils" + "math" + "sync/atomic" + "time" + + "github.com/c9s/goprocinfo/linux" +) + +var ( + gCPU int64 + gStat linux.CPUStat + ErrLimit = "Hertz Adaptive Limit" +) + +type ( + cpuGetter func() int64 +) + +// getCpuLoad get CPU state by reading /proc/stat +func getCpuLoad() linux.CPUStat { + stat, err := linux.ReadStat("/proc/stat") + if err != nil { + panic("stat read fail") + } + return stat.CPUStatAll +} + +// calcCoreUsage calculate the overall utilization by reading the previous CPU state and the current CPU state +func calcCoreUsage(curr, prev linux.CPUStat) float64 { + PrevIdle := prev.Idle + prev.IOWait + Idle := curr.Idle + curr.IOWait + + PrevNonIdle := prev.User + prev.Nice + prev.System + prev.IRQ + prev.SoftIRQ + prev.Steal + NonIdle := curr.User + curr.Nice + curr.System + curr.IRQ + curr.SoftIRQ + curr.Steal + + PrevTotal := PrevIdle + PrevNonIdle + Total := Idle + NonIdle + totald := Total - PrevTotal + idled := Idle - PrevIdle + + CPU_Percentage := (float64(totald) - float64(idled)) / float64(totald) + + return CPU_Percentage +} + +func init() { + go cpuProc() +} + +// cpuProc CPU load correction by EMA algorithm +func cpuProc() { + ticker := time.NewTicker(opt.SamplingTime) // same to cpu sample rate + defer func() { + ticker.Stop() + if err := recover(); err != nil { + go cpuProc() + } + }() + + // EMA algorithm: https://blog.csdn.net/m0_38106113/article/details/81542863 + for range ticker.C { + preState := gStat + curState := getCpuLoad() + usage := calcCoreUsage(preState, curState) + prevCPU := atomic.LoadInt64(&gCPU) + curCPU := int64(float64(prevCPU)*opt.Decay + float64(usage*10)*(1.0-opt.Decay)) + atomic.StoreInt64(&gCPU, curCPU) + } +} + +// counterCache is used to cache maxPASS and minRt result. +type counterCache struct { + val int64 + time time.Time +} + +// BBR implements bbr-like limiter.It is inspired by sentinel. +// https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81 +type BBR struct { + cpu cpuGetter + passStat *utils.RollingWindow // request succeeded + rtStat *utils.RollingWindow // time consume + inFlight int64 // Number of requests being processed + bucketPerSecond int64 + bucketDuration time.Duration + + // prevDropTime defines previous start drop since initTime + prevDropTime atomic.Value + maxPASSCache atomic.Value + minRtCache atomic.Value + + opts options +} + +func NewLimiter(opts ...Option) *BBR { + opt := NewOption(opts...) + bucketDuration := opt.Window / time.Duration(opt.Bucket) + // 10s / 100 = 100ms + passStat := utils.NewRollingWindow(opt.Bucket, bucketDuration, utils.IgnoreCurrentBucket()) + rtStat := utils.NewRollingWindow(opt.Bucket, bucketDuration, utils.IgnoreCurrentBucket()) + + limiter := &BBR{ + opts: opt, + passStat: passStat, + rtStat: rtStat, + bucketDuration: bucketDuration, + bucketPerSecond: int64(time.Second / bucketDuration), + cpu: func() int64 { return atomic.LoadInt64(&gCPU) }, + } + + return limiter +} + +// maxPass maximum number of requests in a single sampling window +func (l *BBR) maxPass() int64 { + passCache := l.maxPASSCache.Load() + if passCache != nil { + ps := passCache.(*counterCache) + if l.timespan(ps.time) < 1 { + return ps.val + } + // Avoid glitches caused by fluctuations + } + var rawMaxPass float64 + l.passStat.Reduce(func(b *utils.Bucket) { + rawMaxPass = math.Max(float64(b.Sum), rawMaxPass) + }) + if rawMaxPass <= 0 { + rawMaxPass = 1 + } + l.maxPASSCache.Store(&counterCache{ + val: int64(rawMaxPass), + time: time.Now(), + }) + return int64(rawMaxPass) +} + +// timespan returns the passed bucket count +func (l *BBR) timespan(lastTime time.Time) int { + v := int(time.Since(lastTime) / l.bucketDuration) + if v > -1 { + return v + } + return l.opts.Bucket +} + +// minRT minimum response time +func (l *BBR) minRT() int64 { + rtCache := l.minRtCache.Load() + if rtCache != nil { + rc := rtCache.(*counterCache) + if l.timespan(rc.time) < 1 { + return rc.val + } + } + // Go to the nearest response time within 1s + var rawMinRT float64 = 1 << 31 + l.rtStat.Reduce(func(b *utils.Bucket) { + if b.Count <= 0 { + return + } + if rawMinRT > math.Ceil(b.Sum/float64(b.Count)) { + rawMinRT = math.Ceil(b.Sum / float64(b.Count)) + } + }) + if rawMinRT == 1<<31 { + rawMinRT = 1 + } + l.minRtCache.Store(&counterCache{ + val: int64(rawMinRT), + time: time.Now(), + }) + return int64(rawMinRT) +} + +// maxInFlight calculating the load +func (l *BBR) maxInFlight() int64 { + return int64(math.Ceil(float64(l.maxPass()*l.minRT()*l.bucketPerSecond) / 1000.0)) +} + +// shouldDrop (CPU load > 80% || (now - prevDrop) < 1s) and (MaxPass * MinRT * windows) / 1000 < InFlight +func (l *BBR) shouldDrop() bool { + now := time.Duration(time.Now().UnixNano()) + if l.cpu() < l.opts.CPUThreshold { + // current cpu payload below the threshold + prevDropTime, _ := l.prevDropTime.Load().(time.Duration) + if prevDropTime == 0 { + // haven't start drop, + // accept current request + return false + } + if time.Duration(now-prevDropTime) <= time.Second { + // just start drop one second ago, + // check current inflight count + inFlight := atomic.LoadInt64(&l.inFlight) + return inFlight > 1 && inFlight > l.maxInFlight() + } + l.prevDropTime.Store(time.Duration(0)) + return false + } + // current cpu payload exceeds the threshold + inFlight := atomic.LoadInt64(&l.inFlight) + drop := inFlight > 1 && inFlight > l.maxInFlight() + if drop { + prevDrop, _ := l.prevDropTime.Load().(time.Duration) + if prevDrop != 0 { + // already started drop, return directly + return drop + } + // store start drop time + l.prevDropTime.Store(now) + } + return drop +} + +// Allow determines the alarm triggering conditions, record the interface time consumption and QPS +func (l *BBR) Allow() (func(), error) { + if l.shouldDrop() { + return nil, errors.New(ErrLimit) + } + atomic.AddInt64(&l.inFlight, 1) + start := time.Now().UnixNano() + // DoneFunc record time-consuming + return func() { + rt := (time.Now().UnixNano() - start) / int64(time.Millisecond) + l.rtStat.Add(float64(rt)) + atomic.AddInt64(&l.inFlight, -1) + l.passStat.Add(1) + }, nil +} diff --git a/limiter/internal/utils/windows.go b/limiter/internal/utils/windows.go new file mode 100644 index 0000000..9967e39 --- /dev/null +++ b/limiter/internal/utils/windows.go @@ -0,0 +1,160 @@ +/* + * Copyright 2022 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "sync" + "time" +) + +type ( + RollingWindowOption func(rollingWindow *RollingWindow) + RollingWindow struct { + lock sync.RWMutex + size int + win *window + interval time.Duration + offset int + lastTime time.Time + ignoreCurrent bool + } +) + +// NewRollingWindow returns a RollingWindow that with size buckets and time interval,use opts to customize the RollingWindow. +func NewRollingWindow(size int, interval time.Duration, opts ...RollingWindowOption) *RollingWindow { + if size < 1 { + panic("size must be greater than 0") + } + + w := &RollingWindow{ + size: size, + win: newWindow(size), + interval: interval, + lastTime: time.Now(), + } + for _, opt := range opts { + opt(w) + } + return w +} + +// Add adds value to current bucket. +func (rw *RollingWindow) Add(v float64) { + rw.lock.Lock() + defer rw.lock.Unlock() + rw.updateOffset() + rw.win.add(rw.offset, v) +} + +// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set. +func (rw *RollingWindow) Reduce(fn func(b *Bucket)) { + rw.lock.RLock() + defer rw.lock.RUnlock() + + var diff int + span := rw.span() + // ignore current bucket + if span == 0 && rw.ignoreCurrent { + diff = rw.size - 1 + } else { + diff = rw.size - span + } + if diff > 0 { + offset := (rw.offset + span + 1) % rw.size + rw.win.reduce(offset, diff, fn) + + } +} + +// span Return the elapsed time interval +func (rw *RollingWindow) span() int { + offset := int(time.Since(rw.lastTime) / rw.interval) + if 0 <= offset && offset < rw.size { + return offset + } + return rw.size +} + +// updateOffset Update the offset of window +func (rw *RollingWindow) updateOffset() { + span := rw.span() + if span <= 0 { + return + } + + offset := rw.offset + // reset expired buckets + for i := 0; i < span; i++ { + rw.win.resetBucket((offset + i + 1) % rw.size) + } + + rw.offset = (offset + span) % rw.size + rw.lastTime = rw.lastTime.Add(time.Since(rw.lastTime.Add(time.Since(rw.lastTime) % rw.interval))) +} + +// Bucket defines the bucket that holds sum and num of additions. +type Bucket struct { + Sum float64 + Count int64 +} + +func (b *Bucket) add(v float64) { + b.Sum += v + b.Count++ +} + +func (b *Bucket) reset() { + b.Sum = 0 + b.Count = 0 +} + +type window struct { + buckets []*Bucket + size int +} + +func newWindow(size int) *window { + buckets := make([]*Bucket, size) + for i := 0; i < size; i++ { + buckets[i] = new(Bucket) + } + return &window{ + buckets: buckets, + size: size, + } +} + +func (w *window) add(offset int, v float64) { + w.buckets[offset%w.size].add(v) +} + +func (w *window) reduce(start, count int, fn func(b *Bucket)) { + for i := 0; i < count; i++ { + fn(w.buckets[(start+i)%w.size]) + } +} + +func (w *window) resetBucket(offset int) { + w.buckets[offset%w.size].reset() +} + +// IgnoreCurrentBucket lets the Reduce call ignore current bucket. +func IgnoreCurrentBucket() RollingWindowOption { + return func(w *RollingWindow) { + w.ignoreCurrent = true + } +} diff --git a/limiter/internal/utils/windows_test.go b/limiter/internal/utils/windows_test.go new file mode 100644 index 0000000..6218ee7 --- /dev/null +++ b/limiter/internal/utils/windows_test.go @@ -0,0 +1,102 @@ +/* + * Copyright 2022 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package utils + +import ( + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestNewRollingWindow(t *testing.T) { + assert.NotNil(t, NewRollingWindow(10, time.Second)) + assert.Panics(t, func() { + NewRollingWindow(0, time.Second) + }) +} + +func TestRollingWindowAdd(t *testing.T) { + r := NewRollingWindow(3, time.Millisecond*5) + list := func() []float64 { + var buckets []float64 + r.Reduce(func(b *Bucket) { + buckets = append(buckets, b.Sum) + }) + return buckets + } + assert.Equal(t, []float64{0, 0, 0}, list()) + r.Add(1) + assert.Equal(t, []float64{0, 0, 1}, list()) + time.Sleep(5 * time.Millisecond) + // next cycle + r.Add(2) + r.Add(3) + // 0 0 1 -> 0 1 0 -> 0 1 2 -> 0 1 5 + assert.Equal(t, []float64{0, 1, 5}, list()) +} + +func TestRollingWindowSum(t *testing.T) { + r := NewRollingWindow(3, time.Millisecond*5) + var cnt float64 + list := func() float64 { + r.Reduce(func(b *Bucket) { + cnt = math.Max(cnt, b.Sum) + }) + return cnt + } + assert.Equal(t, float64(0), list()) + r.Add(1) + assert.Equal(t, float64(1), list()) + time.Sleep(5 * time.Millisecond) + // next cycle + r.Add(2) + r.Add(3) + // 0 0 1 -> 0 1 0 -> 0 1 2 -> 0 1 5 + assert.Equal(t, float64(5), list()) +} + +func TestRollingWindowsAvg(t *testing.T) { + r := NewRollingWindow(3, time.Second*5) + var cnt float64 = 1 << 31 + list := func() float64 { + r.Reduce(func(b *Bucket) { + if b.Count <= 0 { + return + } + if cnt > math.Ceil(b.Sum/float64(b.Count)) { + cnt = math.Ceil(b.Sum / float64(b.Count)) + } + }) + if cnt == 1<<31 { + return 1 + } + return cnt + } + assert.Equal(t, float64(1), list()) + r.Add(1) + assert.Equal(t, float64(1), list()) + time.Sleep(5 * time.Second) + // next cycle + r.Add(2) + r.Add(3) + time.Sleep(5 * time.Second) + r.Add(4) + r.Add(5) + assert.Equal(t, float64(1), list()) +} diff --git a/limiter/limiter.go b/limiter/limiter.go new file mode 100644 index 0000000..e0fbb99 --- /dev/null +++ b/limiter/limiter.go @@ -0,0 +1,7 @@ +package limiter + +type Limiter interface { + // Allow returns an error if the request should be rejected + // should call returned function when request is consumed + Allow() (func(), error) +} diff --git a/limiter/middleware.go b/limiter/middleware.go new file mode 100644 index 0000000..7308bfd --- /dev/null +++ b/limiter/middleware.go @@ -0,0 +1,20 @@ +package limiter + +import ( + "context" + "github.com/cloudwego/hertz/pkg/app" + "github.com/cloudwego/hertz/pkg/protocol/consts" +) + +func NewHertzMiddleware(limiter Limiter) app.HandlerFunc { + return func(c context.Context, ctx *app.RequestContext) { + doneFunc, err := limiter.Allow() + if err != nil { + _ = ctx.AbortWithError(consts.StatusTooManyRequests, err) + ctx.String(consts.StatusTooManyRequests, ctx.Errors.String()) + } else { + ctx.Next(c) + doneFunc() + } + } +} diff --git a/limiter/options.go b/limiter/options.go new file mode 100644 index 0000000..1fe3f0e --- /dev/null +++ b/limiter/options.go @@ -0,0 +1,80 @@ +/* + * Copyright 2022 CloudWeGo Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package limiter + +import "time" + +type Option func(o *options) + +var opt = options{ + Window: time.Second * 10, + Bucket: 100, // 100ms + CPUThreshold: 800, // CPU load 80% + SamplingTime: 500 * time.Millisecond, // + Decay: 0.95, // +} + +type options struct { + Window time.Duration + Bucket int + CPUThreshold int64 + SamplingTime time.Duration + Decay float64 +} + +// WithWindow defines time duration per window +func WithWindow(window time.Duration) Option { + return func(o *options) { + o.Window = window + } +} + +// WithBucket defines bucket number for each window +func WithBucket(bucket int) Option { + return func(o *options) { + o.Bucket = bucket + } +} + +// WithCPUThreshold defines cpu threshold load. +// e.g. if you want to set the cpu threshold to 80%, you should set the value to 800. +func WithCPUThreshold(threshold int64) Option { + return func(o *options) { + o.CPUThreshold = threshold + } +} + +// WithSamplingTime defines cpu sampling time interval +func WithSamplingTime(samplingTime time.Duration) Option { + return func(o *options) { + o.SamplingTime = samplingTime + } +} + +// WithDecay defines cpu attenuation factor +func WithDecay(decay float64) Option { + return func(o *options) { + o.Decay = decay + } +} + +func NewOption(opts ...Option) options { + for _, apply := range opts { + apply(&opt) + } + return opt +}