enable global static rate limit (#24009)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/24085/head
wei liu 2023-05-12 15:49:20 +08:00 committed by GitHub
parent 113f9a0ebc
commit 4cd9f32dd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 57 additions and 14 deletions

View File

@ -76,25 +76,31 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
var limiter *rateLimiter
if IsDDLRequest(rt) {
limiter = m.globalDDLLimiter
} else {
limiter = m.collectionLimiters[collectionID]
}
checkFunc := func(limiter *rateLimiter) commonpb.ErrorCode {
if limiter == nil {
return commonpb.ErrorCode_Success
}
if limiter == nil {
limit, rate := limiter.limit(rt, n)
if rate == 0 {
return limiter.getErrorCode(rt)
}
if limit {
return commonpb.ErrorCode_RateLimit
}
return commonpb.ErrorCode_Success
}
limit, rate := limiter.limit(rt, n)
if rate == 0 {
return limiter.getErrorCode(rt)
// first, check global level rate limits
ret := checkFunc(m.globalDDLLimiter)
// second check collection level rate limits
if ret == commonpb.ErrorCode_Success && !IsDDLRequest(rt) {
// only dml and dql have collection level rate limits
ret = checkFunc(m.collectionLimiters[collectionID])
}
if limit {
return commonpb.ErrorCode_RateLimit
}
return commonpb.ErrorCode_Success
return ret
}
func IsDDLRequest(rt internalpb.RateType) bool {

View File

@ -69,6 +69,43 @@ func TestMultiRateLimiter(t *testing.T) {
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
t.Run("test global static limit", func(t *testing.T) {
bak := Params.QuotaConfig.QuotaAndLimitsEnabled
paramtable.Get().Save(Params.QuotaConfig.QuotaAndLimitsEnabled.Key, "true")
multiLimiter := NewMultiRateLimiter()
multiLimiter.collectionLimiters[1] = newRateLimiter()
multiLimiter.collectionLimiters[2] = newRateLimiter()
multiLimiter.collectionLimiters[3] = newRateLimiter()
for _, rt := range internalpb.RateType_value {
if IsDDLRequest(internalpb.RateType(rt)) {
multiLimiter.globalDDLLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(5), 1))
} else {
multiLimiter.globalDDLLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(2), 1))
multiLimiter.collectionLimiters[1].limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(2), 1))
multiLimiter.collectionLimiters[2].limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(2), 1))
multiLimiter.collectionLimiters[3].limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(2), 1))
}
}
for _, rt := range internalpb.RateType_value {
if IsDDLRequest(internalpb.RateType(rt)) {
errCode := multiLimiter.Check(1, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(1, internalpb.RateType(rt), 5)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(1, internalpb.RateType(rt), 5)
assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode)
} else {
errCode := multiLimiter.Check(1, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(2, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(3, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode)
}
}
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})
t.Run("not enable quotaAndLimit", func(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
multiLimiter.collectionLimiters[collectionID] = newRateLimiter()