Support refund the tokens to limiter (#25598)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/25630/head
yihao.dai 2023-07-14 15:58:32 +08:00 committed by GitHub
parent 66fdc71479
commit f19621fc8c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 66 additions and 0 deletions

View File

@ -98,6 +98,9 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i
if ret == commonpb.ErrorCode_Success && !IsDDLRequest(rt) {
// only dml and dql have collection level rate limits
ret = checkFunc(m.collectionLimiters[collectionID])
if ret != commonpb.ErrorCode_Success {
m.globalDDLLimiter.cancel(rt, n)
}
}
return ret
@ -195,6 +198,14 @@ func (rl *rateLimiter) limit(rt internalpb.RateType, n int) (bool, float64) {
return !limit.AllowN(time.Now(), n), float64(limit.Limit())
}
func (rl *rateLimiter) cancel(rt internalpb.RateType, n int) {
limit, ok := rl.limiters.Get(rt)
if !ok {
return
}
limit.Cancel(n)
}
func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error {
log := log.Ctx(context.TODO()).WithRateGroup("proxy.rateLimiter", 1.0, 60.0).With(
zap.Int64("proxyNodeID", paramtable.GetNodeID()),

View File

@ -121,6 +121,13 @@ func (lim *Limiter) SetLimit(newLimit Limit) {
}
}
// Cancel the AllowN operation and refund the tokens that have already been deducted by the limiter.
func (lim *Limiter) Cancel(n int) {
lim.mu.Lock()
defer lim.mu.Unlock()
lim.tokens += float64(n)
}
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed. advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {

View File

@ -343,3 +343,51 @@ func BenchmarkLimiter_AllowN(b *testing.B) {
}
})
}
func TestLimiter_Cancel(t *testing.T) {
// The test runs for a few (fake) seconds executing many requests
// and then checks that overall number of requests is reasonable.
const (
limit = 100
burst = 100
)
var (
numOK = int32(0)
tt = makeTestTime(t)
)
lim := NewLimiter(limit, burst)
start := tt.now()
end := start.Add(5 * time.Second)
i := 0
cancelNum := 100
for tt.now().Before(end) {
if ok := lim.AllowN(tt.now(), 1); ok {
numOK++
// inject some cancellations
if i <= cancelNum {
lim.Cancel(1)
numOK--
}
}
// This will still offer ~500 requests per second, but won't consume
// outrageous amount of CPU.
tt.advance(100 * time.Microsecond)
i++
}
elapsed := tt.since(start)
ideal := burst + (limit * float64(elapsed) / float64(time.Second))
// We should never get more requests than allowed.
t.Log("numOK =", numOK, "want", int32(ideal), "ideal", ideal)
if want := int32(ideal); numOK > want {
t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
}
// We should get very close to the number of requests allowed.
t.Log("numOK =", numOK, "want", int32(0.999*ideal), "ideal", ideal)
if want := int32(0.999 * ideal); numOK < want {
t.Errorf("numOK = %d, want %d (ideal %f)", numOK, want, ideal)
}
}