mirror of https://github.com/milvus-io/milvus.git
				
				
				
			enhance: support mark error as user error (#33498)
relate: https://github.com/milvus-io/milvus/issues/33492 --------- Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/34294/head
							parent
							
								
									d6afb31b94
								
							
						
					
					
						commit
						186757e622
					
				| 
						 | 
				
			
			@ -181,24 +181,46 @@ func (i *GrpcAccessInfo) ErrorCode() string {
 | 
			
		|||
	return fmt.Sprint(merr.Code(i.err))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (i *GrpcAccessInfo) respStatus() *commonpb.Status {
 | 
			
		||||
	baseResp, ok := i.resp.(BaseResponse)
 | 
			
		||||
	if ok {
 | 
			
		||||
		return baseResp.GetStatus()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	status, ok := i.resp.(*commonpb.Status)
 | 
			
		||||
	if ok {
 | 
			
		||||
		return status
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (i *GrpcAccessInfo) ErrorMsg() string {
 | 
			
		||||
	if i.err != nil {
 | 
			
		||||
		return i.err.Error()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	baseResp, ok := i.resp.(BaseResponse)
 | 
			
		||||
	if ok {
 | 
			
		||||
		status := baseResp.GetStatus()
 | 
			
		||||
	if status := i.respStatus(); status != nil {
 | 
			
		||||
		return status.GetReason()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	status, ok := i.resp.(*commonpb.Status)
 | 
			
		||||
	if ok {
 | 
			
		||||
		return status.GetReason()
 | 
			
		||||
	}
 | 
			
		||||
	return Unknown
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (i *GrpcAccessInfo) ErrorType() string {
 | 
			
		||||
	if i.err != nil {
 | 
			
		||||
		return merr.GetErrorType(i.err).String()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if status := i.respStatus(); status.GetCode() > 0 {
 | 
			
		||||
		if _, ok := status.ExtraInfo[merr.InputErrorFlagKey]; ok {
 | 
			
		||||
			return merr.InputError.String()
 | 
			
		||||
		}
 | 
			
		||||
		return merr.SystemError.String()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return ""
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (i *GrpcAccessInfo) DbName() string {
 | 
			
		||||
	name, ok := requestutil.GetDbNameFromRequest(i.req)
 | 
			
		||||
	if !ok {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -103,6 +103,22 @@ func (s *GrpcAccessInfoSuite) TestErrorMsg() {
 | 
			
		|||
	s.Equal("rpc error: code = Unavailable desc = mock", result[0])
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *GrpcAccessInfoSuite) TestErrorType() {
 | 
			
		||||
	s.info.resp = &milvuspb.QueryResults{
 | 
			
		||||
		Status: merr.Status(nil),
 | 
			
		||||
	}
 | 
			
		||||
	result := Get(s.info, "$error_type")
 | 
			
		||||
	s.Equal("", result[0])
 | 
			
		||||
 | 
			
		||||
	s.info.resp = merr.Status(merr.WrapErrAsInputError(merr.ErrParameterInvalid))
 | 
			
		||||
	result = Get(s.info, "$error_type")
 | 
			
		||||
	s.Equal(merr.InputError.String(), result[0])
 | 
			
		||||
 | 
			
		||||
	s.info.err = merr.ErrParameterInvalid
 | 
			
		||||
	result = Get(s.info, "$error_type")
 | 
			
		||||
	s.Equal(merr.SystemError.String(), result[0])
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *GrpcAccessInfoSuite) TestDbName() {
 | 
			
		||||
	s.info.req = nil
 | 
			
		||||
	result := Get(s.info, "$database_name")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -34,6 +34,7 @@ var MetricFuncMap = map[string]getMetricFunc{
 | 
			
		|||
	"$response_size":     getResponseSize,
 | 
			
		||||
	"$error_code":        getErrorCode,
 | 
			
		||||
	"$error_msg":         getErrorMsg,
 | 
			
		||||
	"$error_type":        getErrorType,
 | 
			
		||||
	"$database_name":     getDbName,
 | 
			
		||||
	"$collection_name":   getCollectionName,
 | 
			
		||||
	"$partition_name":    getPartitionName,
 | 
			
		||||
| 
						 | 
				
			
			@ -61,6 +62,7 @@ type AccessInfo interface {
 | 
			
		|||
	ResponseSize() string
 | 
			
		||||
	ErrorCode() string
 | 
			
		||||
	ErrorMsg() string
 | 
			
		||||
	ErrorType() string
 | 
			
		||||
	DbName() string
 | 
			
		||||
	CollectionName() string
 | 
			
		||||
	PartitionName() string
 | 
			
		||||
| 
						 | 
				
			
			@ -115,6 +117,10 @@ func getErrorMsg(i AccessInfo) string {
 | 
			
		|||
	return i.ErrorMsg()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getErrorType(i AccessInfo) string {
 | 
			
		||||
	return i.ErrorType()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getDbName(i AccessInfo) string {
 | 
			
		||||
	return i.DbName()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -131,6 +131,10 @@ func (i *RestfulInfo) ErrorMsg() string {
 | 
			
		|||
	return fmt.Sprint(message)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (i *RestfulInfo) ErrorType() string {
 | 
			
		||||
	return Unknown
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (i *RestfulInfo) SdkVersion() string {
 | 
			
		||||
	return "Restful"
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2552,7 +2552,7 @@ func (node *Proxy) Insert(ctx context.Context, request *milvuspb.InsertRequest)
 | 
			
		|||
		log.Warn("Failed to enqueue insert task: " + err.Error())
 | 
			
		||||
		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
 | 
			
		||||
			metrics.AbandonLabel, request.GetDbName(), request.GetCollectionName()).Inc()
 | 
			
		||||
		return constructFailedResponse(err), nil
 | 
			
		||||
		return constructFailedResponse(merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)), nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Debug("Detail of insert request in Proxy")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -622,6 +622,7 @@ func (t *describeCollectionTask) Execute(ctx context.Context) error {
 | 
			
		|||
			t.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 | 
			
		||||
			// nolint
 | 
			
		||||
			t.result.Status.Reason = fmt.Sprintf("can't find collection[database=%s][collection=%s]", t.GetDbName(), t.GetCollectionName())
 | 
			
		||||
			t.result.Status.ExtraInfo = map[string]string{merr.InputErrorFlagKey: "true"}
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -1440,7 +1441,7 @@ func (t *flushTask) Execute(ctx context.Context) error {
 | 
			
		|||
	for _, collName := range t.CollectionNames {
 | 
			
		||||
		collID, err := globalMetaCache.GetCollectionID(ctx, t.GetDbName(), collName)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
			return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
 | 
			
		||||
		}
 | 
			
		||||
		flushReq := &datapb.FlushRequest{
 | 
			
		||||
			Base: commonpbutil.UpdateMsgBase(
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -264,13 +264,13 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
 | 
			
		|||
 | 
			
		||||
	db, err := globalMetaCache.GetDatabaseInfo(ctx, dr.req.GetDbName())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputErrorWhen(err, merr.ErrDatabaseNotFound)
 | 
			
		||||
	}
 | 
			
		||||
	dr.dbID = db.dbID
 | 
			
		||||
 | 
			
		||||
	dr.collectionID, err = globalMetaCache.GetCollectionID(ctx, dr.req.GetDbName(), collName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return ErrWithLog(log, "Failed to get collection id", err)
 | 
			
		||||
		return ErrWithLog(log, "Failed to get collection id", merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	dr.schema, err = globalMetaCache.GetCollectionSchema(ctx, dr.req.GetDbName(), collName)
 | 
			
		||||
| 
						 | 
				
			
			@ -316,11 +316,11 @@ func (dr *deleteRunner) Init(ctx context.Context) error {
 | 
			
		|||
func (dr *deleteRunner) Run(ctx context.Context) error {
 | 
			
		||||
	plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr())
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if planparserv2.IsAlwaysTruePlan(plan) {
 | 
			
		||||
		return merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr())
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr()))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -116,13 +116,13 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
	if maxInsertSize != -1 && it.insertMsg.Size() > maxInsertSize {
 | 
			
		||||
		log.Warn("insert request size exceeds maxInsertSize",
 | 
			
		||||
			zap.Int("request size", it.insertMsg.Size()), zap.Int("maxInsertSize", maxInsertSize))
 | 
			
		||||
		return merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize")
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterTooLarge("insert request size exceeds maxInsertSize"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	schema, err := globalMetaCache.GetCollectionSchema(ctx, it.insertMsg.GetDbName(), collectionName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warn("get collection schema from global meta cache failed", zap.String("collectionName", collectionName), zap.Error(err))
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
 | 
			
		||||
	}
 | 
			
		||||
	it.schema = schema.CollectionSchema
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -208,7 +208,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
 | 
			
		||||
	if err := newValidateUtil(withNANCheck(), withOverflowCheck(), withMaxLenCheck(), withMaxCapCheck()).
 | 
			
		||||
		Validate(it.insertMsg.GetFieldsData(), schema.CollectionSchema, it.insertMsg.NRows()); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputError(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	log.Debug("Proxy Insert PreExecute done")
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -200,7 +200,7 @@ func createCntPlan(expr string, schemaHelper *typeutil.SchemaHelper) (*planpb.Pl
 | 
			
		|||
 | 
			
		||||
	plan, err := planparserv2.CreateRetrievePlan(schemaHelper, expr)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
 | 
			
		||||
		return nil, merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	plan.Node.(*planpb.PlanNode_Query).Query.IsCount = true
 | 
			
		||||
| 
						 | 
				
			
			@ -223,7 +223,7 @@ func (t *queryTask) createPlan(ctx context.Context) error {
 | 
			
		|||
	if t.plan == nil {
 | 
			
		||||
		t.plan, err = planparserv2.CreateRetrievePlan(schema.schemaHelper, t.request.Expr)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err)
 | 
			
		||||
			return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create query plan: %v", err))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -291,7 +291,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
	collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warn("Failed to get collection id.", zap.String("collectionName", collectionName), zap.Error(err))
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
 | 
			
		||||
	}
 | 
			
		||||
	t.CollectionID = collID
 | 
			
		||||
	log.Debug("Get collection ID by name", zap.Int64("collectionID", t.CollectionID))
 | 
			
		||||
| 
						 | 
				
			
			@ -302,11 +302,11 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	if t.partitionKeyMode && len(t.request.GetPartitionNames()) != 0 {
 | 
			
		||||
		return errors.New("not support manually specifying the partition names if partition key mode is used")
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("not support manually specifying the partition names if partition key mode is used"))
 | 
			
		||||
	}
 | 
			
		||||
	if t.mustUsePartitionKey && !t.partitionKeyMode {
 | 
			
		||||
		return merr.WrapErrParameterInvalidMsg("must use partition key in the query request " +
 | 
			
		||||
			"because the mustUsePartitionKey config is true")
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the query request " +
 | 
			
		||||
			"because the mustUsePartitionKey config is true"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, tag := range t.request.PartitionNames {
 | 
			
		||||
| 
						 | 
				
			
			@ -363,7 +363,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
	t.plan.Node.(*planpb.PlanNode_Query).Query.Limit = t.RetrieveRequest.Limit
 | 
			
		||||
 | 
			
		||||
	if planparserv2.IsAlwaysTruePlan(t.plan) && t.RetrieveRequest.Limit == typeutil.Unlimited {
 | 
			
		||||
		return fmt.Errorf("empty expression should be used with limit")
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("empty expression should be used with limit"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// convert partition names only when requery is false
 | 
			
		||||
| 
						 | 
				
			
			@ -390,7 +390,7 @@ func (t *queryTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
 | 
			
		||||
	// count with pagination
 | 
			
		||||
	if t.plan.GetQuery().GetIsCount() && t.queryParams.limit != typeutil.Unlimited {
 | 
			
		||||
		return fmt.Errorf("count entities with pagination is not allowed")
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("count entities with pagination is not allowed"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t.RetrieveRequest.IsCount = t.plan.GetQuery().GetIsCount()
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -114,7 +114,7 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
	t.collectionName = collectionName
 | 
			
		||||
	collID, err := globalMetaCache.GetCollectionID(ctx, t.request.GetDbName(), collectionName)
 | 
			
		||||
	if err != nil { // err is not nil if collection not exists
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputErrorWhen(err, merr.ErrCollectionNotFound, merr.ErrDatabaseNotFound)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	t.SearchRequest.DbID = 0 // todo
 | 
			
		||||
| 
						 | 
				
			
			@ -135,8 +135,8 @@ func (t *searchTask) PreExecute(ctx context.Context) error {
 | 
			
		|||
		return errors.New("not support manually specifying the partition names if partition key mode is used")
 | 
			
		||||
	}
 | 
			
		||||
	if t.mustUsePartitionKey && !t.partitionKeyMode {
 | 
			
		||||
		return merr.WrapErrParameterInvalidMsg("must use partition key in the search request " +
 | 
			
		||||
			"because the mustUsePartitionKey config is true")
 | 
			
		||||
		return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("must use partition key in the search request " +
 | 
			
		||||
			"because the mustUsePartitionKey config is true"))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !t.partitionKeyMode && len(t.request.GetPartitionNames()) > 0 {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -187,14 +187,14 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		log.Warn("check primary field data and hash primary key failed when upsert",
 | 
			
		||||
			zap.Error(err))
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
 | 
			
		||||
	}
 | 
			
		||||
	// set field ID to insert field data
 | 
			
		||||
	err = fillFieldIDBySchema(it.upsertMsg.InsertMsg.GetFieldsData(), it.schema.CollectionSchema)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Warn("insert set fieldID to fieldData failed when upsert",
 | 
			
		||||
			zap.Error(err))
 | 
			
		||||
		return err
 | 
			
		||||
		return merr.WrapErrAsInputErrorWhen(err, merr.ErrParameterInvalid)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if it.partitionKeyMode {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -646,10 +646,10 @@ func parsePrimaryFieldData2IDs(fieldData *schemapb.FieldData) (*schemapb.IDs, er
 | 
			
		|||
				StrId: scalarField.GetStringData(),
 | 
			
		||||
			}
 | 
			
		||||
		default:
 | 
			
		||||
			return nil, errors.New("currently only support DataType Int64 or VarChar as PrimaryField")
 | 
			
		||||
			return nil, merr.WrapErrParameterInvalidMsg("currently only support DataType Int64 or VarChar as PrimaryField")
 | 
			
		||||
		}
 | 
			
		||||
	default:
 | 
			
		||||
		return nil, errors.New("currently not support vector field as PrimaryField")
 | 
			
		||||
		return nil, merr.WrapErrParameterInvalidMsg("currently not support vector field as PrimaryField")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return primaryData, nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -26,6 +26,22 @@ const (
 | 
			
		|||
	TimeoutCode  int32 = 10001
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type ErrorType int32
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	SystemError ErrorType = 0
 | 
			
		||||
	InputError  ErrorType = 1
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var ErrorTypeName = map[ErrorType]string{
 | 
			
		||||
	SystemError: "system_error",
 | 
			
		||||
	InputError:  "input_error",
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (err ErrorType) String() string {
 | 
			
		||||
	return ErrorTypeName[err]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Define leaf errors here,
 | 
			
		||||
// WARN: take care to add new error,
 | 
			
		||||
// check whether you can use the errors below before adding a new one.
 | 
			
		||||
| 
						 | 
				
			
			@ -196,29 +212,40 @@ var (
 | 
			
		|||
	ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type errorOption func(*milvusError)
 | 
			
		||||
 | 
			
		||||
func WithDetail(detail string) errorOption {
 | 
			
		||||
	return func(err *milvusError) {
 | 
			
		||||
		err.detail = detail
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func WithErrorType(etype ErrorType) errorOption {
 | 
			
		||||
	return func(err *milvusError) {
 | 
			
		||||
		err.errType = etype
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type milvusError struct {
 | 
			
		||||
	msg       string
 | 
			
		||||
	detail    string
 | 
			
		||||
	retriable bool
 | 
			
		||||
	errCode   int32
 | 
			
		||||
	errType   ErrorType
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newMilvusError(msg string, code int32, retriable bool) milvusError {
 | 
			
		||||
	return milvusError{
 | 
			
		||||
func newMilvusError(msg string, code int32, retriable bool, options ...errorOption) milvusError {
 | 
			
		||||
	err := milvusError{
 | 
			
		||||
		msg:       msg,
 | 
			
		||||
		detail:    msg,
 | 
			
		||||
		retriable: retriable,
 | 
			
		||||
		errCode:   code,
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func newMilvusErrorWithDetail(msg string, detail string, code int32, retriable bool) milvusError {
 | 
			
		||||
	return milvusError{
 | 
			
		||||
		msg:       msg,
 | 
			
		||||
		detail:    detail,
 | 
			
		||||
		retriable: retriable,
 | 
			
		||||
		errCode:   code,
 | 
			
		||||
	for _, option := range options {
 | 
			
		||||
		option(&err)
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (e milvusError) code() int32 {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -22,12 +22,16 @@ import (
 | 
			
		|||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"github.com/cockroachdb/errors"
 | 
			
		||||
	"go.uber.org/zap"
 | 
			
		||||
 | 
			
		||||
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 | 
			
		||||
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/log"
 | 
			
		||||
	"github.com/milvus-io/milvus/pkg/util/paramtable"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const InputErrorFlagKey string = "is_input_error"
 | 
			
		||||
 | 
			
		||||
// Code returns the error code of the given error,
 | 
			
		||||
// WARN: DO NOT use this for now
 | 
			
		||||
func Code(err error) int32 {
 | 
			
		||||
| 
						 | 
				
			
			@ -71,7 +75,8 @@ func Status(err error) *commonpb.Status {
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	code := Code(err)
 | 
			
		||||
	return &commonpb.Status{
 | 
			
		||||
 | 
			
		||||
	status := &commonpb.Status{
 | 
			
		||||
		Code:   code,
 | 
			
		||||
		Reason: previousLastError(err).Error(),
 | 
			
		||||
		// Deprecated, for compatibility
 | 
			
		||||
| 
						 | 
				
			
			@ -79,6 +84,11 @@ func Status(err error) *commonpb.Status {
 | 
			
		|||
		Retriable: IsRetryableErr(err),
 | 
			
		||||
		Detail:    err.Error(),
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if GetErrorType(err) == InputError {
 | 
			
		||||
		status.ExtraInfo = map[string]string{InputErrorFlagKey: "true"}
 | 
			
		||||
	}
 | 
			
		||||
	return status
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func previousLastError(err error) error {
 | 
			
		||||
| 
						 | 
				
			
			@ -233,12 +243,18 @@ func Error(status *commonpb.Status) error {
 | 
			
		|||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var eType ErrorType
 | 
			
		||||
	_, ok := status.GetExtraInfo()[InputErrorFlagKey]
 | 
			
		||||
	if ok {
 | 
			
		||||
		eType = InputError
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// use code first
 | 
			
		||||
	code := status.GetCode()
 | 
			
		||||
	if code == 0 {
 | 
			
		||||
		return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), Code(OldCodeToMerr(status.GetErrorCode())), false)
 | 
			
		||||
		return newMilvusError(status.GetReason(), Code(OldCodeToMerr(status.GetErrorCode())), false, WithDetail(status.GetDetail()), WithErrorType(eType))
 | 
			
		||||
	}
 | 
			
		||||
	return newMilvusErrorWithDetail(status.GetReason(), status.GetDetail(), code, status.GetRetriable())
 | 
			
		||||
	return newMilvusError(status.GetReason(), code, status.GetRetriable(), WithDetail(status.GetDetail()), WithErrorType(eType))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SegcoreError returns a merr according to the given segcore error code and message
 | 
			
		||||
| 
						 | 
				
			
			@ -293,6 +309,36 @@ func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) er
 | 
			
		|||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func WrapErrAsInputError(err error) error {
 | 
			
		||||
	if merr, ok := err.(milvusError); ok {
 | 
			
		||||
		WithErrorType(InputError)(&merr)
 | 
			
		||||
		return merr
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func WrapErrAsInputErrorWhen(err error, targets ...milvusError) error {
 | 
			
		||||
	if merr, ok := err.(milvusError); ok {
 | 
			
		||||
		for _, target := range targets {
 | 
			
		||||
			if target.errCode == merr.errCode {
 | 
			
		||||
				log.Info("mark error as input error", zap.Error(err))
 | 
			
		||||
				WithErrorType(InputError)(&merr)
 | 
			
		||||
				log.Info("test--", zap.String("type", merr.errType.String()))
 | 
			
		||||
				return merr
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func GetErrorType(err error) ErrorType {
 | 
			
		||||
	if merr, ok := err.(milvusError); ok {
 | 
			
		||||
		return merr.errType
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return SystemError
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Service related
 | 
			
		||||
func WrapErrServiceNotReady(role string, sessionID int64, state string, msg ...string) error {
 | 
			
		||||
	err := wrapFieldsWithDesc(ErrServiceNotReady,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue