Refine DataCoord status (#27262)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
pull/27373/head
yah01 2023-09-26 17:15:27 +08:00 committed by GitHub
parent e02670eae9
commit 6539a5ae2c
40 changed files with 542 additions and 818 deletions

View File

@@ -74,7 +74,10 @@ linters-settings:
   forbidigo:
     forbid:
       - '^time\.Tick$'
-      - '^return merr.Err[a-zA-Z]+$'
+      - 'return merr\.Err[a-zA-Z]+'
+      - 'merr\.Wrap\w+\(\)\.Error\(\)'
+      - '\.(ErrorCode|Reason) = '
+      - 'Reason:\s+\w+\.Error\(\)'
       #- 'fmt\.Print.*' WIP
 
 issues:
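
The new forbid patterns all target the same habit: hand-assembling commonpb.Status fields instead of going through the merr helpers. A minimal before/after sketch of what the rules catch (oldStatus and newStatus are illustrative names, not code from this commit):

package datacoord

import (
    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus/pkg/util/merr"
)

// oldStatus is the style the linter now rejects: '\.(ErrorCode|Reason) = '
// matches the two assignments below, and 'Reason:\s+\w+\.Error\(\)' catches
// the equivalent struct-literal form.
func oldStatus(err error) *commonpb.Status {
    resp := &commonpb.Status{}
    resp.ErrorCode = commonpb.ErrorCode_UnexpectedError
    resp.Reason = err.Error()
    return resp
}

// newStatus is the replacement the rest of this commit rolls out: merr
// derives ErrorCode and Reason from the wrapped error.
func newStatus(err error) *commonpb.Status {
    return merr.Status(err)
}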

View File

@@ -22,12 +22,6 @@ import (
 	"github.com/cockroachdb/errors"
 )
 
-// errNilKvClient stands for a nil kv client is detected when initialized
-var errNilKvClient = errors.New("kv client not initialized")
-
-// serverNotServingErrMsg used for Status Reason when DataCoord is not healthy
-const serverNotServingErrMsg = "DataCoord is not serving"
-
 // errors for VerifyResponse
 var (
 	errNilResponse = errors.New("response is nil")

View File

@@ -148,41 +148,34 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
 	log.Info("receive CreateIndex request",
 		zap.String("IndexName", req.GetIndexName()), zap.Int64("fieldID", req.GetFieldID()),
 		zap.Any("TypeParams", req.GetTypeParams()),
-		zap.Any("IndexParams", req.GetIndexParams()))
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
-		return errResp, nil
+		zap.Any("IndexParams", req.GetIndexParams()),
+	)
+
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
+		return merr.Status(err), nil
 	}
 	metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc()
 
 	indexID, err := s.meta.CanCreateIndex(req)
 	if err != nil {
-		log.Error("CreateIndex failed", zap.Error(err))
-		errResp.Reason = err.Error()
 		metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
-		return errResp, nil
+		return merr.Status(err), nil
 	}
 
 	if indexID == 0 {
 		indexID, err = s.allocator.allocID(ctx)
 		if err != nil {
 			log.Warn("failed to alloc indexID", zap.Error(err))
-			errResp.Reason = "failed to alloc indexID"
 			metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
-			return errResp, nil
+			return merr.Status(err), nil
 		}
 		if getIndexType(req.GetIndexParams()) == diskAnnIndex && !s.indexNodeManager.ClientSupportDisk() {
 			errMsg := "all IndexNodes do not support disk indexes, please verify"
 			log.Warn(errMsg)
-			errResp.Reason = errMsg
+			err = merr.WrapErrIndexNotSupported(diskAnnIndex)
 			metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
-			return errResp, nil
+			return merr.Status(err), nil
 		}
 	}
@@ -204,9 +197,8 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
 	if err != nil {
 		log.Error("CreateIndex fail",
 			zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err))
-		errResp.Reason = err.Error()
 		metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc()
-		return errResp, nil
+		return merr.Status(err), nil
 	}
 
 	select {
@@ -217,9 +209,8 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
 	log.Info("CreateIndex successfully",
 		zap.String("IndexName", req.GetIndexName()), zap.Int64("fieldID", req.GetFieldID()),
 		zap.Int64("IndexID", indexID))
-	errResp.ErrorCode = commonpb.ErrorCode_Success
 	metrics.IndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc()
-	return errResp, nil
+	return merr.Status(nil), nil
 }
 
 // GetIndexState gets the index state of the index name in the request from Proxy.
@@ -231,35 +222,27 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe
 	log.Info("receive GetIndexState request",
 		zap.String("indexName", req.GetIndexName()))
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
+
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
 		return &indexpb.GetIndexStateResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 
 	indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
 	if len(indexes) == 0 {
-		errResp.ErrorCode = commonpb.ErrorCode_IndexNotExist
-		errResp.Reason = fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
-		log.Error("GetIndexState fail",
-			zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errResp.Reason))
+		err := merr.WrapErrIndexNotFound(req.GetIndexName())
+		log.Warn("GetIndexState fail",
+			zap.String("indexName", req.GetIndexName()), zap.Error(err))
 		return &indexpb.GetIndexStateResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 	if len(indexes) > 1 {
 		log.Warn(msgAmbiguousIndexName())
-		errResp.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		errResp.Reason = msgAmbiguousIndexName()
+		err := merr.WrapErrIndexDuplicate(req.GetIndexName())
 		return &indexpb.GetIndexStateResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 	ret := &indexpb.GetIndexStateResponse{
@@ -289,17 +272,14 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
 		zap.Int64("collectionID", req.GetCollectionID()),
 	)
 	log.Info("receive GetSegmentIndexState",
-		zap.String("IndexName", req.GetIndexName()), zap.Int64s("fieldID", req.GetSegmentIDs()))
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
+		zap.String("IndexName", req.GetIndexName()),
+		zap.Int64s("fieldID", req.GetSegmentIDs()),
+	)
+
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
 		return &indexpb.GetSegmentIndexStateResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
@@ -309,13 +289,10 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme
 	}
 	indexID2CreateTs := s.meta.GetIndexIDByName(req.GetCollectionID(), req.GetIndexName())
 	if len(indexID2CreateTs) == 0 {
-		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.GetIndexName())
-		log.Warn("GetSegmentIndexState fail", zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errMsg))
+		err := merr.WrapErrIndexNotFound(req.GetIndexName())
+		log.Warn("GetSegmentIndexState fail", zap.String("indexName", req.GetIndexName()), zap.Error(err))
 		return &indexpb.GetSegmentIndexStateResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_IndexNotExist,
-				Reason:    errMsg,
-			},
+			Status: merr.Status(err),
 		}, nil
 	}
 	for _, segID := range req.GetSegmentIDs() {
@@ -464,37 +441,28 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde
 		zap.Int64("collectionID", req.GetCollectionID()),
 	)
 	log.Info("receive GetIndexBuildProgress request", zap.String("indexName", req.GetIndexName()))
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
+
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
 		return &indexpb.GetIndexBuildProgressResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 
 	indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
 	if len(indexes) == 0 {
-		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
-		log.Warn("GetIndexBuildProgress fail", zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg))
+		err := merr.WrapErrIndexNotFound(req.GetIndexName())
+		log.Warn("GetIndexBuildProgress fail", zap.String("indexName", req.IndexName), zap.Error(err))
 		return &indexpb.GetIndexBuildProgressResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_IndexNotExist,
-				Reason:    errMsg,
-			},
+			Status: merr.Status(err),
 		}, nil
 	}
 	if len(indexes) > 1 {
 		log.Warn(msgAmbiguousIndexName())
-		errResp.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		errResp.Reason = msgAmbiguousIndexName()
+		err := merr.WrapErrIndexDuplicate(req.GetIndexName())
 		return &indexpb.GetIndexBuildProgressResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 	indexInfo := &indexpb.IndexInfo{
@@ -525,28 +493,20 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe
 	)
 	log.Info("receive DescribeIndex request", zap.String("indexName", req.GetIndexName()),
 		zap.Uint64("timestamp", req.GetTimestamp()))
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
+
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
 		return &indexpb.DescribeIndexResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 
 	indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
 	if len(indexes) == 0 {
-		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
-		log.Warn("DescribeIndex fail", zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errMsg))
+		err := merr.WrapErrIndexNotFound(req.GetIndexName())
+		log.Warn("DescribeIndex fail", zap.String("indexName", req.GetIndexName()), zap.Error(err))
 		return &indexpb.DescribeIndexResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_IndexNotExist,
-				Reason:    fmt.Sprint("index doesn't exist, collectionID ", req.GetCollectionID()),
-			},
+			Status: merr.Status(err),
 		}, nil
 	}
@@ -590,24 +550,21 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt
 		zap.Int64("collectionID", req.GetCollectionID()),
 	)
 	log.Info("receive GetIndexStatistics request", zap.String("indexName", req.GetIndexName()))
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(s.serverID()))
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
 		return &indexpb.GetIndexStatisticsResponse{
-			Status: s.UnhealthyStatus(),
+			Status: merr.Status(err),
 		}, nil
 	}
 
 	indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
 	if len(indexes) == 0 {
-		errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)
+		err := merr.WrapErrIndexNotFound(req.GetIndexName())
 		log.Warn("GetIndexStatistics fail",
 			zap.String("indexName", req.GetIndexName()),
-			zap.String("fail reason", errMsg))
+			zap.Error(err))
 		return &indexpb.GetIndexStatisticsResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_IndexNotExist,
-				Reason:    fmt.Sprint("index doesn't exist, collectionID ", req.GetCollectionID()),
-			},
+			Status: merr.Status(err),
 		}, nil
 	}
@@ -652,30 +609,22 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
 	log.Info("receive DropIndex request",
 		zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.String("indexName", req.GetIndexName()),
 		zap.Bool("drop all indexes", req.GetDropAll()))
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
-		return errResp, nil
-	}
 
-	ret := merr.Status(nil)
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
+		return merr.Status(err), nil
+	}
 
 	indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName())
 	if len(indexes) == 0 {
 		log.Info(fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName))
-		return ret, nil
+		return merr.Status(nil), nil
 	}
 
 	if !req.GetDropAll() && len(indexes) > 1 {
 		log.Warn(msgAmbiguousIndexName())
-		ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		ret.Reason = msgAmbiguousIndexName()
-		return ret, nil
+		err := merr.WrapErrIndexDuplicate(req.GetIndexName())
+		return merr.Status(err), nil
 	}
 	indexIDs := make([]UniqueID, 0)
 	for _, index := range indexes {
@@ -686,15 +635,13 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (
 		err := s.meta.MarkIndexAsDeleted(req.GetCollectionID(), indexIDs)
 		if err != nil {
 			log.Warn("DropIndex fail", zap.String("indexName", req.IndexName), zap.Error(err))
-			ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
-			ret.Reason = err.Error()
-			return ret, nil
+			return merr.Status(err), nil
 		}
 	}
 	log.Debug("DropIndex success", zap.Int64s("partitionIDs", req.GetPartitionIDs()),
 		zap.String("indexName", req.GetIndexName()), zap.Int64s("indexIDs", indexIDs))
-	return ret, nil
+	return merr.Status(nil), nil
 }
 
 // GetIndexInfos gets the index file paths for segment from DataCoord.
@@ -702,16 +649,11 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq
 	log := log.Ctx(ctx).With(
 		zap.Int64("collectionID", req.GetCollectionID()),
 	)
-	errResp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		Reason:    "",
-	}
-	if s.isClosed() {
-		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()))
-		errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA
-		errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID())
+
+	if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
+		log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err))
 		return &indexpb.GetIndexInfoResponse{
-			Status: errResp,
+			Status: merr.Status(err),
 		}, nil
 	}
 	ret := &indexpb.GetIndexInfoResponse{
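
After this change every handler in the file converges on the same shape; a condensed sketch (someRPC, someRequest, and doWork are placeholders, only the merr calls are from the diff above):

func (s *Server) someRPC(ctx context.Context, req *someRequest) (*commonpb.Status, error) {
    // Replaces the isClosed()/ErrorCode_DataCoordNA dance: CheckHealthy
    // yields an ErrServiceNotReady-based error for any non-healthy state.
    if err := merr.CheckHealthy(s.GetStateCode()); err != nil {
        return merr.Status(err), nil
    }
    if err := s.doWork(ctx, req); err != nil {
        // merr.Status derives ErrorCode and Reason from the typed error.
        return merr.Status(err), nil
    }
    return merr.Status(nil), nil // success status
}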

View File

@@ -37,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 )
 
 func TestServerId(t *testing.T) {
@@ -100,7 +101,7 @@ func TestServer_CreateIndex(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Abnormal)
 		resp, err := s.CreateIndex(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
 	})
 
 	t.Run("index not consistent", func(t *testing.T) {
@@ -192,7 +193,7 @@ func TestServer_GetIndexState(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Initializing)
 		resp, err := s.GetIndexState(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	s.stateCode.Store(commonpb.StateCode_Healthy)
@@ -384,7 +385,7 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Abnormal)
 		resp, err := s.GetSegmentIndexState(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	t.Run("no indexes", func(t *testing.T) {
@@ -518,7 +519,7 @@ func TestServer_GetIndexBuildProgress(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Initializing)
 		resp, err := s.GetIndexBuildProgress(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	t.Run("no indexes", func(t *testing.T) {
@@ -998,7 +999,7 @@ func TestServer_DescribeIndex(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Initializing)
 		resp, err := s.DescribeIndex(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	s.stateCode.Store(commonpb.StateCode_Healthy)
@@ -1442,7 +1443,7 @@ func TestServer_DropIndex(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Initializing)
 		resp, err := s.DropIndex(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
 	})
 
 	s.stateCode.Store(commonpb.StateCode_Healthy)
@@ -1602,7 +1603,7 @@ func TestServer_GetIndexInfos(t *testing.T) {
 		s.stateCode.Store(commonpb.StateCode_Initializing)
 		resp, err := s.GetIndexInfos(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	s.stateCode.Store(commonpb.StateCode_Healthy)
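
These assertions work because merr.Status and merr.Error round-trip: a status built from a typed error can be matched against the merr sentinels with errors.Is. A hypothetical standalone test showing the mechanism (TestStatusRoundTrip is not part of this commit):

package datacoord

import (
    "testing"

    "github.com/stretchr/testify/assert"

    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    "github.com/milvus-io/milvus/pkg/util/merr"
)

func TestStatusRoundTrip(t *testing.T) {
    // A non-healthy state code produces an ErrServiceNotReady-based error...
    err := merr.CheckHealthy(commonpb.StateCode_Abnormal)
    // ...which survives the trip through *commonpb.Status and back.
    status := merr.Status(err)
    assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)
}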

View File

@@ -25,7 +25,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/mock"
 
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -57,47 +56,32 @@ func TestIndexNodeManager_PeekClient(t *testing.T) {
 		return ic
 	}
 
+	err := errors.New("error")
+
 	t.Run("multiple unavailable IndexNode", func(t *testing.T) {
 		nm := &IndexNodeManager{
 			ctx: context.TODO(),
 			nodeClients: map[UniqueID]types.IndexNodeClient{
 				1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-					},
-				}, errors.New("error")),
+					Status: merr.Status(err),
+				}, err),
 				2: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-					},
-				}, errors.New("error")),
+					Status: merr.Status(err),
+				}, err),
 				3: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-					},
-				}, errors.New("error")),
+					Status: merr.Status(err),
+				}, err),
 				4: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-					},
-				}, errors.New("error")),
+					Status: merr.Status(err),
+				}, err),
 				5: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-						Reason:    "fail reason",
-					},
+					Status: merr.Status(err),
 				}, nil),
 				6: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-						Reason:    "fail reason",
-					},
+					Status: merr.Status(err),
 				}, nil),
 				7: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-						Reason:    "fail reason",
-					},
+					Status: merr.Status(err),
 				}, nil),
 				8: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
 					TaskSlots: 1,
@@ -123,6 +107,8 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
 		return ic
 	}
 
+	err := errors.New("error")
+
 	t.Run("support", func(t *testing.T) {
 		nm := &IndexNodeManager{
 			ctx: context.Background(),
@@ -175,7 +161,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
 			ctx:  context.Background(),
 			lock: sync.RWMutex{},
 			nodeClients: map[UniqueID]types.IndexNodeClient{
-				1: getMockedGetJobStatsClient(nil, errors.New("error")),
+				1: getMockedGetJobStatsClient(nil, err),
 			},
 		}
@@ -189,10 +175,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) {
 			lock: sync.RWMutex{},
 			nodeClients: map[UniqueID]types.IndexNodeClient{
 				1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{
-					Status: &commonpb.Status{
-						ErrorCode: commonpb.ErrorCode_UnexpectedError,
-						Reason:    "fail reason",
-					},
+					Status:     merr.Status(err),
 					TaskSlots:  0,
 					JobInfos:   nil,
 					EnableDisk: false,

View File

@@ -27,6 +27,7 @@ import (
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -88,20 +89,16 @@ func (s *Server) getSystemInfoMetrics(
 	}
 
 	resp := &milvuspb.GetMetricsResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		},
-		Response:      "",
+		Status:        merr.Status(nil),
 		ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()),
 	}
 
 	var err error
 	resp.Response, err = metricsinfo.MarshalTopology(coordTopology)
 	if err != nil {
-		resp.Status.Reason = err.Error()
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
 
-	resp.Status.ErrorCode = commonpb.ErrorCode_Success
 	return resp, nil
 }

View File

@@ -24,7 +24,6 @@ import (
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc"
 
-	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -97,13 +96,11 @@ func TestGetDataNodeMetrics(t *testing.T) {
 	assert.NoError(t, err)
 	assert.True(t, info.HasError)
 
+	mockErr := errors.New("mocked error")
 	// mock status not success
 	mockFailClientCreator = getMockFailedClientCreator(func() (*milvuspb.GetMetricsResponse, error) {
 		return &milvuspb.GetMetricsResponse{
-			Status: &commonpb.Status{
-				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-				Reason:    "mocked error",
-			},
+			Status: merr.Status(mockErr),
 		}, nil
 	})
 
@@ -143,14 +140,11 @@ func TestGetIndexNodeMetrics(t *testing.T) {
 	assert.True(t, info.HasError)
 
 	// failed
+	mockErr := errors.New("mocked error")
 	info, err = svr.getIndexNodeMetrics(ctx, req, &mockMetricIndexNodeClient{
 		mock: func() (*milvuspb.GetMetricsResponse, error) {
 			return &milvuspb.GetMetricsResponse{
-				Status: &commonpb.Status{
-					ErrorCode: commonpb.ErrorCode_UnexpectedError,
-					Reason:    "mock fail",
-				},
-				Response:      "",
+				Status:        merr.Status(mockErr),
 				ComponentName: "indexnode100",
 			}, nil
 		},
@@ -187,11 +181,7 @@ func TestGetIndexNodeMetrics(t *testing.T) {
 		resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
 		if err != nil {
 			return &milvuspb.GetMetricsResponse{
-				Status: &commonpb.Status{
-					ErrorCode: commonpb.ErrorCode_UnexpectedError,
-					Reason:    err.Error(),
-				},
-				Response:      "",
+				Status:        merr.Status(err),
 				ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, nodeID),
 			}, nil
 		}

View File

@@ -186,8 +186,7 @@ func TestAssignSegmentID(t *testing.T) {
 			SegmentIDRequests: []*datapb.SegmentIDRequest{req},
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	t.Run("assign segment with invalid collection", func(t *testing.T) {
@@ -327,8 +326,7 @@ func TestFlush(t *testing.T) {
 		closeTestServer(t, svr)
 		resp, err := svr.Flush(context.Background(), req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -437,8 +435,7 @@ func TestGetSegmentStates(t *testing.T) {
 			SegmentIDs: []int64{0},
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -503,7 +500,7 @@ func TestGetInsertBinlogPaths(t *testing.T) {
 		}
 		resp, err := svr.GetInsertBinlogPaths(svr.ctx, req)
 		assert.NoError(t, err)
-		assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrSegmentNotFound)
 	})
 
 	t.Run("with closed server", func(t *testing.T) {
@@ -513,8 +510,7 @@ func TestGetInsertBinlogPaths(t *testing.T) {
 			SegmentID: 0,
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -537,8 +533,7 @@ func TestGetCollectionStatistics(t *testing.T) {
 			CollectionID: 0,
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -560,8 +555,7 @@ func TestGetPartitionStatistics(t *testing.T) {
 		closeTestServer(t, svr)
 		resp, err := svr.GetPartitionStatistics(context.Background(), &datapb.GetPartitionStatisticsRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -623,7 +617,7 @@ func TestGetSegmentInfo(t *testing.T) {
 		}
 		resp, err := svr.GetSegmentInfo(svr.ctx, req)
 		assert.NoError(t, err)
-		assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrSegmentNotFound)
 	})
 	t.Run("with closed server", func(t *testing.T) {
 		svr := newTestServer(t, nil)
@@ -632,8 +626,7 @@ func TestGetSegmentInfo(t *testing.T) {
 			SegmentIDs: []int64{},
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 	t.Run("with dropped segment", func(t *testing.T) {
 		svr := newTestServer(t, nil)
@@ -826,8 +819,7 @@ func TestGetFlushedSegments(t *testing.T) {
 			closeTestServer(t, svr)
 			resp, err := svr.GetFlushedSegments(context.Background(), &datapb.GetFlushedSegmentsRequest{})
 			assert.NoError(t, err)
-			assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-			assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
 		})
+			assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -931,8 +923,7 @@ func TestGetSegmentsByStates(t *testing.T) {
 			closeTestServer(t, svr)
 			resp, err := svr.GetSegmentsByStates(context.Background(), &datapb.GetSegmentsByStatesRequest{})
 			assert.NoError(t, err)
-			assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-			assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+			assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 		})
 	})
 }
@@ -1137,7 +1128,7 @@ func TestServer_ShowConfigurations(t *testing.T) {
 	svr.stateCode.Store(commonpb.StateCode_Initializing)
 	resp, err := svr.ShowConfigurations(svr.ctx, req)
 	assert.NoError(t, err)
-	assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
+	assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 
 	// normal case
 	svr.stateCode.Store(stateSave)
@@ -1486,7 +1477,7 @@ func TestSaveBinlogPaths(t *testing.T) {
 			Flushed: false,
 		})
 		assert.NoError(t, err)
-		assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_SegmentNotFound)
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound)
 	})
 
 	t.Run("SaveNotExistSegment", func(t *testing.T) {
@@ -1540,7 +1531,7 @@ func TestSaveBinlogPaths(t *testing.T) {
 			Flushed: false,
 		})
 		assert.NoError(t, err)
-		assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_SegmentNotFound)
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound)
 	})
 
 	t.Run("with channel not matched", func(t *testing.T) {
@@ -1562,7 +1553,7 @@ func TestSaveBinlogPaths(t *testing.T) {
 			Channel: "test",
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrChannelNotFound)
 	})
 
 	t.Run("with closed server", func(t *testing.T) {
@@ -1570,8 +1561,7 @@ func TestSaveBinlogPaths(t *testing.T) {
 		closeTestServer(t, svr)
 		resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetReason())
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady)
 	})
 	/*
 		t.Run("test save dropped segment and remove channel", func(t *testing.T) {
@@ -1760,7 +1750,7 @@ func TestDropVirtualChannel(t *testing.T) {
 			ChannelName: "ch2",
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrChannelNotFound)
 	})
 
 	t.Run("with closed server", func(t *testing.T) {
@@ -1768,8 +1758,7 @@ func TestDropVirtualChannel(t *testing.T) {
 		closeTestServer(t, svr)
 		resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -2426,13 +2415,13 @@ func TestShouldDropChannel(t *testing.T) {
 	}
 	myRoot := &myRootCoord{}
 	myRoot.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{
-		Status:    &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+		Status:    merr.Status(nil),
 		Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0),
 		Count:     1,
 	}, nil)
 
 	myRoot.EXPECT().AllocID(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocIDResponse{
-		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+		Status: merr.Status(nil),
 		ID:     int64(tsoutil.ComposeTSByTime(time.Now(), 0)),
 		Count:  1,
 	}, nil)
@@ -2479,7 +2468,7 @@ func TestShouldDropChannel(t *testing.T) {
 	t.Run("channel name not in kv, collection exist", func(t *testing.T) {
 		myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
 			Return(&milvuspb.DescribeCollectionResponse{
-				Status:       &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:       merr.Status(nil),
 				CollectionID: 0,
 			}, nil).Once()
 		assert.False(t, svr.handler.CheckShouldDropChannel("ch99", 0))
@@ -2488,7 +2477,7 @@ func TestShouldDropChannel(t *testing.T) {
 	t.Run("collection name in kv, collection exist", func(t *testing.T) {
 		myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
 			Return(&milvuspb.DescribeCollectionResponse{
-				Status:       &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:       merr.Status(nil),
 				CollectionID: 0,
 			}, nil).Once()
 		assert.False(t, svr.handler.CheckShouldDropChannel("ch1", 0))
@@ -2508,7 +2497,7 @@ func TestShouldDropChannel(t *testing.T) {
 		require.NoError(t, err)
 		myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything).
 			Return(&milvuspb.DescribeCollectionResponse{
-				Status:       &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:       merr.Status(nil),
 				CollectionID: 0,
 			}, nil).Once()
 		assert.True(t, svr.handler.CheckShouldDropChannel("ch1", 0))
@@ -3011,8 +3000,7 @@ func TestGetRecoveryInfo(t *testing.T) {
 		closeTestServer(t, svr)
 		resp, err := svr.GetRecoveryInfo(context.TODO(), &datapb.GetRecoveryInfoRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
	})
 }
@@ -3077,8 +3065,7 @@ func TestGetCompactionState(t *testing.T) {
 
 		resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -3139,8 +3126,7 @@ func TestManualCompaction(t *testing.T) {
 			Timetravel:   1,
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -3190,8 +3176,7 @@ func TestGetCompactionStateWithPlans(t *testing.T) {
 			CompactionID: 1,
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -3347,7 +3332,7 @@ type rootCoordSegFlushComplete struct {
 // SegmentFlushCompleted, override default behavior
 func (rc *rootCoordSegFlushComplete) SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) {
 	if rc.flag {
-		return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil
+		return merr.Status(nil), nil
 	}
 	return &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, nil
 }
@@ -3424,7 +3409,7 @@ func TestGetFlushState(t *testing.T) {
 		resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
 		assert.NoError(t, err)
 		assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
-			Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:  merr.Status(nil),
 			Flushed: true,
 		}, resp)
 	})
@@ -3472,7 +3457,7 @@ func TestGetFlushState(t *testing.T) {
 		resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
 		assert.NoError(t, err)
 		assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
-			Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:  merr.Status(nil),
 			Flushed: false,
 		}, resp)
 	})
@@ -3520,7 +3505,7 @@ func TestGetFlushState(t *testing.T) {
 		resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}})
 		assert.NoError(t, err)
 		assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
-			Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:  merr.Status(nil),
 			Flushed: true,
 		}, resp)
 	})
@@ -3556,7 +3541,7 @@ func TestGetFlushState(t *testing.T) {
 		})
 		assert.NoError(t, err)
 		assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
-			Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:  merr.Status(nil),
 			Flushed: true,
 		}, resp)
 	})
@@ -3592,7 +3577,7 @@ func TestGetFlushState(t *testing.T) {
 		})
 		assert.NoError(t, err)
 		assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
-			Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:  merr.Status(nil),
 			Flushed: false,
 		}, resp)
 	})
@@ -3611,7 +3596,7 @@ func TestGetFlushState(t *testing.T) {
 		})
 		assert.NoError(t, err)
 		assert.EqualValues(t, &milvuspb.GetFlushStateResponse{
-			Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:  merr.Status(nil),
 			Flushed: true,
 		}, resp)
 	})
@@ -3680,7 +3665,7 @@ func TestGetFlushAllState(t *testing.T) {
 		svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything).
 			Return(&milvuspb.ListDatabasesResponse{
 				DbNames: []string{"db1"},
-				Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:  merr.Status(nil),
 			}, nil).Maybe()
 	}
@@ -3692,7 +3677,7 @@ func TestGetFlushAllState(t *testing.T) {
 	} else {
 		svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ShowCollections(mock.Anything, mock.Anything).
 			Return(&milvuspb.ShowCollectionsResponse{
-				Status:        &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:        merr.Status(nil),
 				CollectionIds: []int64{collection},
 			}, nil).Maybe()
 	}
@@ -3705,7 +3690,7 @@ func TestGetFlushAllState(t *testing.T) {
 	} else {
 		svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
 			Return(&milvuspb.DescribeCollectionResponse{
-				Status:              &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:              merr.Status(nil),
 				VirtualChannelNames: vchannels,
 			}, nil).Maybe()
 	}
@@ -3723,8 +3708,10 @@ func TestGetFlushAllState(t *testing.T) {
 	assert.NoError(t, err)
 	if test.ExpectedSuccess {
 		assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
-	} else {
+	} else if test.ServerIsHealthy {
 		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
+	} else {
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	}
 	assert.Equal(t, test.ExpectedFlushed, resp.GetFlushed())
 })
@@ -3760,25 +3747,25 @@ func TestGetFlushAllStateWithDB(t *testing.T) {
 		svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything).
 			Return(&milvuspb.ListDatabasesResponse{
 				DbNames: []string{dbName},
-				Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:  merr.Status(nil),
 			}, nil).Maybe()
 	} else {
 		svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything).
 			Return(&milvuspb.ListDatabasesResponse{
 				DbNames: []string{},
-				Status:  &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+				Status:  merr.Status(nil),
 			}, nil).Maybe()
 	}
 
 	svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ShowCollections(mock.Anything, mock.Anything).
 		Return(&milvuspb.ShowCollectionsResponse{
-			Status:        &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:        merr.Status(nil),
 			CollectionIds: []int64{collectionID},
 		}, nil).Maybe()
 
 	svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything).
 		Return(&milvuspb.DescribeCollectionResponse{
-			Status:              &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			Status:              merr.Status(nil),
 			VirtualChannelNames: vchannels,
 			CollectionID:        collectionID,
 			CollectionName:      collectionName,
@@ -3882,8 +3869,7 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) {
 			NewState:  commonpb.SegmentState_Flushed,
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 }
@@ -3958,8 +3944,7 @@ func TestDataCoord_Import(t *testing.T) {
 			},
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
-		assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.Status.GetReason())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 	})
 
 	t.Run("test update segment stat", func(t *testing.T) {
@@ -3987,7 +3972,7 @@ func TestDataCoord_Import(t *testing.T) {
 			}},
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
+		assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)
 	})
 }
@@ -4119,7 +4104,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) {
 		status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_DataCoordNA, status.GetErrorCode())
+		assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady)
 	})
 }

File diff suppressed because it is too large

View File

@@ -17,6 +17,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/types"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metautil"
 )
@@ -586,7 +587,7 @@ func TestGetRecoveryInfoV2(t *testing.T) {
 		closeTestServer(t, svr)
 		resp, err := svr.GetRecoveryInfoV2(context.TODO(), &datapb.GetRecoveryInfoRequestV2{})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
-		assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason())
+		err = merr.Error(resp.GetStatus())
+		assert.ErrorIs(t, err, merr.ErrServiceNotReady)
 	})
 }

View File

@@ -22,7 +22,6 @@ import (
 	"strings"
 	"time"

-	"github.com/cockroachdb/errors"
 	"go.uber.org/zap"

 	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -30,6 +29,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 )

 // Response response interface for verification
@@ -51,32 +51,16 @@ func VerifyResponse(response interface{}, err error) error {
 		if resp.GetStatus() == nil {
 			return errNilStatusResponse
 		}
-		if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-			return errors.New(resp.GetStatus().GetReason())
-		}
+		return merr.Error(resp.GetStatus())
 	case *commonpb.Status:
 		if resp == nil {
 			return errNilResponse
 		}
-		if resp.ErrorCode != commonpb.ErrorCode_Success {
-			return errors.New(resp.GetReason())
-		}
+		return merr.Error(resp)
 	default:
 		return errUnknownResponseType
 	}
-	return nil
-}
-
-// failResponse sets status to failed with unexpected error and reason.
-func failResponse(status *commonpb.Status, reason string) {
-	status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-	status.Reason = reason
-}
-
-// failResponseWithCode sets status to failed with error code and reason.
-func failResponseWithCode(status *commonpb.Status, errCode commonpb.ErrorCode, reason string) {
-	status.ErrorCode = errCode
-	status.Reason = reason
 }

 func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo) []*SegmentInfo {
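After this rewrite, VerifyResponse returns whatever merr.Error decodes from the status, so callers get a typed, matchable error instead of errors.New over the reason string. A hedged sketch of a caller, assuming it lives in package datacoord next to VerifyResponse; checkRecoveryResp is hypothetical:

package datacoord

import (
	"errors"

	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// checkRecoveryResp shows the new contract: a non-success status surfaces
// as a typed merr error that errors.Is can match.
func checkRecoveryResp(resp *datapb.GetRecoveryInfoResponseV2, rpcErr error) error {
	if err := VerifyResponse(resp, rpcErr); err != nil {
		if errors.Is(err, merr.ErrServiceNotReady) {
			// the caller may retry instead of string-matching Reason
		}
		return err
	}
	return nil
}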


@@ -110,7 +110,7 @@ func (suite *UtilSuite) TestVerifyResponse() {
 	for _, c := range cases {
 		r := VerifyResponse(c.resp, c.err)
 		if c.equalValue {
-			suite.EqualValues(c.expected.Error(), r.Error())
+			suite.Contains(r.Error(), c.expected.Error())
 		} else {
 			suite.Equal(c.expected, r)
 		}


@@ -59,10 +59,7 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha
 	log.Warn("DataNode WatchDmChannels is not in use")

 	// TODO ERROR OF GRPC NOT IN USE
-	return &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_Success,
-		Reason:    "watchDmChannels do nothing",
-	}, nil
+	return merr.Status(nil), nil
 }

 // GetComponentStates will return current state of DataNode


@@ -3,9 +3,10 @@ package utils
 import (
 	"time"

+	"google.golang.org/grpc"
+
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"google.golang.org/grpc"
 )

 func GracefulStopGRPCServer(s *grpc.Server) {


@@ -3,8 +3,9 @@ package utils
 import (
 	"testing"

-	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"google.golang.org/grpc"
+
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 )

 func TestGracefulStopGrpcServer(t *testing.T) {


@@ -116,8 +116,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
 	if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
 		log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", req.GetBuildID()),
 			zap.String("clusterID", req.GetClusterID()), zap.Error(err))
-		ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		ret.Reason = err.Error()
+		ret = merr.Status(err)
 		metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc()
 		return ret, nil
 	}


@@ -594,7 +594,7 @@ func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collecti
 		}
 	}

-	return nil, merr.WrapErrCollectionNotFoundWithDB(dbID, collectionName, fmt.Sprintf("timestample = %d", ts))
+	return nil, merr.WrapErrCollectionNotFoundWithDB(dbID, collectionName, fmt.Sprintf("timestamp = %d", ts))
 }

 func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) {


@@ -2338,22 +2338,6 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
 		chTicker:       node.chTicker,
 	}

-	constructFailedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult {
-		numRows := request.NumRows
-		errIndex := make([]uint32, numRows)
-		for i := uint32(0); i < numRows; i++ {
-			errIndex[i] = i
-		}
-		return &milvuspb.MutationResult{
-			Status: &commonpb.Status{
-				ErrorCode: errCode,
-				Reason:    err.Error(),
-			},
-			ErrIndex: errIndex,
-		}
-	}
-
 	log.Debug("Enqueue upsert request in Proxy",
 		zap.Int("len(FieldsData)", len(request.FieldsData)),
 		zap.Int("len(HashKeys)", len(request.HashKeys)))
@@ -2380,9 +2364,19 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest)
 		// Not every error case changes the status internally
 		// change status there to handle it
 		if it.result.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success {
-			it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+			it.result.Status = merr.Status(err)
 		}
-		return constructFailedResponse(err, it.result.GetStatus().GetErrorCode()), nil
+
+		numRows := request.NumRows
+		errIndex := make([]uint32, numRows)
+		for i := uint32(0); i < numRows; i++ {
+			errIndex[i] = i
+		}
+
+		return &milvuspb.MutationResult{
+			Status:   merr.Status(err),
+			ErrIndex: errIndex,
+		}, nil
 	}

 	if it.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
@@ -2538,7 +2532,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
 		},
 	}
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
-		resp.Status.Reason = "proxy is not healthy"
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -2588,7 +2582,6 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (*
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
-		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -2945,10 +2938,10 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest)
 	log := log.With(zap.String("db", req.GetDbName()))
 	resp := &milvuspb.FlushAllResponse{
-		Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError},
+		Status: merr.Status(nil),
 	}
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
-		resp.Status.Reason = "proxy is not healthy"
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
 	log.Info(rpcReceived("FlushAll"))
@@ -2979,7 +2972,7 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest)
 			return dbName == req.GetDbName()
 		})
 		if len(dbNames) == 0 {
-			resp.Status.Reason = fmt.Sprintf("failed to get db %s", req.GetDbName())
+			resp.Status = merr.Status(merr.WrapErrDatabaseNotFound(req.GetDbName()))
 			return resp, nil
 		}
 	}
@@ -3027,7 +3020,6 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest)
 	}
 	resp.FlushAllTs = ts
-	resp.Status.ErrorCode = commonpb.ErrorCode_Success

 	log.Info(rpcDone("FlushAll"), zap.Uint64("FlushAllTs", ts),
 		zap.Time("FlushAllTime", tsoutil.PhysicalTime(ts)))
@@ -3057,9 +3049,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
 		zap.Any("collection", req.CollectionName))

 	resp := &milvuspb.GetPersistentSegmentInfoResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		},
+		Status: merr.Status(nil),
 	}
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
 		resp.Status = merr.Status(err)
@@ -3074,7 +3064,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
 	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
 	if err != nil {
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
-		resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error()
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -3086,7 +3076,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
 	})
 	if err != nil {
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
-		resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error()
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -3104,7 +3094,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
 			metrics.FailLabel).Inc()
 		log.Warn("GetPersistentSegmentInfo fail",
 			zap.Error(err))
-		resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error()
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
 	err = merr.Error(infoResp.GetStatus())
@@ -3130,7 +3120,6 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G
 	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method,
 		metrics.SuccessLabel).Inc()
 	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
-	resp.Status.ErrorCode = commonpb.ErrorCode_Success
 	resp.Infos = persistentInfos
 	return resp, nil
 }
@@ -3148,9 +3137,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
 		zap.Any("collection", req.CollectionName))

 	resp := &milvuspb.GetQuerySegmentInfoResponse{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		},
+		Status: merr.Status(nil),
 	}
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
 		resp.Status = merr.Status(err)
@@ -3206,7 +3193,6 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue
 	metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc()
 	metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds()))
-	resp.Status.ErrorCode = commonpb.ErrorCode_Success
 	resp.Infos = queryInfos
 	return resp, nil
 }
@@ -3448,16 +3434,14 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq
 		return unhealthyStatus(), nil
 	}

-	status := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-	}
+	status := merr.Status(nil)

 	collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName())
 	if err != nil {
 		log.Warn("failed to get collection id",
 			zap.String("collectionName", req.GetCollectionName()),
 			zap.Error(err))
-		status.Reason = err.Error()
+		status = merr.Status(err)
 		return status, nil
 	}
 	infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{
@@ -3476,19 +3460,18 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq
 		log.Warn("Failed to LoadBalance from Query Coordinator",
 			zap.Any("req", req),
 			zap.Error(err))
-		status.Reason = err.Error()
+		status = merr.Status(err)
 		return status, nil
 	}
 	if infoResp.ErrorCode != commonpb.ErrorCode_Success {
 		log.Warn("Failed to LoadBalance from Query Coordinator",
 			zap.String("errMsg", infoResp.Reason))
-		status.Reason = infoResp.Reason
+		status = infoResp
 		return status, nil
 	}
 	log.Debug("LoadBalance Done",
 		zap.Any("req", req),
 		zap.Any("status", infoResp))
-	status.ErrorCode = commonpb.ErrorCode_Success
 	return status, nil
 }
@@ -3710,8 +3693,7 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi
 	if err != nil {
 		log.Error("failed to execute import request",
 			zap.Error(err))
-		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		resp.Status.Reason = "request options is not illegal \n" + err.Error() + " \nIllegal option format \n" + importutil.OptionFormat
+		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -3726,7 +3708,6 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
 		log.Error("failed to execute bulk insert request",
 			zap.Error(err))
-		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -3745,7 +3726,9 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt
 	log.Debug("received get import state request",
 		zap.Int64("taskID", req.GetTask()))
-	resp := &milvuspb.GetImportStateResponse{}
+	resp := &milvuspb.GetImportStateResponse{
+		Status: merr.Status(nil),
+	}
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
 		resp.Status = merr.Status(err)
 		return resp, nil
@@ -3760,7 +3743,6 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
 		log.Error("failed to execute get import state",
 			zap.Error(err))
-		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -3781,7 +3763,9 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport
 	log := log.Ctx(ctx)
 	log.Debug("received list import tasks request")
-	resp := &milvuspb.ListImportTasksResponse{}
+	resp := &milvuspb.ListImportTasksResponse{
+		Status: merr.Status(nil),
+	}
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
 		resp.Status = merr.Status(err)
 		return resp, nil
@@ -3795,7 +3779,6 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport
 		metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc()
 		log.Error("failed to execute list import tasks",
 			zap.Error(err))
-		resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		resp.Status = merr.Status(err)
 		return resp, nil
 	}
@@ -4363,9 +4346,7 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr

 // SetRates limits the rates of requests.
 func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) {
-	resp := &commonpb.Status{
-		ErrorCode: commonpb.ErrorCode_UnexpectedError,
-	}
+	resp := merr.Status(nil)
 	if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil {
 		resp = unhealthyStatus()
 		return resp, nil
@@ -4374,10 +4355,10 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques
 	err := node.multiRateLimiter.SetRates(request.GetRates())
 	// TODO: set multiple rate limiter rates
 	if err != nil {
-		resp.Reason = err.Error()
+		resp = merr.Status(err)
 		return resp, nil
 	}
-	resp.ErrorCode = commonpb.ErrorCode_Success
 	return resp, nil
 }
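With constructFailedResponse gone, the Upsert failure path builds its MutationResult inline, as shown in the first hunk above. The construction is small enough to sketch standalone; failedMutationResult is a hypothetical helper, not part of the patch:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// failedMutationResult mirrors the inlined failure path: mark every row
// index as failed and derive the status from the error itself.
func failedMutationResult(numRows uint32, err error) *milvuspb.MutationResult {
	errIndex := make([]uint32, numRows)
	for i := uint32(0); i < numRows; i++ {
		errIndex[i] = i
	}
	return &milvuspb.MutationResult{
		Status:   merr.Status(err),
		ErrIndex: errIndex,
	}
}

func main() {
	res := failedMutationResult(3, merr.WrapErrParameterInvalidMsg("auto id enabled"))
	fmt.Println(res.GetStatus().GetReason(), res.GetErrIndex()) // failure reason plus [0 1 2]
}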


@@ -486,7 +486,7 @@ func TestProxy_FlushAll(t *testing.T) {
 		node.stateCode.Store(commonpb.StateCode_Abnormal)
 		resp, err := node.FlushAll(ctx, &milvuspb.FlushAllRequest{})
 		assert.NoError(t, err)
-		assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError)
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
 		node.stateCode.Store(commonpb.StateCode_Healthy)
 	})


@@ -302,7 +302,7 @@ func TestRateLimiter(t *testing.T) {
 		ctx, cancel := context.WithTimeout(context.Background(), time.Second)
 		defer cancel()
 		// avoid production precision issues when comparing 0-terminated numbers
-		newRate := fmt.Sprintf("%.3f1", rand.Float64())
+		newRate := fmt.Sprintf("%.2f1", rand.Float64())
 		etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/collectionRate", newRate)
 		etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/partitionRate", "invalid")


@@ -1518,7 +1518,7 @@ func TestProxy(t *testing.T) {
 			Base: nil,
 		})
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
+		assert.ErrorIs(t, merr.Error(resp), merr.ErrCollectionNotFound)
 	})

 	// TODO(dragondriver): dummy
@@ -2047,7 +2047,7 @@ func TestProxy(t *testing.T) {
 		resp, err := proxy.Upsert(ctx, req)
 		assert.NoError(t, err)
-		assert.Equal(t, commonpb.ErrorCode_UpsertAutoIDTrue, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
 		assert.Equal(t, 0, len(resp.SuccIndex))
 		assert.Equal(t, rowNum, len(resp.ErrIndex))
 		assert.Equal(t, int64(0), resp.UpsertCnt)


@@ -536,7 +536,9 @@ func (dct *describeCollectionTask) Execute(ctx context.Context) error {
 		// compatibility with PyMilvus existing implementation
 		err := merr.Error(dct.result.GetStatus())
 		if errors.Is(err, merr.ErrCollectionNotFound) {
+			// nolint
 			dct.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
+			// nolint
 			dct.result.Status.Reason = "can't find collection " + dct.result.GetStatus().GetReason()
 		}
 	} else {


@@ -233,8 +233,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
 	channelNames, err := it.chMgr.getVChannels(collID)
 	if err != nil {
 		log.Warn("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err))
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
@@ -255,8 +254,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
 	}
 	if err != nil {
 		log.Warn("assign segmentID and repack insert data failed", zap.Error(err))
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
 	assignSegmentIDDur := tr.RecordSpan()
@@ -266,8 +264,7 @@ func (it *insertTask) Execute(ctx context.Context) error {
 	err = stream.Produce(msgPack)
 	if err != nil {
 		log.Warn("fail to produce insert msg", zap.Error(err))
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
 	sendMsgDur := tr.RecordSpan()


@@ -389,8 +389,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
 	if err != nil {
 		log.Warn("get vChannels failed when insertExecute",
 			zap.Error(err))
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
@@ -413,8 +412,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP
 	if err != nil {
 		log.Warn("assign segmentID and repack insert data failed when insertExecute",
 			zap.Error(err))
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
 	assignSegmentIDDur := tr.RecordSpan()
@@ -438,8 +436,7 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP
 	channelNames, err := it.chMgr.getVChannels(collID)
 	if err != nil {
 		log.Warn("get vChannels failed when deleteExecute", zap.Error(err))
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
 	it.upsertMsg.DeleteMsg.PrimaryKeys = it.result.IDs
@@ -539,8 +536,7 @@ func (it *upsertTask) Execute(ctx context.Context) (err error) {
 	tr.RecordSpan()
 	err = stream.Produce(msgPack)
 	if err != nil {
-		it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
-		it.result.Status.Reason = err.Error()
+		it.result.Status = merr.Status(err)
 		return err
 	}
 	sendMsgDur := tr.RecordSpan()


@@ -1192,8 +1192,9 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
 			// upsert has not supported when autoID == true
 			log.Info("can not upsert when auto id enabled",
 				zap.String("primaryFieldSchemaName", primaryFieldSchema.Name))
-			result.Status.ErrorCode = commonpb.ErrorCode_UpsertAutoIDTrue
-			return nil, fmt.Errorf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
+			err := merr.WrapErrParameterInvalidMsg(fmt.Sprintf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.GetName()))
+			result.Status = merr.Status(err)
+			return nil, err
 		}
 		primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
 		if err != nil {


@@ -1701,7 +1701,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) {
 	case4.schema.Fields[0].IsPrimaryKey = true
 	case4.schema.Fields[0].AutoID = true
 	_, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, false)
-	assert.Equal(t, commonpb.ErrorCode_UpsertAutoIDTrue, case4.result.GetStatus().GetErrorCode())
+	assert.ErrorIs(t, merr.Error(case4.result.GetStatus()), merr.ErrParameterInvalid)
 	assert.NotEqual(t, nil, err)

 	// primary field data is nil, GetPrimaryFieldData fail


@@ -212,7 +212,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
 		return nil, err
 	}
 	if resp.GetSegmentInfo() == nil {
-		err = merr.WrapErrCollectionNotFound(segmentID)
+		err = merr.WrapErrIndexNotFoundForSegment(segmentID)
 		log.Warn("failed to get segment index info",
 			zap.Error(err))
 		return nil, err
@@ -220,7 +220,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID
 	segmentInfo, ok := resp.GetSegmentInfo()[segmentID]
 	if !ok || len(segmentInfo.GetIndexInfos()) == 0 {
-		return nil, merr.WrapErrIndexNotFound()
+		return nil, merr.WrapErrIndexNotFoundForSegment(segmentID)
 	}
 	indexes := make([]*querypb.FieldIndexInfo, 0)
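WrapErrIndexNotFoundForSegment, added to pkg/util/merr later in this patch, keys the index-not-found sentinel by segment ID, which is the accurate condition here; previously this path misreported a collection-not-found with a segment ID as the argument. A small sketch of how it behaves (the exact message format is an assumption):

package main

import (
	"errors"
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
	err := merr.WrapErrIndexNotFoundForSegment(42)
	// The wrapper annotates the sentinel with the segment ID but keeps its
	// identity, so errors.Is still matches ErrIndexNotFound.
	fmt.Println(errors.Is(err, merr.ErrIndexNotFound)) // true
	fmt.Println(err)                                   // message mentions segmentID=42
}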


@@ -674,7 +674,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
 	srcNode := req.GetSourceNodeIDs()[0]
 	replica := s.meta.ReplicaManager.GetByCollectionAndNode(req.GetCollectionID(), srcNode)
 	if replica == nil {
-		err := merr.WrapErrReplicaNotFound(-1, fmt.Sprintf("replica not found for collection %d and node %d", req.GetCollectionID(), srcNode))
+		err := merr.WrapErrNodeNotFound(srcNode, fmt.Sprintf("source node not found in any replica of collection %d", req.GetCollectionID()))
 		msg := "source node not found in any replica"
 		log.Warn(msg)
 		return merr.Status(err), nil
@@ -685,9 +685,8 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques
 	}
 	for _, dstNode := range req.GetDstNodeIDs() {
 		if !replica.Contains(dstNode) {
-			err := merr.WrapErrParameterInvalid("destination node in the same replica as source node", fmt.Sprintf("destination node %d not in replica %d", dstNode, replica.GetID()))
-			msg := "destination nodes have to be in the same replica of source node"
-			log.Warn(msg)
+			err := merr.WrapErrNodeNotFound(dstNode, "destination node not found in the same replica")
+			log.Warn("failed to balance to the destination node", zap.Error(err))
 			return merr.Status(err), nil
 		}
 		if err := s.isStoppingNode(dstNode); err != nil {


@@ -1241,7 +1241,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() {
 		}
 		resp, err := server.LoadBalance(ctx, req)
 		suite.NoError(err)
-		suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid)
+		suite.ErrorIs(merr.Error(resp), merr.ErrNodeNotFound)
 	}

 	// Test balance task failed


@@ -502,7 +502,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
 			},
 		},
 	}, nil)
-		suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFound())
+		suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFoundForSegment(segment))
 	}
 	suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil)


@@ -33,7 +33,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querynodev2/delegator"
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 	"github.com/milvus-io/milvus/internal/querynodev2/tasks"
-	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/streamrpc"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -100,10 +99,10 @@ func (node *QueryNode) loadDeltaLogs(ctx context.Context, req *querypb.LoadSegme
 	if finalErr != nil {
 		log.Warn("failed to load delta logs", zap.Error(finalErr))
-		return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to load delta logs", finalErr)
+		return merr.Status(finalErr)
 	}

-	return util.SuccessStatus()
+	return merr.Status(nil)
 }

 func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsRequest) *commonpb.Status {
@@ -112,7 +111,7 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR
 		zap.Int64s("segmentIDs", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })),
 	)

-	status := util.SuccessStatus()
+	status := merr.Status(nil)
 	log.Info("start to load index")

 	for _, info := range req.GetInfos() {


@@ -42,7 +42,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
 	"github.com/milvus-io/milvus/internal/querynodev2/tasks"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/internal/util/streamrpc"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -234,7 +233,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	if !node.subscribingChannels.Insert(channel.GetChannelName()) {
 		msg := "channel subscribing..."
 		log.Warn(msg)
-		return util.SuccessStatus(msg), nil
+		return merr.Status(nil), nil
 	}
 	defer node.subscribingChannels.Remove(channel.GetChannelName())
@@ -248,7 +247,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	_, exist := node.delegators.Get(channel.GetChannelName())
 	if exist {
 		log.Info("channel already subscribed")
-		return util.SuccessStatus(), nil
+		return merr.Status(nil), nil
 	}

 	node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(),
@@ -259,7 +258,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 		node.clusterManager, node.manager, node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp())
 	if err != nil {
 		log.Warn("failed to create shard delegator", zap.Error(err))
-		return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to create shard delegator", err), nil
+		return merr.Status(err), nil
 	}
 	node.delegators.Insert(channel.GetChannelName(), delegator)
 	defer func() {
@@ -280,7 +279,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	if err != nil {
 		msg := "failed to create pipeline"
 		log.Warn(msg, zap.Error(err))
-		return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
+		return merr.Status(err), nil
 	}
 	defer func() {
 		if err != nil {
@@ -316,7 +315,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	if err != nil {
 		msg := "failed to load growing segments"
 		log.Warn(msg, zap.Error(err))
-		return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil
+		return merr.Status(err), nil
 	}
 	position := &msgpb.MsgPosition{
 		ChannelName: channel.SeekPosition.ChannelName,
@@ -338,7 +337,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
 	// delegator after all steps done
 	delegator.Start()
 	log.Info("watch dml channel success")
-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) {
@@ -382,7 +381,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
 	}

 	log.Info("unsubscribed channel")
-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
@@ -434,9 +433,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
 	node.lifetime.Done()

 	// check target matches
-	if req.GetBase().GetTargetID() != paramtable.GetNodeID() {
-		return util.WrapStatus(commonpb.ErrorCode_NodeIDNotMatch,
-			common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID())), nil
+	if err := merr.CheckTargetID(req.GetBase()); err != nil {
+		return merr.Status(err), nil
 	}

 	// Delegates request to workers
@@ -445,17 +443,18 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
 		if !ok {
 			msg := "failed to load segments, delegator not found"
 			log.Warn(msg)
-			return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
+			err := merr.WrapErrChannelNotFound(segment.GetInsertChannel())
+			return merr.Status(err), nil
 		}

 		req.NeedTransfer = false
 		err := delegator.LoadSegments(ctx, req)
 		if err != nil {
 			log.Warn("delegator failed to load segments", zap.Error(err))
-			return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
+			return merr.Status(err), nil
 		}

-		return util.SuccessStatus(), nil
+		return merr.Status(nil), nil
 	}

 	if req.GetLoadScope() == querypb.LoadScope_Delta {
@@ -486,7 +485,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
 	log.Info("load segments done...",
 		zap.Int64s("segments", lo.Map(loaded, func(s segments.Segment, _ int) int64 { return s.ID() })))

-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 // ReleaseCollection clears all data related to this collection on the querynode
@@ -498,7 +497,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.Releas
 	}
 	defer node.lifetime.Done()

-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 // ReleasePartitions clears all data related to this partition on the querynode
@@ -526,7 +525,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea
 	}

 	log.Info("release partitions done")
-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 // ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID
@@ -565,7 +564,8 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
 		if !ok {
 			msg := "failed to release segment, delegator not found"
 			log.Warn(msg)
-			return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
+			err := merr.WrapErrChannelNotFound(req.GetShard())
+			return merr.Status(err), nil
 		}

 		// when we try to release a segment, add it to pipeline's exclude list first
@@ -587,10 +587,10 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
 		err := delegator.ReleaseSegments(ctx, req, false)
 		if err != nil {
 			log.Warn("delegator failed to release segment", zap.Error(err))
-			return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
+			return merr.Status(err), nil
 		}

-		return util.SuccessStatus(), nil
+		return merr.Status(nil), nil
 	}

 	log.Info("start to release segments")
@@ -601,7 +601,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
 	}
 	node.manager.Collection.Unref(req.GetCollectionID(), uint32(sealedCount))

-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 // GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ...
@@ -831,7 +831,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 	result, err := segments.ReduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType())
 	if err != nil {
 		log.Warn("failed to reduce search results", zap.Error(err))
-		failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError
 		failRet.Status = merr.Status(err)
 		return failRet, nil
 	}
@@ -852,10 +851,8 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
 // only used for delegator query segments from worker
 func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) {
-	failRet := &internalpb.RetrieveResults{
-		Status: &commonpb.Status{
-			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-		},
+	resp := &internalpb.RetrieveResults{
+		Status: merr.Status(nil),
 	}
 	msgID := req.Req.Base.GetMsgID()
 	traceID := trace.SpanFromContext(ctx).SpanContext().TraceID()
@@ -869,14 +866,14 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
 	if !node.lifetime.Add(commonpbutil.IsHealthy) {
 		err := merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID()))
-		failRet.Status = merr.Status(err)
-		return failRet, nil
+		resp.Status = merr.Status(err)
+		return resp, nil
 	}
 	defer node.lifetime.Done()

 	metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc()
 	defer func() {
-		if failRet.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
+		if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
 			metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc()
 		}
 	}()
@@ -892,22 +889,22 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
 	tr := timerecord.NewTimeRecorder("querySegments")
 	collection := node.manager.Collection.Get(req.Req.GetCollectionID())
 	if collection == nil {
-		failRet.Status = merr.Status(merr.WrapErrCollectionNotLoaded(req.Req.GetCollectionID()))
-		return failRet, nil
+		resp.Status = merr.Status(merr.WrapErrCollectionNotLoaded(req.Req.GetCollectionID()))
+		return resp, nil
 	}

 	// Send task to scheduler and wait until it finished.
 	task := tasks.NewQueryTask(queryCtx, collection, node.manager, req)
 	if err := node.scheduler.Add(task); err != nil {
 		log.Warn("failed to add query task into scheduler", zap.Error(err))
-		failRet.Status = merr.Status(err)
-		return failRet, nil
+		resp.Status = merr.Status(err)
+		return resp, nil
 	}
 	err := task.Wait()
 	if err != nil {
 		log.Warn("failed to query channel", zap.Error(err))
-		failRet.Status = merr.Status(err)
-		return failRet, nil
+		resp.Status = merr.Status(err)
+		return resp, nil
 	}

 	tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v",
@@ -917,7 +914,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ
 		req.GetSegmentIDs(),
 	))

-	failRet.Status.ErrorCode = commonpb.ErrorCode_Success
 	// TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency
 	latency := tr.ElapseSpan()
 	metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds()))
@@ -1143,7 +1139,7 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp

 // SyncReplicaSegments syncs replica node & segments states
 func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) {
-	return util.SuccessStatus(), nil
+	return merr.Status(nil), nil
 }

 // ShowConfigurations returns the configurations of queryNode matching req.Pattern
@@ -1416,7 +1412,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
 			LoadScope: querypb.LoadScope_Delta,
 		})
 		if err != nil {
-			return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to sync(load) segment", err), nil
+			return merr.Status(err), nil
 		}
 	}
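merr.CheckTargetID stands in for the hand-rolled target comparison deleted in the LoadSegments hunk above. Its body is not shown in this patch; below is a sketch of the equivalent logic, inferred from the removed code and the ErrNodeNotMatch assertion in the tests that follow. checkTargetID and its error wrapping are assumptions, not the library's actual implementation:

package querynodev2

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// checkTargetID is a hypothetical stand-in for merr.CheckTargetID: reject a
// request whose Base.TargetID is not this node. The real helper presumably
// wraps ErrNodeNotMatch with the expected and actual node IDs.
func checkTargetID(base *commonpb.MsgBase) error {
	if base.GetTargetID() != paramtable.GetNodeID() {
		return merr.ErrNodeNotMatch
	}
	return nil
}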


@@ -780,20 +780,19 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
 	// Delegator not found
 	status, err := suite.node.LoadSegments(ctx, req)
 	suite.NoError(err)
-	suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
-	suite.Contains(status.GetReason(), "failed to load segments, delegator not found")
+	suite.ErrorIs(merr.Error(status), merr.ErrChannelNotFound)

 	// target not match
 	req.Base.TargetID = -1
 	status, err = suite.node.LoadSegments(ctx, req)
 	suite.NoError(err)
-	suite.Equal(commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode())
+	suite.ErrorIs(merr.Error(status), merr.ErrNodeNotMatch)

 	// node not healthy
 	suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
 	status, err = suite.node.LoadSegments(ctx, req)
 	suite.NoError(err)
-	suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
+	suite.ErrorIs(merr.Error(status), merr.ErrServiceNotReady)
 }

 func (suite *ServiceSuite) TestLoadSegments_Transfer() {


@@ -16,7 +16,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/querypb"
 	"github.com/milvus-io/milvus/internal/querynodev2/collector"
 	"github.com/milvus-io/milvus/internal/querynodev2/segments"
-	"github.com/milvus-io/milvus/internal/util"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
@@ -220,7 +219,7 @@ func (t *SearchTask) Execute() error {
 			Base: &commonpb.MsgBase{
 				SourceID: paramtable.GetNodeID(),
 			},
-			Status:     util.WrapStatus(commonpb.ErrorCode_Success, ""),
+			Status:     merr.Status(nil),
 			MetricType: req.GetReq().GetMetricType(),
 			NumQueries: t.originNqs[i],
 			TopK:       t.originTopks[i],


@@ -1941,7 +1941,6 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask
 			err = fmt.Errorf("failed to find collection ID from its name: '%s', error: %w", req.GetCollectionName(), err)
 			log.Error("ListImportTasks failed", zap.Error(err))
 			status := merr.Status(err)
-			status.ErrorCode = commonpb.ErrorCode_IllegalCollectionName
 			return &milvuspb.ListImportTasksResponse{
 				Status: status,
 			}, nil
@@ -2231,7 +2230,6 @@ func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequ
 		metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc()
 		status := merr.Status(err)
-		status.ErrorCode = commonpb.ErrorCode_ListCredUsersFailure
 		return &milvuspb.ListCredUsersResponse{Status: status}, nil
 	}
 	ctxLog.Debug("ListCredUsers success")


@@ -19,7 +19,6 @@ package rootcoord
 import (
 	"context"
 	"fmt"
-	"github.com/milvus-io/milvus/pkg/common"
 	"math/rand"
 	"os"
 	"sync"
@@ -47,6 +46,7 @@ import (
 	"github.com/milvus-io/milvus/internal/util/dependency"
 	"github.com/milvus-io/milvus/internal/util/importutil"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
+	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/util/etcd"
 	"github.com/milvus-io/milvus/pkg/util/funcutil"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -1216,6 +1216,7 @@ func TestCore_Import(t *testing.T) {
 				},
 			},
 		})
+		assert.NoError(t, err)
 		assert.ErrorIs(t, merr.Error(resp2.GetStatus()), merr.ErrBulkInsertPartitionNotFound)
 	})
 }
@@ -1336,7 +1337,7 @@ func TestCore_ListImportTasks(t *testing.T) {
 				CollectionID: ti3.CollectionId,
 			}, nil
 		}
-		return nil, errors.New("GetCollectionByName error")
+		return nil, merr.WrapErrCollectionNotFound(collectionName)
 	}

 	ctx := context.Background()
@@ -1374,7 +1375,7 @@ func TestCore_ListImportTasks(t *testing.T) {
 		})
 		assert.NoError(t, err)
 		assert.Equal(t, 0, len(resp.GetTasks()))
-		assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetStatus().GetErrorCode())
+		assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrCollectionNotFound)

 		// list the latest 2 tasks
 		resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{


@ -1,50 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package util
import (
"fmt"
"strings"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
)
// WrapStatus wraps status with given error code, message and errors
func WrapStatus(code commonpb.ErrorCode, msg string, errs ...error) *commonpb.Status {
status := &commonpb.Status{
ErrorCode: code,
Reason: msg,
}
for _, err := range errs {
status.Reason = fmt.Sprintf("%s, err=%v", status.Reason, err)
}
return status
}
// SuccessStatus returns a success status with given message
func SuccessStatus(msgs ...string) *commonpb.Status {
return &commonpb.Status{
Reason: strings.Join(msgs, "; "),
}
}
// WrapError wraps error with given message
func WrapError(msg string, err error) error {
return fmt.Errorf("%s[%w]", msg, err)
}
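With this file deleted, status construction funnels through merr instead of the ad-hoc helpers. Assumed equivalences for migrating call sites (a sketch under those assumptions, not taken from the diff):

    package example

    import (
    	"github.com/cockroachdb/errors"

    	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    	"github.com/milvus-io/milvus/pkg/util/merr"
    )

    func migrated(err error) (failure, success *commonpb.Status, wrapped error) {
    	// WrapStatus(code, msg, errs...): wrap a merr sentinel, then merr.Status.
    	failure = merr.Status(errors.Wrap(merr.ErrServiceInternal, "some msg"))
    	// SuccessStatus(): merr.Status(nil) yields a success status.
    	success = merr.Status(nil)
    	// WrapError(msg, err): errors.Wrap from cockroachdb/errors.
    	wrapped = errors.Wrap(err, "some msg")
    	return failure, success, wrapped
    }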


@ -74,7 +74,9 @@ var (
 	ErrSegmentReduplicate = newMilvusError("segment reduplicates", 603, false)

 	// Index related
 	ErrIndexNotFound = newMilvusError("index not found", 700, false)
+	ErrIndexNotSupported = newMilvusError("index type not supported", 701, false)
+	ErrIndexDuplicate = newMilvusError("index duplicates", 702, false)

 	// Database related
 	ErrDatabaseNotFound = newMilvusError("database not found", 800, false)
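Following the surrounding entries, newMilvusError takes (message, code, retriable), so both new sentinels are non-retriable. Callers match them with errors.Is, which keeps working through the WrapErr* helpers added later in this diff (illustrative):

    package example

    import (
    	"github.com/cockroachdb/errors"

    	"github.com/milvus-io/milvus/pkg/util/merr"
    )

    // isUnsupportedIndex matches the new sentinel even after it has been
    // wrapped with extra context.
    func isUnsupportedIndex(err error) bool {
    	return errors.Is(err, merr.ErrIndexNotSupported)
    }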


@ -27,12 +27,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
) )
// For compatibility
var oldErrCodes = map[int32]commonpb.ErrorCode{
ErrServiceNotReady.code(): commonpb.ErrorCode_NotReadyServe,
ErrCollectionNotFound.code(): commonpb.ErrorCode_CollectionNotExists,
}
// Code returns the error code of the given error, // Code returns the error code of the given error,
// WARN: DO NOT use this for now // WARN: DO NOT use this for now
func Code(err error) int32 { func Code(err error) int32 {
@ -125,6 +119,12 @@ func oldCode(code int32) commonpb.ErrorCode {
 	case ErrServiceForceDeny.code():
 		return commonpb.ErrorCode_ForceDeny
+	case ErrIndexNotFound.code():
+		return commonpb.ErrorCode_IndexNotExist
+	case ErrSegmentNotFound.code():
+		return commonpb.ErrorCode_SegmentNotFound
 	default:
 		return commonpb.ErrorCode_UnexpectedError
 	}
@ -156,6 +156,12 @@ func OldCodeToMerr(code commonpb.ErrorCode) error {
 	case commonpb.ErrorCode_ForceDeny:
 		return ErrServiceForceDeny
+	case commonpb.ErrorCode_IndexNotExist:
+		return ErrIndexNotFound
+	case commonpb.ErrorCode_SegmentNotFound:
+		return ErrSegmentNotFound
 	default:
 		return errUnexpected
 	}
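These two switches are inverses: oldCode maps the new numeric codes to legacy ErrorCode values for old clients, and OldCodeToMerr maps legacy codes back to merr sentinels. A quick round-trip check through the public surface, assuming merr.Status applies oldCode when filling the legacy field:

    package main

    import (
    	"fmt"

    	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
    	"github.com/milvus-io/milvus/pkg/util/merr"
    )

    func main() {
    	// New sentinel -> legacy code, as an old client would observe it.
    	status := merr.Status(merr.ErrIndexNotFound)
    	fmt.Println(status.GetErrorCode() == commonpb.ErrorCode_IndexNotExist) // expected: true
    }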
@ -450,8 +456,32 @@ func WrapErrSegmentReduplicate(id int64, msg ...string) error {
 }

 // Index related
-func WrapErrIndexNotFound(msg ...string) error {
-	err := error(ErrIndexNotFound)
+func WrapErrIndexNotFound(indexName string, msg ...string) error {
+	err := wrapWithField(ErrIndexNotFound, "indexName", indexName)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "; "))
+	}
+	return err
+}
+
+func WrapErrIndexNotFoundForSegment(segmentID int64, msg ...string) error {
+	err := wrapWithField(ErrIndexNotFound, "segmentID", segmentID)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "; "))
+	}
+	return err
+}
+
+func WrapErrIndexNotSupported(indexType string, msg ...string) error {
+	err := wrapWithField(ErrIndexNotSupported, "indexType", indexType)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "; "))
+	}
+	return err
+}
+
+func WrapErrIndexDuplicate(indexName string, msg ...string) error {
+	err := wrapWithField(ErrIndexDuplicate, "indexName", indexName)
 	if len(msg) > 0 {
 		err = errors.Wrap(err, strings.Join(msg, "; "))
 	}
@ -483,6 +513,14 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error {
 	return err
 }

+func WrapErrNodeLackAny(msg ...string) error {
+	err := error(ErrNodeLack)
+	if len(msg) > 0 {
+		err = errors.Wrap(err, strings.Join(msg, "; "))
+	}
+	return err
+}
+
 func WrapErrNodeNotAvailable(id int64, msg ...string) error {
 	err := wrapWithField(ErrNodeNotAvailable, "node", id)
 	if len(msg) > 0 {
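The field-carrying wrappers added above attach a structured field to the sentinel before any optional extra messages (WrapErrNodeLackAny skips the field), so errors.Is still matches while the reason carries context. Hypothetical call sites (the index names, types, and IDs are made up):

    package example

    import "github.com/milvus-io/milvus/pkg/util/merr"

    func examples() []error {
    	return []error{
    		merr.WrapErrIndexNotFound("my_index"),
    		merr.WrapErrIndexNotFoundForSegment(42, "checking build progress"),
    		merr.WrapErrIndexNotSupported("SCANN"),
    		merr.WrapErrIndexDuplicate("my_index", "created with different params"),
    		merr.WrapErrNodeLackAny("no available node to schedule"),
    	}
    }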


@ -1924,7 +1924,7 @@ class TestUtilityAdvanced(TestcaseBase):
         # load balance
         self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids,
                                        check_task=CheckTasks.err_res,
-                                       check_items={ct.err_code: 1, ct.err_msg: "no available queryNode to allocate"})
+                                       check_items={ct.err_code: 1, ct.err_msg: "destination node not found in the same replica"})

     @pytest.mark.tags(CaseLabel.L1)
     @pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19441")