make ddl rate limit be global level (#23962)

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/23988/head
wei liu 2023-05-09 16:56:40 +08:00 committed by GitHub
parent fb6e4ceee7
commit d06ae53f79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 41 deletions

View File

@ -51,15 +51,18 @@ func GetQuotaErrorString(errCode commonpb.ErrorCode) string {
// MultiRateLimiter includes multilevel rate limiters, such as global rateLimiter,
// collection level rateLimiter and so on. It also implements Limiter interface.
type MultiRateLimiter struct {
// TODO: add collection level rateLimiter
quotaStatesMu sync.RWMutex
quotaStatesMu sync.RWMutex
// for DML and DQL
collectionLimiters map[int64]*rateLimiter
// for DDL
globalDDLLimiter *rateLimiter
}
// NewMultiRateLimiter returns a new MultiRateLimiter.
func NewMultiRateLimiter() *MultiRateLimiter {
m := &MultiRateLimiter{
collectionLimiters: make(map[int64]*rateLimiter, 0),
globalDDLLimiter: newRateLimiter(),
}
return m
}
@ -73,7 +76,13 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i
m.quotaStatesMu.RLock()
defer m.quotaStatesMu.RUnlock()
limiter := m.collectionLimiters[collectionID]
var limiter *rateLimiter
if IsDDLRequest(rt) {
limiter = m.globalDDLLimiter
} else {
limiter = m.collectionLimiters[collectionID]
}
if limiter == nil {
return commonpb.ErrorCode_Success
}
@ -88,6 +97,16 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i
return commonpb.ErrorCode_Success
}
func IsDDLRequest(rt internalpb.RateType) bool {
switch rt {
case internalpb.RateType_DDLCollection | internalpb.RateType_DDLPartition | internalpb.RateType_DDLIndex |
internalpb.RateType_DDLFlush | internalpb.RateType_DDLCompaction:
return true
default:
return false
}
}
// GetQuotaStates returns quota states.
func (m *MultiRateLimiter) GetQuotaStates() ([]milvuspb.QuotaState, []string) {
m.quotaStatesMu.RLock()

View File

@ -42,15 +42,29 @@ func TestMultiRateLimiter(t *testing.T) {
multiLimiter := NewMultiRateLimiter()
multiLimiter.collectionLimiters[collectionID] = newRateLimiter()
for _, rt := range internalpb.RateType_value {
multiLimiter.collectionLimiters[collectionID].limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
if IsDDLRequest(internalpb.RateType(rt)) {
multiLimiter.globalDDLLimiter.limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(5), 1))
} else {
multiLimiter.collectionLimiters[collectionID].limiters.Insert(internalpb.RateType(rt), ratelimitutil.NewLimiter(ratelimitutil.Limit(1000), 1))
}
}
for _, rt := range internalpb.RateType_value {
errCode := multiLimiter.Check(collectionID, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(collectionID, internalpb.RateType(rt), math.MaxInt)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(collectionID, internalpb.RateType(rt), math.MaxInt)
assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode)
if IsDDLRequest(internalpb.RateType(rt)) {
errCode := multiLimiter.Check(collectionID, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(collectionID, internalpb.RateType(rt), 5)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(collectionID, internalpb.RateType(rt), 5)
assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode)
} else {
errCode := multiLimiter.Check(collectionID, internalpb.RateType(rt), 1)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(collectionID, internalpb.RateType(rt), math.MaxInt)
assert.Equal(t, commonpb.ErrorCode_Success, errCode)
errCode = multiLimiter.Check(collectionID, internalpb.RateType(rt), math.MaxInt)
assert.Equal(t, commonpb.ErrorCode_RateLimit, errCode)
}
}
Params.QuotaConfig.QuotaAndLimitsEnabled = bak
})

View File

@ -22,14 +22,12 @@ import (
"reflect"
"github.com/golang/protobuf/proto"
"go.uber.org/zap"
"google.golang.org/grpc"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/milvuspb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/pkg/log"
)
// RateLimitInterceptor returns a new unary server interceptors that performs request rate limiting.
@ -40,37 +38,14 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor {
return handler(ctx, req)
}
switch request := req.(type) {
case *milvuspb.FlushRequest:
collectionNames := request.GetCollectionNames()
for _, collectionName := range collectionNames {
collectionID, err = globalMetaCache.GetCollectionID(context.TODO(), collectionName)
if err != nil {
log.Info("skip limit check",
zap.String("method", info.FullMethod),
zap.Error(err),
)
return handler(ctx, req)
}
code := limiter.Check(collectionID, rt, n)
if code != commonpb.ErrorCode_Success {
rsp := getFailedResponse(req, rt, code, info.FullMethod)
if rsp != nil {
return rsp, nil
}
}
code := limiter.Check(collectionID, rt, n)
if code != commonpb.ErrorCode_Success {
rsp := getFailedResponse(req, rt, code, info.FullMethod)
if rsp != nil {
return rsp, nil
}
return handler(ctx, req)
default:
code := limiter.Check(collectionID, rt, n)
if code != commonpb.ErrorCode_Success {
rsp := getFailedResponse(req, rt, code, info.FullMethod)
if rsp != nil {
return rsp, nil
}
}
return handler(ctx, req)
}
return handler(ctx, req)
}
}