From 9087b6f42e3ab069b2c814f67b1a0be520b48b7e Mon Sep 17 00:00:00 2001 From: aoiasd <45024769+aoiasd@users.noreply.github.com> Date: Thu, 4 Jul 2024 10:08:10 +0800 Subject: [PATCH] enhance: [Cherry-Pick] support mark error as user error (#33498) (#34396) relate: https://github.com/milvus-io/milvus/issues/33492 pr: https://github.com/milvus-io/milvus/pull/33498 --------- Signed-off-by: aoiasd --- internal/proxy/accesslog/info/grpc_info.go | 36 ++++++++++--- .../proxy/accesslog/info/grpc_info_test.go | 16 ++++++ internal/proxy/accesslog/info/info.go | 6 +++ internal/proxy/accesslog/info/restful_info.go | 4 ++ internal/proxy/impl.go | 2 +- internal/proxy/task.go | 3 +- internal/proxy/task_delete.go | 7 +-- internal/proxy/task_insert.go | 6 +-- internal/proxy/task_query.go | 16 +++--- internal/proxy/task_search.go | 6 +-- internal/proxy/task_upsert.go | 4 +- internal/proxy/util.go | 4 +- pkg/util/merr/errors.go | 45 ++++++++++++---- pkg/util/merr/utils.go | 52 +++++++++++++++++-- 14 files changed, 165 insertions(+), 42 deletions(-) diff --git a/internal/proxy/accesslog/info/grpc_info.go b/internal/proxy/accesslog/info/grpc_info.go index 7dcd7c3bd7..a609440537 100644 --- a/internal/proxy/accesslog/info/grpc_info.go +++ b/internal/proxy/accesslog/info/grpc_info.go @@ -181,24 +181,46 @@ func (i *GrpcAccessInfo) ErrorCode() string { return fmt.Sprint(merr.Code(i.err)) } +func (i *GrpcAccessInfo) respStatus() *commonpb.Status { + baseResp, ok := i.resp.(BaseResponse) + if ok { + return baseResp.GetStatus() + } + + status, ok := i.resp.(*commonpb.Status) + if ok { + return status + } + return nil +} + func (i *GrpcAccessInfo) ErrorMsg() string { if i.err != nil { return i.err.Error() } - baseResp, ok := i.resp.(BaseResponse) - if ok { - status := baseResp.GetStatus() + if status := i.respStatus(); status != nil { return status.GetReason() } - status, ok := i.resp.(*commonpb.Status) - if ok { - return status.GetReason() - } return Unknown } +func (i *GrpcAccessInfo) ErrorType() string { + if i.err != nil { + return merr.GetErrorType(i.err).String() + } + + if status := i.respStatus(); status.GetCode() > 0 { + if _, ok := status.ExtraInfo[merr.InputErrorFlagKey]; ok { + return merr.InputError.String() + } + return merr.SystemError.String() + } + + return "" +} + func (i *GrpcAccessInfo) DbName() string { name, ok := requestutil.GetDbNameFromRequest(i.req) if !ok { diff --git a/internal/proxy/accesslog/info/grpc_info_test.go b/internal/proxy/accesslog/info/grpc_info_test.go index 6d4eb7f968..9d5de78543 100644 --- a/internal/proxy/accesslog/info/grpc_info_test.go +++ b/internal/proxy/accesslog/info/grpc_info_test.go @@ -103,6 +103,22 @@ func (s *GrpcAccessInfoSuite) TestErrorMsg() { s.Equal("rpc error: code = Unavailable desc = mock", result[0]) } +func (s *GrpcAccessInfoSuite) TestErrorType() { + s.info.resp = &milvuspb.QueryResults{ + Status: merr.Status(nil), + } + result := Get(s.info, "$error_type") + s.Equal("", result[0]) + + s.info.resp = merr.Status(merr.WrapErrAsInputError(merr.ErrParameterInvalid)) + result = Get(s.info, "$error_type") + s.Equal(merr.InputError.String(), result[0]) + + s.info.err = merr.ErrParameterInvalid + result = Get(s.info, "$error_type") + s.Equal(merr.SystemError.String(), result[0]) +} + func (s *GrpcAccessInfoSuite) TestDbName() { s.info.req = nil result := Get(s.info, "$database_name") diff --git a/internal/proxy/accesslog/info/info.go b/internal/proxy/accesslog/info/info.go index 3f69fc8519..12ac0dc9d8 100644 --- a/internal/proxy/accesslog/info/info.go +++ b/internal/proxy/accesslog/info/info.go @@ -34,6 +34,7 @@ var MetricFuncMap = map[string]getMetricFunc{ "$response_size": getResponseSize, "$error_code": getErrorCode, "$error_msg": getErrorMsg, + "$error_type": getErrorType, "$database_name": getDbName, "$collection_name": getCollectionName, "$partition_name": getPartitionName, @@ -61,6 +62,7 @@ type AccessInfo interface { ResponseSize() string ErrorCode() string ErrorMsg() string + ErrorType() string DbName() string CollectionName() string PartitionName() string @@ -115,6 +117,10 @@ func getErrorMsg(i AccessInfo) string { return i.ErrorMsg() } +func getErrorType(i AccessInfo) string { + return i.ErrorType() +} + func getDbName(i AccessInfo) string { return i.DbName() } diff --git a/internal/proxy/accesslog/info/restful_info.go b/internal/proxy/accesslog/info/restful_info.go index 21aef6d3b4..f64530f4de 100644 --- a/internal/proxy/accesslog/info/restful_info.go +++ b/internal/proxy/accesslog/info/restful_info.go @@ -131,6 +131,10 @@ func (i *RestfulInfo) ErrorMsg() string { return fmt.Sprint(message) } +func (i *RestfulInfo) ErrorType() string { + return Unknown +} + func (i *RestfulInfo) SdkVersion() string { return "Restful" } diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index c16b953cef..35a48c1ba4 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2551,7 +2551,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest) log.Warn("Failed to enqueue insert task: " + err.Error()) metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc() - return constructFailedResponse(err), nil + return constructFailedResponse(merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)), nil } log.Debug("Detail of insert request in Proxy") diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 5824bce103..1cef315a2b 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -622,6 +622,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error { t.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError // nolint t.result.Status.Reason = fmt.Sprintf("can't find collection[database=%s][collection=%s]", t.GetDbName(), t.GetCollectionName()) + t.result.Status.ExtraInfo = map[string]string{merr.InputErrorFlagKey: "true"} } return nil } @@ -1439,7 +1440,7 @@ func (t *flushTask) Execute(ctx context.Context) error { for _, collName := range t.CollectionNames { collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collName) if err != nil { - return err + return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound) } flushReq := &datapb.FlushRequest{ Base: commonpbutil.UpdateMsgBase( diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index 8822410576..aa1973b1bf 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -259,9 +259,10 @@ func (dr *deleteRunner) Init(ctx context.Context) error { if err := validateCollectionName(collName); err != nil { return ErrWithLog(log, "Invalid collection name", err) } + dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName) if err != nil { - return ErrWithLog(log, "Failed to get collection id", err) + return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound)) } dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName) @@ -307,11 +308,11 @@ func (dr *deleteRunner) Init(ctx context.Context) error { func (dr *deleteRunner) Run(ctx context.Context) error { plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr()) if err != nil { - return merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err) + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)) } if planparserv2.IsAlwaysTruePlan(plan) { - return merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()) + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr())) } isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan) diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 77536e874f..b46466a4b4 100644 --- a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -116,13 +116,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error { if maxInsertSize != -1 && it.insertMsg.Size() > maxInsertSize { log.Warn("insert request size exceeds maxInsertSize", zap.Int("request size", it.insertMsg.Size()), zap.Int("maxInsertSize", maxInsertSize)) - return merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize") + return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize")) } schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName) if err != nil { log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err)) - return err + return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound) } it.schema = schema.CollectionSchema @@ -208,7 +208,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error { if err := newValidateUtil(withNANCheck(), withOverflowCheck(), withMaxLenCheck(), withMaxCapCheck()). Validate(it.insertMsg.GetFieldsData(), schema.CollectionSchema, it.insertMsg.NRows()); err != nil { - return err + return merr.WrapErrAsInputError(err) } log.Debug("Proxy Insert PreExecute done") diff --git a/internal/proxy/task_query.go b/internal/proxy/task_query.go index 7a91943ce0..8af0c20997 100644 --- a/internal/proxy/task_query.go +++ b/internal/proxy/task_query.go @@ -200,7 +200,7 @@ func createCntPlan(expr string, schemaHelper *typeutil.SchemaHelper) (*planpb.Pl plan, err := planparserv2.CreateRetrievePlan(schemaHelper, expr) if err != nil { - return nil, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err) + return nil, merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)) } plan.Node.(*planpb.PlanNode_Query).Query.IsCount = true @@ -223,7 +223,7 @@ func (t *queryTask) createPlan(ctx context.Context) error { if t.plan == nil { t.plan, err = planparserv2.CreateRetrievePlan(schema.schemaHelper, t.request.Expr) if err != nil { - return merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err) + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)) } } @@ -291,7 +291,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName) if err != nil { log.Warn("Failed to get collection id.", zap.String("collectionName", collectionName), zap.Error(err)) - return err + return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound) } t.CollectionID = collID log.Debug("Get collection ID by name", zap.Int64("collectionID", t.CollectionID)) @@ -302,11 +302,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error { return err } if t.partitionKeyMode && len(t.request.GetPartitionNames()) != 0 { - return errors.New("not support manually specifying the partition names if partition key mode is used") + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("not support manually specifying the partition names if partition key mode is used")) } if t.mustUsePartitionKey && !t.partitionKeyMode { - return merr.WrapErrParameterInvalidMsg("must use partition key in the query request " + - "because the mustUsePartitionKey config is true") + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the query request " + + "because the mustUsePartitionKey config is true")) } for _, tag := range t.request.PartitionNames { @@ -363,7 +363,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { t.plan.Node.(*planpb.PlanNode_Query).Query.Limit = t.RetrieveRequest.Limit if planparserv2.IsAlwaysTruePlan(t.plan) && t.RetrieveRequest.Limit == typeutil.Unlimited { - return fmt.Errorf("empty expression should be used with limit") + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("empty expression should be used with limit")) } // convert partition names only when requery is false @@ -390,7 +390,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error { // count with pagination if t.plan.GetQuery().GetIsCount() && t.queryParams.limit != typeutil.Unlimited { - return fmt.Errorf("count entities with pagination is not allowed") + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("count entities with pagination is not allowed")) } t.RetrieveRequest.IsCount = t.plan.GetQuery().GetIsCount() diff --git a/internal/proxy/task_search.go b/internal/proxy/task_search.go index 2a680e6beb..e0ba357950 100644 --- a/internal/proxy/task_search.go +++ b/internal/proxy/task_search.go @@ -114,7 +114,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error { t.collectionName = collectionName collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName) if err != nil { // err is not nil if collection not exists - return err + return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound) } t.SearchRequest.DbID = 0 // todo @@ -135,8 +135,8 @@ func (t *searchTask) PreExecute(ctx context.Context) error { return errors.New("not support manually specifying the partition names if partition key mode is used") } if t.mustUsePartitionKey && !t.partitionKeyMode { - return merr.WrapErrParameterInvalidMsg("must use partition key in the search request " + - "because the mustUsePartitionKey config is true") + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the search request " + + "because the mustUsePartitionKey config is true")) } if !t.partitionKeyMode && len(t.request.GetPartitionNames()) > 0 { diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index f44fe455c3..b8ad04b8cf 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -187,14 +187,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error { if err != nil { log.Warn("check primary field data and hash primary key failed when upsert", zap.Error(err)) - return err + return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid) } // set field ID to insert field data err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema) if err != nil { log.Warn("insert set fieldID to fieldData failed when upsert", zap.Error(err)) - return err + return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid) } if it.partitionKeyMode { diff --git a/internal/proxy/util.go b/internal/proxy/util.go index a679fd4730..61c51ec000 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -646,10 +646,10 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er StrId: scalarField.GetStringData(), } default: - return nil, errors.New("currently only support DataType Int64 or VarChar as PrimaryField") + return nil, merr.WrapErrParameterInvalidMsg("currently only support DataType Int64 or VarChar as PrimaryField") } default: - return nil, errors.New("currently not support vector field as PrimaryField") + return nil, merr.WrapErrParameterInvalidMsg("currently not support vector field as PrimaryField") } return primaryData, nil diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index d6f39af93b..3cf4a5378d 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -26,6 +26,22 @@ const ( TimeoutCode int32 = 10001 ) +type ErrorType int32 + +const ( + SystemError ErrorType = 0 + InputError ErrorType = 1 +) + +var ErrorTypeName = map[ErrorType]string{ + SystemError: "system_error", + InputError: "input_error", +} + +func (err ErrorType) String() string { + return ErrorTypeName[err] +} + // Define leaf errors here, // WARN: take care to add new error, // check whether you can use the errors below before adding a new one. @@ -194,29 +210,40 @@ var ( ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false) ) +type errorOption func(*milvusError) + +func WithDetail(detail string) errorOption { + return func(err *milvusError) { + err.detail = detail + } +} + +func WithErrorType(etype ErrorType) errorOption { + return func(err *milvusError) { + err.errType = etype + } +} + type milvusError struct { msg string detail string retriable bool errCode int32 + errType ErrorType } -func newMilvusError(msg string, code int32, retriable bool) milvusError { - return milvusError{ +func newMilvusError(msg string, code int32, retriable bool, options ...errorOption) milvusError { + err := milvusError{ msg: msg, detail: msg, retriable: retriable, errCode: code, } -} -func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError { - return milvusError{ - msg: msg, - detail: detail, - retriable: retriable, - errCode: code, + for _, option := range options { + option(&err) } + return err } func (e milvusError) code() int32 { diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 203e2bb86d..9c9a123bfc 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -22,12 +22,16 @@ import ( "strings" "github.com/cockroachdb/errors" + "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" ) +const InputErrorFlagKey string = "is_input_error" + // Code returns the error code of the given error, // WARN: DO NOT use this for now func Code(err error) int32 { @@ -71,7 +75,8 @@ func Status(err error) *commonpb.Status { } code := Code(err) - return &commonpb.Status{ + + status := &commonpb.Status{ Code: code, Reason: previousLastError(err).Error(), // Deprecated, for compatibility @@ -79,6 +84,11 @@ func Status(err error) *commonpb.Status { Retriable: IsRetryableErr(err), Detail: err.Error(), } + + if GetErrorType(err) == InputError { + status.ExtraInfo = map[string]string{InputErrorFlagKey: "true"} + } + return status } func previousLastError(err error) error { @@ -233,12 +243,18 @@ func Error(status *commonpb.Status) error { return nil } + var eType ErrorType + _, ok := status.GetExtraInfo()[InputErrorFlagKey] + if ok { + eType = InputError + } + // use code first code := status.GetCode() if code == 0 { - return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), Code(OldCodeToMerr(status.GetErrorCode())), false) + return newMilvusError(status.GetReason(), Code(OldCodeToMerr(status.GetErrorCode())), false, WithDetail(status.GetDetail()), WithErrorType(eType)) } - return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), code, status.GetRetriable()) + return newMilvusError(status.GetReason(), code, status.GetRetriable(), WithDetail(status.GetDetail()), WithErrorType(eType)) } // SegcoreError returns a merr according to the given segcore error code and message @@ -293,6 +309,36 @@ func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) er return nil } +func WrapErrAsInputError(err error) error { + if merr, ok := err.(milvusError); ok { + WithErrorType(InputError)(&merr) + return merr + } + return err +} + +func WrapErrAsInputErrorWhen(err error, targets ...milvusError) error { + if merr, ok := err.(milvusError); ok { + for _, target := range targets { + if target.errCode == merr.errCode { + log.Info("mark error as input error", zap.Error(err)) + WithErrorType(InputError)(&merr) + log.Info("test--", zap.String("type", merr.errType.String())) + return merr + } + } + } + return err +} + +func GetErrorType(err error) ErrorType { + if merr, ok := err.(milvusError); ok { + return merr.errType + } + + return SystemError +} + // Service related func WrapErrServiceNotReady(role string, sessionID int64, state string, msg ...string) error { err := wrapFieldsWithDesc(ErrServiceNotReady,