mirror of https://github.com/milvus-io/milvus.git
fix: Return time tick delay error and refine quota error messages (#29289)
This pr: 1. Handles the time tick delay error when converting old error codes to milvus errors. 2. Enhances quota error messages by eliminating "force deny" and substituting it with "quota exceeded." issue: https://github.com/milvus-io/milvus/issues/29288 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/29373/head
parent
1eacdc591b
commit
f457b9f7c9
|
@ -39,9 +39,9 @@ import (
|
|||
)
|
||||
|
||||
var QuotaErrorString = map[commonpb.ErrorCode]string{
|
||||
commonpb.ErrorCode_ForceDeny: "manually force deny",
|
||||
commonpb.ErrorCode_MemoryQuotaExhausted: "memory quota exhausted, please allocate more resources",
|
||||
commonpb.ErrorCode_DiskQuotaExhausted: "disk quota exhausted, please allocate more resources",
|
||||
commonpb.ErrorCode_ForceDeny: "the writing has been deactivated by the administrator",
|
||||
commonpb.ErrorCode_MemoryQuotaExhausted: "memory quota exceeded, please allocate more resources",
|
||||
commonpb.ErrorCode_DiskQuotaExhausted: "disk quota exceeded, please allocate more resources",
|
||||
commonpb.ErrorCode_TimeTickLongDelay: "time tick long delay",
|
||||
}
|
||||
|
||||
|
@ -84,10 +84,10 @@ func (m *MultiRateLimiter) Check(collectionID int64, rt internalpb.RateType, n i
|
|||
|
||||
limit, rate := limiter.limit(rt, n)
|
||||
if rate == 0 {
|
||||
return limiter.getError(rt)
|
||||
return limiter.getQuotaExceededError(rt)
|
||||
}
|
||||
if limit {
|
||||
return merr.WrapErrServiceRateLimit(rate)
|
||||
return limiter.getRateLimitError(rate)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -238,20 +238,24 @@ func (rl *rateLimiter) setRates(collectionRate *proxypb.CollectionRate) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (rl *rateLimiter) getError(rt internalpb.RateType) error {
|
||||
func (rl *rateLimiter) getQuotaExceededError(rt internalpb.RateType) error {
|
||||
switch rt {
|
||||
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
|
||||
if errCode, ok := rl.quotaStates.Get(milvuspb.QuotaState_DenyToWrite); ok {
|
||||
return merr.OldCodeToMerr(errCode)
|
||||
return merr.WrapErrServiceQuotaExceeded(GetQuotaErrorString(errCode))
|
||||
}
|
||||
case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery:
|
||||
if errCode, ok := rl.quotaStates.Get(milvuspb.QuotaState_DenyToRead); ok {
|
||||
return merr.OldCodeToMerr(errCode)
|
||||
return merr.WrapErrServiceQuotaExceeded(GetQuotaErrorString(errCode))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (rl *rateLimiter) getRateLimitError(rate float64) error {
|
||||
return merr.WrapErrServiceRateLimit(rate, "request is rejected by grpc RateLimiter middleware, please retry later")
|
||||
}
|
||||
|
||||
// setRateGaugeByRateType sets ProxyLimiterRate metrics.
|
||||
func setRateGaugeByRateType(rateType internalpb.RateType, nodeID int64, collectionID int64, rate float64) {
|
||||
if ratelimitutil.Limit(rate) == ratelimitutil.Inf {
|
||||
|
|
|
@ -283,8 +283,8 @@ func TestRateLimiter(t *testing.T) {
|
|||
},
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.ErrorIs(t, limiter.getError(internalpb.RateType_DQLQuery), merr.ErrServiceForceDeny)
|
||||
assert.Equal(t, limiter.getError(internalpb.RateType_DMLInsert), merr.ErrServiceDiskLimitExceeded)
|
||||
assert.Error(t, limiter.getQuotaExceededError(internalpb.RateType_DQLQuery))
|
||||
assert.Error(t, limiter.getQuotaExceededError(internalpb.RateType_DMLInsert))
|
||||
})
|
||||
|
||||
t.Run("tests refresh rate by config", func(t *testing.T) {
|
||||
|
|
|
@ -21,7 +21,6 @@ import (
|
|||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/golang/protobuf/proto"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
|
@ -41,7 +40,7 @@ func RateLimitInterceptor(limiter types.Limiter) grpc.UnaryServerInterceptor {
|
|||
|
||||
err = limiter.Check(collectionID, rt, n)
|
||||
if err != nil {
|
||||
rsp := getFailedResponse(req, rt, err, info.FullMethod)
|
||||
rsp := getFailedResponse(req, err)
|
||||
if rsp != nil {
|
||||
return rsp, nil
|
||||
}
|
||||
|
@ -121,26 +120,8 @@ func failedMutationResult(err error) *milvuspb.MutationResult {
|
|||
}
|
||||
}
|
||||
|
||||
func wrapQuotaError(rt internalpb.RateType, err error, fullMethod string) error {
|
||||
if errors.Is(err, merr.ErrServiceRateLimit) {
|
||||
return errors.Wrapf(err, "request %s is rejected by grpc RateLimiter middleware, please retry later", fullMethod)
|
||||
}
|
||||
|
||||
// deny to write/read
|
||||
var op string
|
||||
switch rt {
|
||||
case internalpb.RateType_DMLInsert, internalpb.RateType_DMLUpsert, internalpb.RateType_DMLDelete, internalpb.RateType_DMLBulkLoad:
|
||||
op = "write"
|
||||
case internalpb.RateType_DQLSearch, internalpb.RateType_DQLQuery:
|
||||
op = "read"
|
||||
}
|
||||
|
||||
return merr.WrapErrServiceForceDeny(op, err, fullMethod)
|
||||
}
|
||||
|
||||
// getFailedResponse returns failed response.
|
||||
func getFailedResponse(req any, rt internalpb.RateType, err error, fullMethod string) any {
|
||||
err = wrapQuotaError(rt, err, fullMethod)
|
||||
func getFailedResponse(req any, err error) any {
|
||||
switch req.(type) {
|
||||
case *milvuspb.InsertRequest, *milvuspb.DeleteRequest, *milvuspb.UpsertRequest:
|
||||
return failedMutationResult(err)
|
||||
|
|
|
@ -40,7 +40,7 @@ type limiterMock struct {
|
|||
|
||||
func (l *limiterMock) Check(collection int64, rt internalpb.RateType, n int) error {
|
||||
if l.rate == 0 {
|
||||
return merr.ErrServiceForceDeny
|
||||
return merr.ErrServiceQuotaExceeded
|
||||
}
|
||||
if l.limit {
|
||||
return merr.ErrServiceRateLimit
|
||||
|
@ -167,23 +167,23 @@ func TestRateLimitInterceptor(t *testing.T) {
|
|||
|
||||
t.Run("test getFailedResponse", func(t *testing.T) {
|
||||
testGetFailedResponse := func(req interface{}, rt internalpb.RateType, err error, fullMethod string) {
|
||||
rsp := getFailedResponse(req, rt, err, fullMethod)
|
||||
rsp := getFailedResponse(req, err)
|
||||
assert.NotNil(t, rsp)
|
||||
}
|
||||
|
||||
testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, merr.ErrServiceForceDeny, "delete")
|
||||
testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLUpsert, merr.ErrServiceForceDeny, "upsert")
|
||||
testGetFailedResponse(&milvuspb.DeleteRequest{}, internalpb.RateType_DMLDelete, merr.ErrServiceQuotaExceeded, "delete")
|
||||
testGetFailedResponse(&milvuspb.UpsertRequest{}, internalpb.RateType_DMLUpsert, merr.ErrServiceQuotaExceeded, "upsert")
|
||||
testGetFailedResponse(&milvuspb.ImportRequest{}, internalpb.RateType_DMLBulkLoad, merr.ErrServiceMemoryLimitExceeded, "import")
|
||||
testGetFailedResponse(&milvuspb.SearchRequest{}, internalpb.RateType_DQLSearch, merr.ErrServiceDiskLimitExceeded, "search")
|
||||
testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, merr.ErrServiceForceDeny, "query")
|
||||
testGetFailedResponse(&milvuspb.QueryRequest{}, internalpb.RateType_DQLQuery, merr.ErrServiceQuotaExceeded, "query")
|
||||
testGetFailedResponse(&milvuspb.CreateCollectionRequest{}, internalpb.RateType_DDLCollection, merr.ErrServiceRateLimit, "createCollection")
|
||||
testGetFailedResponse(&milvuspb.FlushRequest{}, internalpb.RateType_DDLFlush, merr.ErrServiceRateLimit, "flush")
|
||||
testGetFailedResponse(&milvuspb.ManualCompactionRequest{}, internalpb.RateType_DDLCompaction, merr.ErrServiceRateLimit, "compaction")
|
||||
|
||||
// test illegal
|
||||
rsp := getFailedResponse(&milvuspb.SearchResults{}, internalpb.RateType_DQLSearch, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), "method")
|
||||
rsp := getFailedResponse(&milvuspb.SearchResults{}, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError))
|
||||
assert.Nil(t, rsp)
|
||||
rsp = getFailedResponse(nil, internalpb.RateType_DQLSearch, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), "method")
|
||||
rsp = getFailedResponse(nil, merr.OldCodeToMerr(commonpb.ErrorCode_UnexpectedError))
|
||||
assert.Nil(t, rsp)
|
||||
})
|
||||
|
||||
|
|
|
@ -40,8 +40,9 @@ var (
|
|||
ErrServiceCrossClusterRouting = newMilvusError("cross cluster routing", 6, false)
|
||||
ErrServiceDiskLimitExceeded = newMilvusError("disk limit exceeded", 7, false)
|
||||
ErrServiceRateLimit = newMilvusError("rate limit exceeded", 8, true)
|
||||
ErrServiceForceDeny = newMilvusError("force deny", 9, false)
|
||||
ErrServiceQuotaExceeded = newMilvusError("quota exceeded", 9, false)
|
||||
ErrServiceUnimplemented = newMilvusError("service unimplemented", 10, false)
|
||||
ErrServiceTimeTickLongDelay = newMilvusError("time tick long delay", 11, false)
|
||||
|
||||
// Collection related
|
||||
ErrCollectionNotFound = newMilvusError("collection not found", 100, false)
|
||||
|
|
|
@ -149,7 +149,7 @@ func (s *ErrSuite) TestOldCode() {
|
|||
s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_MemoryQuotaExhausted), ErrServiceMemoryLimitExceeded)
|
||||
s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_DiskQuotaExhausted), ErrServiceDiskLimitExceeded)
|
||||
s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_RateLimit), ErrServiceRateLimit)
|
||||
s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_ForceDeny), ErrServiceForceDeny)
|
||||
s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_ForceDeny), ErrServiceQuotaExceeded)
|
||||
s.ErrorIs(OldCodeToMerr(commonpb.ErrorCode_UnexpectedError), errUnexpected)
|
||||
}
|
||||
|
||||
|
|
|
@ -153,10 +153,16 @@ func oldCode(code int32) commonpb.ErrorCode {
|
|||
case ErrServiceMemoryLimitExceeded.code():
|
||||
return commonpb.ErrorCode_InsufficientMemoryToLoad
|
||||
|
||||
case ErrServiceDiskLimitExceeded.code():
|
||||
return commonpb.ErrorCode_DiskQuotaExhausted
|
||||
|
||||
case ErrServiceTimeTickLongDelay.code():
|
||||
return commonpb.ErrorCode_TimeTickLongDelay
|
||||
|
||||
case ErrServiceRateLimit.code():
|
||||
return commonpb.ErrorCode_RateLimit
|
||||
|
||||
case ErrServiceForceDeny.code():
|
||||
case ErrServiceQuotaExceeded.code():
|
||||
return commonpb.ErrorCode_ForceDeny
|
||||
|
||||
case ErrIndexNotFound.code():
|
||||
|
@ -193,11 +199,14 @@ func OldCodeToMerr(code commonpb.ErrorCode) error {
|
|||
case commonpb.ErrorCode_DiskQuotaExhausted:
|
||||
return ErrServiceDiskLimitExceeded
|
||||
|
||||
case commonpb.ErrorCode_TimeTickLongDelay:
|
||||
return ErrServiceTimeTickLongDelay
|
||||
|
||||
case commonpb.ErrorCode_RateLimit:
|
||||
return ErrServiceRateLimit
|
||||
|
||||
case commonpb.ErrorCode_ForceDeny:
|
||||
return ErrServiceForceDeny
|
||||
return ErrServiceQuotaExceeded
|
||||
|
||||
case commonpb.ErrorCode_IndexNotExist:
|
||||
return ErrIndexNotFound
|
||||
|
@ -358,16 +367,20 @@ func WrapErrServiceDiskLimitExceeded(predict, limit float32, msg ...string) erro
|
|||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceRateLimit(rate float64) error {
|
||||
return wrapFields(ErrServiceRateLimit, value("rate", rate))
|
||||
func WrapErrServiceRateLimit(rate float64, msg ...string) error {
|
||||
err := wrapFields(ErrServiceRateLimit, value("rate", rate))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceForceDeny(op string, reason error, method string) error {
|
||||
return wrapFieldsWithDesc(ErrServiceForceDeny,
|
||||
reason.Error(),
|
||||
value("op", op),
|
||||
value("req", method),
|
||||
)
|
||||
func WrapErrServiceQuotaExceeded(reason string, msg ...string) error {
|
||||
err := wrapFields(ErrServiceQuotaExceeded, value("reason", reason))
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "->"))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func WrapErrServiceUnimplemented(grpcErr error) error {
|
||||
|
|
Loading…
Reference in New Issue