mirror of https://github.com/milvus-io/milvus.git
Refine RootCoord services error handle (#22689)
Signed-off-by: yah01 <yang.cen@zilliz.com>pull/22714/head
parent
be33911205
commit
e1cb307690
|
@ -60,6 +60,7 @@ import (
|
|||
"github.com/milvus-io/milvus/internal/util/errorutil"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/logutil"
|
||||
"github.com/milvus-io/milvus/internal/util/merr"
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
|
@ -713,10 +714,7 @@ func (c *Core) GetComponentStates(ctx context.Context) (*milvuspb.ComponentState
|
|||
StateCode: code,
|
||||
ExtraInfo: nil,
|
||||
},
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Status: merr.Status(nil),
|
||||
SubcomponentStates: []*milvuspb.ComponentInfo{
|
||||
{
|
||||
NodeID: nodeID,
|
||||
|
@ -731,29 +729,23 @@ func (c *Core) GetComponentStates(ctx context.Context) (*milvuspb.ComponentState
|
|||
// GetTimeTickChannel get timetick channel name
|
||||
func (c *Core) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: Params.CommonCfg.RootCoordTimeTick.GetValue(),
|
||||
Status: merr.Status(nil),
|
||||
Value: Params.CommonCfg.RootCoordTimeTick.GetValue(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetStatisticsChannel get statistics channel name
|
||||
func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
},
|
||||
Value: Params.CommonCfg.RootCoordStatistics.GetValue(),
|
||||
Status: merr.Status(nil),
|
||||
Value: Params.CommonCfg.RootCoordStatistics.GetValue(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// CreateCollection create collection
|
||||
func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.TotalLabel).Inc()
|
||||
|
@ -779,7 +771,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
|
|||
zap.String("name", in.GetCollectionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -790,7 +782,7 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateCollection", metrics.SuccessLabel).Inc()
|
||||
|
@ -801,13 +793,13 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti
|
|||
zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("name", in.GetCollectionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// DropCollection drop collection
|
||||
func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.TotalLabel).Inc()
|
||||
|
@ -831,7 +823,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
|
|||
zap.String("name", in.GetCollectionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -841,7 +833,7 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropCollection", metrics.SuccessLabel).Inc()
|
||||
|
@ -851,15 +843,14 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe
|
|||
log.Ctx(ctx).Info("done to drop collection", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("name", in.GetCollectionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// HasCollection check collection existence
|
||||
func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Value: false,
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -882,8 +873,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
|
|||
log.Warn("failed to enqueue request to has collection", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()),
|
||||
Value: false,
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -891,8 +881,7 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ
|
|||
log.Warn("failed to has collection", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("HasCollection", metrics.FailLabel).Inc()
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasCollection failed: "+err.Error()),
|
||||
Value: false,
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -913,7 +902,7 @@ func (c *Core) describeCollection(ctx context.Context, in *milvuspb.DescribeColl
|
|||
}
|
||||
|
||||
func convertModelToDesc(collInfo *model.Collection, aliases []string) *milvuspb.DescribeCollectionResponse {
|
||||
resp := &milvuspb.DescribeCollectionResponse{Status: succStatus()}
|
||||
resp := &milvuspb.DescribeCollectionResponse{Status: merr.Status(nil)}
|
||||
|
||||
resp.Schema = &schemapb.CollectionSchema{
|
||||
Name: collInfo.Name,
|
||||
|
@ -943,7 +932,7 @@ func convertModelToDesc(collInfo *model.Collection, aliases []string) *milvuspb.
|
|||
func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.DescribeCollectionRequest, allowUnavailable bool) (*milvuspb.DescribeCollectionResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode"+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -963,7 +952,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe
|
|||
t := &describeCollectionTask{
|
||||
baseTask: newBaseTask(ctx, c),
|
||||
Req: in,
|
||||
Rsp: &milvuspb.DescribeCollectionResponse{},
|
||||
Rsp: &milvuspb.DescribeCollectionResponse{Status: merr.Status(nil)},
|
||||
allowUnavailable: allowUnavailable,
|
||||
}
|
||||
|
||||
|
@ -972,7 +961,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
|
||||
Status: merr.Status(err),
|
||||
// Status: common.StatusFromError(err),
|
||||
}, nil
|
||||
}
|
||||
|
@ -982,7 +971,7 @@ func (c *Core) describeCollectionImpl(ctx context.Context, in *milvuspb.Describe
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues("DescribeCollection", metrics.FailLabel).Inc()
|
||||
return &milvuspb.DescribeCollectionResponse{
|
||||
// TODO: use commonpb.ErrorCode_CollectionNotExists. SDK use commonpb.ErrorCode_UnexpectedError now.
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
|
||||
Status: merr.Status(err),
|
||||
// Status: common.StatusFromError(err),
|
||||
}, nil
|
||||
}
|
||||
|
@ -1014,7 +1003,7 @@ func (c *Core) DescribeCollectionInternal(ctx context.Context, in *milvuspb.Desc
|
|||
func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.ShowCollectionsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1037,7 +1026,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
|
|||
log.Warn("failed to enqueue request to show collections", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
|
||||
return &milvuspb.ShowCollectionsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1045,7 +1034,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
|
|||
log.Warn("failed to show collections", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowCollections", metrics.FailLabel).Inc()
|
||||
return &milvuspb.ShowCollectionsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1059,7 +1048,7 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections
|
|||
|
||||
func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollectionRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.TotalLabel).Inc()
|
||||
|
@ -1085,7 +1074,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
|
|||
zap.String("name", in.GetCollectionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -1096,7 +1085,7 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterCollection", metrics.SuccessLabel).Inc()
|
||||
|
@ -1107,13 +1096,13 @@ func (c *Core) AlterCollection(ctx context.Context, in *milvuspb.AlterCollection
|
|||
zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("name", in.GetCollectionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// CreatePartition create partition
|
||||
func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.TotalLabel).Inc()
|
||||
|
@ -1141,7 +1130,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
|
|||
zap.String("partition", in.GetPartitionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -1153,7 +1142,7 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreatePartition", metrics.SuccessLabel).Inc()
|
||||
|
@ -1164,13 +1153,13 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition
|
|||
zap.String("collection", in.GetCollectionName()),
|
||||
zap.String("partition", in.GetPartitionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// DropPartition drop partition
|
||||
func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.TotalLabel).Inc()
|
||||
|
@ -1198,7 +1187,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
|
|||
zap.String("partition", in.GetPartitionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
log.Ctx(ctx).Error("failed to drop partition",
|
||||
|
@ -1209,7 +1198,7 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropPartition", metrics.SuccessLabel).Inc()
|
||||
|
@ -1220,15 +1209,14 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ
|
|||
zap.String("collection", in.GetCollectionName()),
|
||||
zap.String("partition", in.GetPartitionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// HasPartition check partition existence
|
||||
func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Value: false,
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1253,8 +1241,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
|
|||
log.Warn("failed to enqueue request to has partition", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()),
|
||||
Value: false,
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1262,8 +1249,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
|
|||
log.Warn("failed to has partition", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("HasPartition", metrics.FailLabel).Inc()
|
||||
return &milvuspb.BoolResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()),
|
||||
Value: false,
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1278,7 +1264,7 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
|
|||
func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitionsRequest, allowUnavailable bool) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1300,7 +1286,7 @@ func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitio
|
|||
log.Warn("failed to enqueue request to show partitions", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
// Status: common.StatusFromError(err),
|
||||
}, nil
|
||||
}
|
||||
|
@ -1309,7 +1295,7 @@ func (c *Core) showPartitionsImpl(ctx context.Context, in *milvuspb.ShowPartitio
|
|||
log.Warn("failed to show partitions", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("ShowPartitions", metrics.FailLabel).Inc()
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
// Status: common.StatusFromError(err),
|
||||
}, nil
|
||||
}
|
||||
|
@ -1336,14 +1322,14 @@ func (c *Core) ShowPartitionsInternal(ctx context.Context, in *milvuspb.ShowPart
|
|||
func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) {
|
||||
// ShowSegments Only used in GetPersistentSegmentInfo, it's already deprecated for a long time.
|
||||
// Though we continue to keep current logic, it's not right enough since RootCoord only contains indexed segments.
|
||||
return &milvuspb.ShowSegmentsResponse{Status: succStatus()}, nil
|
||||
return &milvuspb.ShowSegmentsResponse{Status: merr.Status(nil)}, nil
|
||||
}
|
||||
|
||||
// AllocTimestamp alloc timestamp
|
||||
func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &rootcoordpb.AllocTimestampResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1353,7 +1339,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam
|
|||
zap.Error(err))
|
||||
|
||||
return &rootcoordpb.AllocTimestampResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocTimestamp failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1361,7 +1347,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam
|
|||
ts = ts - uint64(in.GetCount()) + 1
|
||||
metrics.RootCoordTimestamp.Set(float64(ts))
|
||||
return &rootcoordpb.AllocTimestampResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
Timestamp: ts,
|
||||
Count: in.GetCount(),
|
||||
}, nil
|
||||
|
@ -1371,7 +1357,7 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam
|
|||
func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &rootcoordpb.AllocIDResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
start, _, err := c.idAllocator.Alloc(in.Count)
|
||||
|
@ -1381,14 +1367,14 @@ func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*ro
|
|||
zap.Error(err))
|
||||
|
||||
return &rootcoordpb.AllocIDResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocID failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
Count: in.Count,
|
||||
}, nil
|
||||
}
|
||||
|
||||
metrics.RootCoordIDAllocCounter.Add(float64(in.Count))
|
||||
return &rootcoordpb.AllocIDResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
ID: start,
|
||||
Count: in.Count,
|
||||
}, nil
|
||||
|
@ -1399,40 +1385,39 @@ func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.Channel
|
|||
log := log.Ctx(ctx)
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
log.Warn("failed to updateTimeTick because rootcoord is not healthy", zap.Any("state", code))
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
if in.Base.MsgType != commonpb.MsgType_TimeTick {
|
||||
log.Warn("failed to updateTimeTick because base messasge is not timetick, state", zap.Any("base message type", in.Base.MsgType))
|
||||
msgTypeName := commonpb.MsgType_name[int32(in.Base.GetMsgType())]
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid message type "+msgTypeName), nil
|
||||
return merr.Status(merr.WrapErrParameterInvalid(commonpb.MsgType_TimeTick.String(), in.Base.MsgType.String(), "invalid message type")), nil
|
||||
}
|
||||
err := c.chanTimeTick.updateTimeTick(in, "gRPC")
|
||||
if err != nil {
|
||||
log.Warn("failed to updateTimeTick",
|
||||
zap.String("role", typeutil.RootCoordRole),
|
||||
zap.Error(err))
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "UpdateTimeTick failed: "+err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// InvalidateCollectionMetaCache notifies RootCoord to release the collection cache in Proxies.
|
||||
func (c *Core) InvalidateCollectionMetaCache(ctx context.Context, in *proxypb.InvalidateCollMetaCacheRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
err := c.proxyClientManager.InvalidateCollectionMetaCache(ctx, in)
|
||||
if err != nil {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// ShowConfigurations returns the configurations of RootCoord matching req.Pattern
|
||||
func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &internalpb.ShowConfigurationsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
Configuations: nil,
|
||||
}, nil
|
||||
}
|
||||
|
@ -1459,7 +1444,7 @@ func (c *Core) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfi
|
|||
func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
Response: "",
|
||||
}, nil
|
||||
}
|
||||
|
@ -1469,7 +1454,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
|
|||
log.Warn("ParseMetricType failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.Int64("nodeID", c.session.ServerID), zap.String("req", in.Request), zap.Error(err))
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ParseMetricType failed: "+err.Error()),
|
||||
Status: merr.Status(err),
|
||||
Response: "",
|
||||
}, nil
|
||||
}
|
||||
|
@ -1486,7 +1471,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
|
|||
zap.String("metricType", metricType),
|
||||
zap.Error(err))
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, fmt.Sprintf("getSystemInfoMetrics failed: %s", err.Error())),
|
||||
Status: merr.Status(err),
|
||||
Response: "",
|
||||
}, nil
|
||||
}
|
||||
|
@ -1499,7 +1484,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
|
|||
zap.String("metricType", metricType))
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, metricsinfo.MsgUnimplementedMetric),
|
||||
Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
|
||||
Response: "",
|
||||
}, nil
|
||||
}
|
||||
|
@ -1507,7 +1492,7 @@ func (c *Core) GetMetrics(ctx context.Context, in *milvuspb.GetMetricsRequest) (
|
|||
// CreateAlias create collection alias
|
||||
func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.TotalLabel).Inc()
|
||||
|
@ -1535,7 +1520,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
|
|||
zap.String("collection", in.GetCollectionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -1547,7 +1532,7 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("CreateAlias", metrics.SuccessLabel).Inc()
|
||||
|
@ -1558,13 +1543,13 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest)
|
|||
zap.String("alias", in.GetAlias()),
|
||||
zap.String("collection", in.GetCollectionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// DropAlias drop collection alias
|
||||
func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
|
||||
|
@ -1590,7 +1575,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
|
|||
zap.String("alias", in.GetAlias()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -1601,7 +1586,7 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.SuccessLabel).Inc()
|
||||
|
@ -1611,13 +1596,13 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c
|
|||
zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("alias", in.GetAlias()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// AlterAlias alter collection alias
|
||||
func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("DropAlias", metrics.TotalLabel).Inc()
|
||||
|
@ -1645,7 +1630,7 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
|
|||
zap.String("collection", in.GetCollectionName()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
|
@ -1657,7 +1642,7 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
|
|||
zap.Uint64("ts", t.GetTs()))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("AlterAlias", metrics.SuccessLabel).Inc()
|
||||
|
@ -1668,14 +1653,14 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (
|
|||
zap.String("alias", in.GetAlias()),
|
||||
zap.String("collection", in.GetCollectionName()),
|
||||
zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// Import imports large files (json, numpy, etc.) on MinIO/S3 storage into Milvus storage.
|
||||
func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvuspb.ImportResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.ImportResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1716,7 +1701,7 @@ func (c *Core) Import(ctx context.Context, req *milvuspb.ImportRequest) (*milvus
|
|||
func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateRequest) (*milvuspb.GetImportStateResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.GetImportStateResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
return c.importManager.getTaskState(req.GetTask()), nil
|
||||
|
@ -1726,7 +1711,7 @@ func (c *Core) GetImportState(ctx context.Context, req *milvuspb.GetImportStateR
|
|||
func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTasksRequest) (*milvuspb.ListImportTasksResponse, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return &milvuspb.ListImportTasksResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]),
|
||||
Status: merr.Status(merr.WrapErrServiceNotReady(code.String())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1739,8 +1724,10 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask
|
|||
if err != nil {
|
||||
err = fmt.Errorf("failed to find collection ID from its name: '%s', error: %w", req.GetCollectionName(), err)
|
||||
log.Error("ListImportTasks failed", zap.Error(err))
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_IllegalCollectionName
|
||||
return &milvuspb.ListImportTasksResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_IllegalCollectionName, err.Error()),
|
||||
Status: status,
|
||||
}, nil
|
||||
}
|
||||
colID = colInfo.CollectionID
|
||||
|
@ -1752,15 +1739,13 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask
|
|||
err = fmt.Errorf("failed to list import tasks, collection name: '%s', error: %w", req.GetCollectionName(), err)
|
||||
log.Error("ListImportTasks failed", zap.Error(err))
|
||||
return &milvuspb.ListImportTasksResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()),
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
resp := &milvuspb.ListImportTasksResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
},
|
||||
Tasks: tasks,
|
||||
Status: merr.Status(nil),
|
||||
Tasks: tasks,
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
@ -1771,7 +1756,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
|
|||
zap.Int64("task ID", ir.GetTaskId()),
|
||||
zap.Any("import state", ir.GetState()))
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
// This method update a busy node to idle node, and send import task to idle node
|
||||
|
@ -1801,6 +1786,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
|
|||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UpdateImportTaskFailure,
|
||||
Reason: err.Error(),
|
||||
Code: merr.Code(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -1823,10 +1809,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
|
|||
if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil {
|
||||
log.Error("failed to call Flush on bulk insert segments",
|
||||
zap.Int64("task ID", ir.GetTaskId()))
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1879,7 +1862,10 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden
|
|||
log.Error("CreateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_CreateCredentialFailure, "CreateCredential failed: "+err.Error()), nil
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_CreateCredentialFailure
|
||||
return status, nil
|
||||
}
|
||||
// update proxy's local cache
|
||||
err = c.UpdateCredCache(ctx, credInfo)
|
||||
|
@ -1894,7 +1880,7 @@ func (c *Core) CreateCredential(ctx context.Context, credInfo *internalpb.Creden
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfCredentials.Inc()
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// GetCredential get credential by username
|
||||
|
@ -1910,8 +1896,11 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR
|
|||
log.Error("GetCredential query credential failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", in.Username), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_GetCredentialFailure
|
||||
return &rootcoordpb.GetCredentialResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_GetCredentialFailure, "GetCredential failed: "+err.Error()),
|
||||
Status: status,
|
||||
}, err
|
||||
}
|
||||
log.Debug("GetCredential success", zap.String("role", typeutil.RootCoordRole),
|
||||
|
@ -1920,7 +1909,7 @@ func (c *Core) GetCredential(ctx context.Context, in *rootcoordpb.GetCredentialR
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return &rootcoordpb.GetCredentialResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
Username: credInfo.Username,
|
||||
Password: credInfo.EncryptedPassword,
|
||||
}, nil
|
||||
|
@ -1939,7 +1928,10 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden
|
|||
log.Error("UpdateCredential save credential failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error()), nil
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_UpdateCredentialFailure
|
||||
return status, nil
|
||||
}
|
||||
// update proxy's local cache
|
||||
err = c.UpdateCredCache(ctx, credInfo)
|
||||
|
@ -1947,14 +1939,17 @@ func (c *Core) UpdateCredential(ctx context.Context, credInfo *internalpb.Creden
|
|||
log.Error("UpdateCredential update cache failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UpdateCredentialFailure, "UpdateCredential failed: "+err.Error()), nil
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_UpdateCredentialFailure
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("UpdateCredential success", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", credInfo.Username))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// DeleteCredential delete a user
|
||||
|
@ -1969,7 +1964,10 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti
|
|||
log.Error("DeleteCredential remove credential failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", in.Username), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), err
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_DeleteCredentialFailure
|
||||
return status, nil
|
||||
}
|
||||
// invalidate proxy's local cache
|
||||
err = c.ExpireCredCache(ctx, in.Username)
|
||||
|
@ -1977,7 +1975,10 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti
|
|||
log.Error("DeleteCredential expire credential cache failed", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", in.Username), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_DeleteCredentialFailure, "DeleteCredential failed: "+err.Error()), nil
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_DeleteCredentialFailure
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("DeleteCredential success", zap.String("role", typeutil.RootCoordRole),
|
||||
zap.String("username", in.Username))
|
||||
|
@ -1985,7 +1986,7 @@ func (c *Core) DeleteCredential(ctx context.Context, in *milvuspb.DeleteCredenti
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfCredentials.Dec()
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// ListCredUsers list all usernames
|
||||
|
@ -2000,16 +2001,17 @@ func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequ
|
|||
zap.String("role", typeutil.RootCoordRole),
|
||||
zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
|
||||
return &milvuspb.ListCredUsersResponse{
|
||||
Status: failStatus(commonpb.ErrorCode_ListCredUsersFailure, "ListCredUsers failed: "+err.Error()),
|
||||
}, err
|
||||
|
||||
status := merr.Status(err)
|
||||
status.ErrorCode = commonpb.ErrorCode_ListCredUsersFailure
|
||||
return &milvuspb.ListCredUsersResponse{Status: status}, nil
|
||||
}
|
||||
log.Debug("ListCredUsers success", zap.String("role", typeutil.RootCoordRole))
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return &milvuspb.ListCredUsersResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
Usernames: credInfo.Usernames,
|
||||
}, nil
|
||||
}
|
||||
|
@ -2042,7 +2044,7 @@ func (c *Core) CreateRole(ctx context.Context, in *milvuspb.CreateRoleRequest) (
|
|||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfRoles.Inc()
|
||||
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// DropRole drop role
|
||||
|
@ -2111,7 +2113,7 @@ func (c *Core) DropRole(ctx context.Context, in *milvuspb.DropRoleRequest) (*com
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
metrics.RootCoordNumOfRoles.Dec()
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// OperateUserRole operate the relationship between a user and a role
|
||||
|
@ -2175,7 +2177,7 @@ func (c *Core) OperateUserRole(ctx context.Context, in *milvuspb.OperateUserRole
|
|||
logger.Debug(method + " success")
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// SelectRole select role
|
||||
|
@ -2196,7 +2198,7 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (
|
|||
if _, err := c.meta.SelectRole(util.DefaultTenant, &milvuspb.RoleEntity{Name: in.Role.Name}, false); err != nil {
|
||||
if common.IsKeyNotExistError(err) {
|
||||
return &milvuspb.SelectRoleResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
}, nil
|
||||
}
|
||||
errMsg := "fail to select the role to check the role name"
|
||||
|
@ -2219,7 +2221,7 @@ func (c *Core) SelectRole(ctx context.Context, in *milvuspb.SelectRoleRequest) (
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return &milvuspb.SelectRoleResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
Results: roleResults,
|
||||
}, nil
|
||||
}
|
||||
|
@ -2242,7 +2244,7 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (
|
|||
if _, err := c.meta.SelectUser(util.DefaultTenant, &milvuspb.UserEntity{Name: in.User.Name}, false); err != nil {
|
||||
if common.IsKeyNotExistError(err) {
|
||||
return &milvuspb.SelectUserResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
}, nil
|
||||
}
|
||||
errMsg := "fail to select the user to check the username"
|
||||
|
@ -2265,7 +2267,7 @@ func (c *Core) SelectUser(ctx context.Context, in *milvuspb.SelectUserRequest) (
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return &milvuspb.SelectUserResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
Results: userResults,
|
||||
}, nil
|
||||
}
|
||||
|
@ -2411,7 +2413,7 @@ func (c *Core) OperatePrivilege(ctx context.Context, in *milvuspb.OperatePrivile
|
|||
logger.Debug(method + " success")
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
// SelectGrant select grant
|
||||
|
@ -2455,7 +2457,7 @@ func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest)
|
|||
grantEntities, err := c.meta.SelectGrant(util.DefaultTenant, in.Entity)
|
||||
if common.IsKeyNotExistError(err) {
|
||||
return &milvuspb.SelectGrantResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
}, nil
|
||||
}
|
||||
if err != nil {
|
||||
|
@ -2470,7 +2472,7 @@ func (c *Core) SelectGrant(ctx context.Context, in *milvuspb.SelectGrantRequest)
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return &milvuspb.SelectGrantResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
Entities: grantEntities,
|
||||
}, nil
|
||||
}
|
||||
|
@ -2508,7 +2510,7 @@ func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest)
|
|||
metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
return &internalpb.ListPolicyResponse{
|
||||
Status: succStatus(),
|
||||
Status: merr.Status(nil),
|
||||
PolicyInfos: policies,
|
||||
UserRoles: userRoles,
|
||||
}, nil
|
||||
|
@ -2516,7 +2518,7 @@ func (c *Core) ListPolicy(ctx context.Context, in *internalpb.ListPolicyRequest)
|
|||
|
||||
func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollectionRequest) (*commonpb.Status, error) {
|
||||
if code, ok := c.checkHealthy(); !ok {
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+commonpb.StateCode_name[int32(code)]), nil
|
||||
return merr.Status(merr.WrapErrServiceNotReady(code.String())), nil
|
||||
}
|
||||
|
||||
log := log.Ctx(ctx).With(zap.String("oldCollectionName", req.GetOldName()), zap.String("newCollectionName", req.GetNewName()))
|
||||
|
@ -2536,20 +2538,20 @@ func (c *Core) RenameCollection(ctx context.Context, req *milvuspb.RenameCollect
|
|||
if err := c.scheduler.AddTask(t); err != nil {
|
||||
log.Warn("failed to enqueue request to rename collection", zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
if err := t.WaitToFinish(); err != nil {
|
||||
log.Warn("failed to rename collection", zap.Uint64("ts", t.GetTs()), zap.Error(err))
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.FailLabel).Inc()
|
||||
return failStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
|
||||
metrics.RootCoordDDLReqCounter.WithLabelValues("RenameCollection", metrics.SuccessLabel).Inc()
|
||||
metrics.RootCoordDDLReqLatency.WithLabelValues("RenameCollection").Observe(float64(tr.ElapseSpan().Milliseconds()))
|
||||
|
||||
log.Info("done to rename collection", zap.Uint64("ts", t.GetTs()))
|
||||
return succStatus(), nil
|
||||
return merr.Status(nil), nil
|
||||
}
|
||||
|
||||
func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
|
||||
|
|
|
@ -91,6 +91,9 @@ var (
|
|||
// Parameter related
|
||||
ErrParameterInvalid = newMilvusError("invalid parameter", 1100, false)
|
||||
|
||||
// Metrics related
|
||||
ErrMetricNotFound = newMilvusError("MetricNotFound", 1200, false)
|
||||
|
||||
// Do NOT export this,
|
||||
// never allow programmer using this, keep only for converting unknown error to milvusError
|
||||
errUnexpected = newMilvusError("unexpected error", (1<<16)-1, false)
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/util/paramtable"
|
||||
"github.com/stretchr/testify/suite"
|
||||
)
|
||||
|
@ -53,7 +54,7 @@ func (s *ErrSuite) TestStatus() {
|
|||
|
||||
s.ErrorIs(err, restoredErr)
|
||||
s.Equal(int32(0), Status(nil).Code)
|
||||
s.Nil(Error(successStatus))
|
||||
s.Nil(Error(&commonpb.Status{}))
|
||||
}
|
||||
|
||||
func (s *ErrSuite) TestWrap() {
|
||||
|
@ -100,6 +101,9 @@ func (s *ErrSuite) TestWrap() {
|
|||
// Parameter related
|
||||
s.ErrorIs(WrapErrParameterInvalid(8, 1, "failed to create"), ErrParameterInvalid)
|
||||
s.ErrorIs(WrapErrParameterInvalidRange(1, 1<<16, 0, "topk should be in range"), ErrParameterInvalid)
|
||||
|
||||
// Metrics related
|
||||
s.ErrorIs(WrapErrMetricNotFound("unknown", "failed to get metric"), ErrMetricNotFound)
|
||||
}
|
||||
|
||||
func (s *ErrSuite) TestCombine() {
|
||||
|
|
|
@ -26,8 +26,13 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/commonpb"
|
||||
)
|
||||
|
||||
// Declare a success status, avoid create it every time
|
||||
var successStatus = &commonpb.Status{}
|
||||
var (
|
||||
// For compatibility
|
||||
oldErrCodes = map[int32]commonpb.ErrorCode{
|
||||
ErrServiceNotReady.code(): commonpb.ErrorCode_NotReadyServe,
|
||||
ErrCollectionNotFound.code(): commonpb.ErrorCode_CollectionNotExists,
|
||||
}
|
||||
)
|
||||
|
||||
// Code returns the error code of the given error,
|
||||
// WARN: DO NOT use this for now
|
||||
|
@ -60,7 +65,7 @@ func IsRetriable(err error) bool {
|
|||
// returns Success status if err is nil
|
||||
func Status(err error) *commonpb.Status {
|
||||
if err == nil {
|
||||
return successStatus
|
||||
return &commonpb.Status{}
|
||||
}
|
||||
|
||||
return &commonpb.Status{
|
||||
|
@ -274,6 +279,15 @@ func WrapErrParameterInvalidRange[T any](lower, upper, actual T, msg ...string)
|
|||
return err
|
||||
}
|
||||
|
||||
// Metrics related
|
||||
func WrapErrMetricNotFound(name string, msg ...string) error {
|
||||
err := errors.Wrapf(ErrMetricNotFound, "metric=%s", name)
|
||||
if len(msg) > 0 {
|
||||
err = errors.Wrap(err, strings.Join(msg, "; "))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func wrapWithField(err error, name string, value any) error {
|
||||
return errors.Wrapf(err, "%s=%v", name, value)
|
||||
}
|
||||
|
|
|
@ -191,6 +191,10 @@ func (gp *BaseTable) Get(key string) string {
|
|||
|
||||
// GetWithDefault loads an object with @key. If the object does not exist, @defaultValue will be returned.
|
||||
func (gp *BaseTable) GetWithDefault(key, defaultValue string) string {
|
||||
if gp.mgr == nil {
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
str, err := gp.mgr.GetConfig(key)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
|
|
Loading…
Reference in New Issue