From f19621fc8c2e3bb9d43f6242b6cd414a45c8aa8f Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Fri, 14 Jul 2023 15:58:32 +0800 Subject: [PATCH] Support refund the tokens to limiter (#25598) Signed-off-by: bigsheeper --- internal/proxy/multi_rate_limiter.go | 11 ++++++ pkg/util/ratelimitutil/limiter.go | 7 ++++ pkg/util/ratelimitutil/limiter_test.go | 48 ++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/internal/proxy/multi_rate_limiter.go b/internal/proxy/multi_rate_limiter.go index 59fa76cfff..b2e08b4cf6 100644 --- a/internal/proxy/multi_rate_limiter.go +++ b/internal/proxy/multi_rate_limiter.go @@ -98,6 +98,9 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i if ret == commonpb.ErrorCode_Success && !IsDDLRequest(rt) { // only dml and dql have collection level rate limits ret = checkFunc(m.collectionLimiters[collectionID]) + if ret != commonpb.ErrorCode_Success { + m.globalDDLLimiter.cancel(rt, n) + } } return ret @@ -195,6 +198,14 @@ func (rl *rateLimiter) limit(rt internalpb.RateType, n int) (bool, float64) { return !limit.AllowN(time.Now(), n), float64(limit.Limit()) } +func (rl *rateLimiter) cancel(rt internalpb.RateType, n int) { + limit, ok := rl.limiters.Get(rt) + if !ok { + return + } + limit.Cancel(n) +} + func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error { log := log.Ctx(context.TODO()).WithRateGroup("proxy.rateLimiter", 1.0, 60.0).With( zap.Int64("proxyNodeID", paramtable.GetNodeID()), diff --git a/pkg/util/ratelimitutil/limiter.go b/pkg/util/ratelimitutil/limiter.go index b4385ec571..6528f2bf49 100644 --- a/pkg/util/ratelimitutil/limiter.go +++ b/pkg/util/ratelimitutil/limiter.go @@ -121,6 +121,13 @@ func (lim *Limiter) SetLimit(newLimit Limit) { } } +// Cancel the AllowN operation and refund the tokens that have already been deducted by the limiter. 
+func (lim *Limiter) Cancel(n int) { + lim.mu.Lock() + defer lim.mu.Unlock() + lim.tokens += float64(n) +} + // advance calculates and returns an updated state for lim resulting from the passage of time. // lim is not changed. advance requires that lim.mu is held. func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { diff --git a/pkg/util/ratelimitutil/limiter_test.go b/pkg/util/ratelimitutil/limiter_test.go index 6ce7d89e1e..8b86cee8ef 100644 --- a/pkg/util/ratelimitutil/limiter_test.go +++ b/pkg/util/ratelimitutil/limiter_test.go @@ -343,3 +343,51 @@ func BenchmarkLimiter_AllowN(b *testing.B) { } }) } + +func TestLimiter_Cancel(t *testing.T) { + // The test runs for a few (fake) seconds executing many requests + // and then checks that overall number of requests is reasonable. + const ( + limit = 100 + burst = 100 + ) + var ( + numOK = int32(0) + tt = makeTestTime(t) + ) + + lim := NewLimiter(limit, burst) + + start := tt.now() + end := start.Add(5 * time.Second) + i := 0 + cancelNum := 100 + for tt.now().Before(end) { + if ok := lim.AllowN(tt.now(), 1); ok { + numOK++ + // inject some cancellations + if i <= cancelNum { + lim.Cancel(1) + numOK-- + } + } + + // This offers ~10,000 requests per (fake) second, far more than the + // limiter admits, without consuming an outrageous amount of CPU. + tt.advance(100 * time.Microsecond) + i++ + } + elapsed := tt.since(start) + ideal := burst + (limit * float64(elapsed) / float64(time.Second)) + + // We should never get more requests than allowed. + t.Log("numOK =", numOK, "want", int32(ideal), "ideal", ideal) + if want := int32(ideal); numOK > want { + t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) + } + // We should get very close to the number of requests allowed. + t.Log("numOK =", numOK, "want", int32(0.999*ideal), "ideal", ideal) + if want := int32(0.999 * ideal); numOK < want { + t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal) + } +}