diff --git a/.golangci.yml b/.golangci.yml index 15a1f5c5db..a6c224f296 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -74,7 +74,10 @@ linters-settings: forbidigo: forbid: - '^time\.Tick$' - - '^return merr.Err[a-zA-Z]+$' + - 'return merr\.Err[a-zA-Z]+' + - 'merr\.Wrap\w+\(\)\.Error\(\)' + - '\.(ErrorCode|Reason) = ' + - 'Reason:\s+\w+\.Error\(\)' #- 'fmt\.Print.*' WIP issues: diff --git a/internal/datacoord/errors.go b/internal/datacoord/errors.go index e74b672646..9127a28f8a 100644 --- a/internal/datacoord/errors.go +++ b/internal/datacoord/errors.go @@ -22,12 +22,6 @@ import ( "github.com/cockroachdb/errors" ) -// errNilKvClient stands for a nil kv client is detected when initialized -var errNilKvClient = errors.New("kv client not initialized") - -// serverNotServingErrMsg used for Status Reason when DataCoord is not healthy -const serverNotServingErrMsg = "DataCoord is not serving" - // errors for VerifyResponse var ( errNilResponse = errors.New("response is nil") diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index 88a3a07122..fc1c28ba77 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -148,41 +148,34 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques log.Info("receive CreateIndex request", zap.String("IndexName", req.GetIndexName()), zap.Int64("fieldID", req.GetFieldID()), zap.Any("TypeParams", req.GetTypeParams()), - zap.Any("IndexParams", req.GetIndexParams())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return errResp, nil + zap.Any("IndexParams", req.GetIndexParams()), + ) + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) + return merr.Status(err), nil } metrics.IndexRequestCounter.WithLabelValues(metrics.TotalLabel).Inc() indexID, err := s.meta.CanCreateIndex(req) if err != nil { - log.Error("CreateIndex failed", zap.Error(err)) - errResp.Reason = err.Error() metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return errResp, nil + return merr.Status(err), nil } if indexID == 0 { indexID, err = s.allocator.allocID(ctx) if err != nil { log.Warn("failed to alloc indexID", zap.Error(err)) - errResp.Reason = "failed to alloc indexID" metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return errResp, nil + return merr.Status(err), nil } if getIndexType(req.GetIndexParams()) == diskAnnIndex && !s.indexNodeManager.ClientSupportDisk() { errMsg := "all IndexNodes do not support disk indexes, please verify" log.Warn(errMsg) - errResp.Reason = errMsg + err = merr.WrapErrIndexNotSupported(diskAnnIndex) metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return errResp, nil + return merr.Status(err), nil } } @@ -204,9 +197,8 @@ func (s *Server) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques if err != nil { log.Error("CreateIndex fail", zap.Int64("fieldID", req.GetFieldID()), zap.String("indexName", req.GetIndexName()), zap.Error(err)) - errResp.Reason = err.Error() metrics.IndexRequestCounter.WithLabelValues(metrics.FailLabel).Inc() - return errResp, nil + return merr.Status(err), nil } select { @@ -217,9 +209,8 @@ func (s *Server) CreateIndex(ctx 
context.Context, req *indexpb.CreateIndexReques log.Info("CreateIndex successfully", zap.String("IndexName", req.GetIndexName()), zap.Int64("fieldID", req.GetFieldID()), zap.Int64("IndexID", indexID)) - errResp.ErrorCode = commonpb.ErrorCode_Success metrics.IndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc() - return errResp, nil + return merr.Status(nil), nil } // GetIndexState gets the index state of the index name in the request from Proxy. @@ -231,35 +222,27 @@ func (s *Server) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRe log.Info("receive GetIndexState request", zap.String("indexName", req.GetIndexName())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) return &indexpb.GetIndexStateResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { - errResp.ErrorCode = commonpb.ErrorCode_IndexNotExist - errResp.Reason = fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) - log.Error("GetIndexState fail", - zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errResp.Reason)) + err := merr.WrapErrIndexNotFound(req.GetIndexName()) + log.Warn("GetIndexState fail", + zap.String("indexName", req.GetIndexName()), zap.Error(err)) return &indexpb.GetIndexStateResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } if len(indexes) > 1 { log.Warn(msgAmbiguousIndexName()) - errResp.ErrorCode = commonpb.ErrorCode_UnexpectedError - errResp.Reason = msgAmbiguousIndexName() + err := merr.WrapErrIndexDuplicate(req.GetIndexName()) return &indexpb.GetIndexStateResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } ret := &indexpb.GetIndexStateResponse{ @@ -289,17 +272,14 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme zap.Int64("collectionID", req.GetCollectionID()), ) log.Info("receive GetSegmentIndexState", - zap.String("IndexName", req.GetIndexName()), zap.Int64s("fieldID", req.GetSegmentIDs())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) + zap.String("IndexName", req.GetIndexName()), + zap.Int64s("fieldID", req.GetSegmentIDs()), + ) + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) return &indexpb.GetSegmentIndexStateResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } @@ -309,13 +289,10 @@ func (s *Server) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegme } indexID2CreateTs := s.meta.GetIndexIDByName(req.GetCollectionID(), req.GetIndexName()) if len(indexID2CreateTs) == 0 { - errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.GetIndexName()) - log.Warn("GetSegmentIndexState fail", zap.String("indexName", req.GetIndexName()), 
zap.String("fail reason", errMsg)) + err := merr.WrapErrIndexNotFound(req.GetIndexName()) + log.Warn("GetSegmentIndexState fail", zap.String("indexName", req.GetIndexName()), zap.Error(err)) return &indexpb.GetSegmentIndexStateResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - Reason: errMsg, - }, + Status: merr.Status(err), }, nil } for _, segID := range req.GetSegmentIDs() { @@ -464,37 +441,28 @@ func (s *Server) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetInde zap.Int64("collectionID", req.GetCollectionID()), ) log.Info("receive GetIndexBuildProgress request", zap.String("indexName", req.GetIndexName())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) return &indexpb.GetIndexBuildProgressResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { - errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) - log.Warn("GetIndexBuildProgress fail", zap.String("indexName", req.IndexName), zap.String("fail reason", errMsg)) + err := merr.WrapErrIndexNotFound(req.GetIndexName()) + log.Warn("GetIndexBuildProgress fail", zap.String("indexName", req.IndexName), zap.Error(err)) return &indexpb.GetIndexBuildProgressResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - Reason: errMsg, - }, + Status: merr.Status(err), }, nil } if len(indexes) > 1 { log.Warn(msgAmbiguousIndexName()) - errResp.ErrorCode = commonpb.ErrorCode_UnexpectedError - errResp.Reason = msgAmbiguousIndexName() + err := merr.WrapErrIndexDuplicate(req.GetIndexName()) return &indexpb.GetIndexBuildProgressResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } indexInfo := &indexpb.IndexInfo{ @@ -525,28 +493,20 @@ func (s *Server) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRe ) log.Info("receive DescribeIndex request", zap.String("indexName", req.GetIndexName()), zap.Uint64("timestamp", req.GetTimestamp())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) return &indexpb.DescribeIndexResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { - errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) - log.Warn("DescribeIndex fail", zap.String("indexName", req.GetIndexName()), zap.String("fail reason", errMsg)) + err := merr.WrapErrIndexNotFound(req.GetIndexName()) + log.Warn("DescribeIndex fail", zap.String("indexName", req.GetIndexName()), zap.Error(err)) return &indexpb.DescribeIndexResponse{ - Status: 
&commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - Reason: fmt.Sprint("index doesn't exist, collectionID ", req.GetCollectionID()), - }, + Status: merr.Status(err), }, nil } @@ -590,24 +550,21 @@ func (s *Server) GetIndexStatistics(ctx context.Context, req *indexpb.GetIndexSt zap.Int64("collectionID", req.GetCollectionID()), ) log.Info("receive GetIndexStatistics request", zap.String("indexName", req.GetIndexName())) - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(s.serverID())) + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) return &indexpb.GetIndexStatisticsResponse{ - Status: s.UnhealthyStatus(), + Status: merr.Status(err), }, nil } indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { - errMsg := fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName) + err := merr.WrapErrIndexNotFound(req.GetIndexName()) log.Warn("GetIndexStatistics fail", zap.String("indexName", req.GetIndexName()), - zap.String("fail reason", errMsg)) + zap.Error(err)) return &indexpb.GetIndexStatisticsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - Reason: fmt.Sprint("index doesn't exist, collectionID ", req.GetCollectionID()), - }, + Status: merr.Status(err), }, nil } @@ -652,30 +609,22 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) ( log.Info("receive DropIndex request", zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.String("indexName", req.GetIndexName()), zap.Bool("drop all indexes", req.GetDropAll())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return errResp, nil - } - ret := merr.Status(nil) + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) + return merr.Status(err), nil + } indexes := s.meta.GetIndexesForCollection(req.GetCollectionID(), req.GetIndexName()) if len(indexes) == 0 { log.Info(fmt.Sprintf("there is no index on collection: %d with the index name: %s", req.CollectionID, req.IndexName)) - return ret, nil + return merr.Status(nil), nil } if !req.GetDropAll() && len(indexes) > 1 { log.Warn(msgAmbiguousIndexName()) - ret.ErrorCode = commonpb.ErrorCode_UnexpectedError - ret.Reason = msgAmbiguousIndexName() - return ret, nil + err := merr.WrapErrIndexDuplicate(req.GetIndexName()) + return merr.Status(err), nil } indexIDs := make([]UniqueID, 0) for _, index := range indexes { @@ -686,15 +635,13 @@ func (s *Server) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) ( err := s.meta.MarkIndexAsDeleted(req.GetCollectionID(), indexIDs) if err != nil { log.Warn("DropIndex fail", zap.String("indexName", req.IndexName), zap.Error(err)) - ret.ErrorCode = commonpb.ErrorCode_UnexpectedError - ret.Reason = err.Error() - return ret, nil + return merr.Status(err), nil } } log.Debug("DropIndex success", zap.Int64s("partitionIDs", req.GetPartitionIDs()), zap.String("indexName", req.GetIndexName()), zap.Int64s("indexIDs", indexIDs)) - return ret, nil + return merr.Status(nil), nil } // GetIndexInfos gets the index file paths for segment from DataCoord. 
@@ -702,16 +649,11 @@ func (s *Server) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoReq log := log.Ctx(ctx).With( zap.Int64("collectionID", req.GetCollectionID()), ) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID())) - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + log.Warn(msgDataCoordIsUnhealthy(paramtable.GetNodeID()), zap.Error(err)) return &indexpb.GetIndexInfoResponse{ - Status: errResp, + Status: merr.Status(err), }, nil } ret := &indexpb.GetIndexInfoResponse{ diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index 68b84792c4..cc93da75fd 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" ) func TestServerId(t *testing.T) { @@ -100,7 +101,7 @@ func TestServer_CreateIndex(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Abnormal) resp, err := s.CreateIndex(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) }) t.Run("index not consistent", func(t *testing.T) { @@ -192,7 +193,7 @@ func TestServer_GetIndexState(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) resp, err := s.GetIndexState(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) s.stateCode.Store(commonpb.StateCode_Healthy) @@ -384,7 +385,7 @@ func TestServer_GetSegmentIndexState(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Abnormal) resp, err := s.GetSegmentIndexState(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) t.Run("no indexes", func(t *testing.T) { @@ -518,7 +519,7 @@ func TestServer_GetIndexBuildProgress(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) resp, err := s.GetIndexBuildProgress(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) t.Run("no indexes", func(t *testing.T) { @@ -998,7 +999,7 @@ func TestServer_DescribeIndex(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) resp, err := s.DescribeIndex(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) s.stateCode.Store(commonpb.StateCode_Healthy) @@ -1442,7 +1443,7 @@ func TestServer_DropIndex(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) resp, err := s.DropIndex(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) }) s.stateCode.Store(commonpb.StateCode_Healthy) @@ -1602,7 +1603,7 @@ func 
TestServer_GetIndexInfos(t *testing.T) { s.stateCode.Store(commonpb.StateCode_Initializing) resp, err := s.GetIndexInfos(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) s.stateCode.Store(commonpb.StateCode_Healthy) diff --git a/internal/datacoord/indexnode_manager_test.go b/internal/datacoord/indexnode_manager_test.go index a0552e4bf6..f0152dac1d 100644 --- a/internal/datacoord/indexnode_manager_test.go +++ b/internal/datacoord/indexnode_manager_test.go @@ -25,7 +25,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/indexpb" @@ -57,47 +56,32 @@ func TestIndexNodeManager_PeekClient(t *testing.T) { return ic } + err := errors.New("error") + t.Run("multiple unavailable IndexNode", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.TODO(), nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, errors.New("error")), + Status: merr.Status(err), + }, err), 2: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, errors.New("error")), + Status: merr.Status(err), + }, err), 3: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, errors.New("error")), + Status: merr.Status(err), + }, err), 4: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - }, errors.New("error")), + Status: merr.Status(err), + }, err), 5: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "fail reason", - }, + Status: merr.Status(err), }, nil), 6: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "fail reason", - }, + Status: merr.Status(err), }, nil), 7: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "fail reason", - }, + Status: merr.Status(err), }, nil), 8: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ TaskSlots: 1, @@ -123,6 +107,8 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { return ic } + err := errors.New("error") + t.Run("support", func(t *testing.T) { nm := &IndexNodeManager{ ctx: context.Background(), @@ -175,7 +161,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { ctx: context.Background(), lock: sync.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ - 1: getMockedGetJobStatsClient(nil, errors.New("error")), + 1: getMockedGetJobStatsClient(nil, err), }, } @@ -189,10 +175,7 @@ func TestIndexNodeManager_ClientSupportDisk(t *testing.T) { lock: sync.RWMutex{}, nodeClients: map[UniqueID]types.IndexNodeClient{ 1: getMockedGetJobStatsClient(&indexpb.GetJobStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "fail reason", - }, + Status: merr.Status(err), 
TaskSlots: 0, JobInfos: nil, EnableDisk: false, diff --git a/internal/datacoord/metrics_info.go b/internal/datacoord/metrics_info.go index eaa4868115..ee3ba9dfe7 100644 --- a/internal/datacoord/metrics_info.go +++ b/internal/datacoord/metrics_info.go @@ -27,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/hardware" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -88,20 +89,16 @@ func (s *Server) getSystemInfoMetrics( } resp := &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - Response: "", + Status: merr.Status(nil), ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), } var err error resp.Response, err = metricsinfo.MarshalTopology(coordTopology) if err != nil { - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } diff --git a/internal/datacoord/metrics_info_test.go b/internal/datacoord/metrics_info_test.go index ed4d4f2b3a..e9accb077b 100644 --- a/internal/datacoord/metrics_info_test.go +++ b/internal/datacoord/metrics_info_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/assert" "google.golang.org/grpc" - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/pkg/util/merr" @@ -97,13 +96,11 @@ func TestGetDataNodeMetrics(t *testing.T) { assert.NoError(t, err) assert.True(t, info.HasError) + mockErr := errors.New("mocked error") // mock status not success mockFailClientCreator = getMockFailedClientCreator(func() (*milvuspb.GetMetricsResponse, error) { return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "mocked error", - }, + Status: merr.Status(mockErr), }, nil }) @@ -143,14 +140,11 @@ func TestGetIndexNodeMetrics(t *testing.T) { assert.True(t, info.HasError) // failed + mockErr := errors.New("mocked error") info, err = svr.getIndexNodeMetrics(ctx, req, &mockMetricIndexNodeClient{ mock: func() (*milvuspb.GetMetricsResponse, error) { return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "mock fail", - }, - Response: "", + Status: merr.Status(mockErr), ComponentName: "indexnode100", }, nil }, @@ -187,11 +181,7 @@ func TestGetIndexNodeMetrics(t *testing.T) { resp, err := metricsinfo.MarshalComponentInfos(nodeInfos) if err != nil { return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, - Response: "", + Status: merr.Status(err), ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, nodeID), }, nil } diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index aa7c4d1fa3..fb848cb4bc 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -186,8 +186,7 @@ func TestAssignSegmentID(t *testing.T) { SegmentIDRequests: []*datapb.SegmentIDRequest{req}, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, 
resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) t.Run("assign segment with invalid collection", func(t *testing.T) { @@ -327,8 +326,7 @@ func TestFlush(t *testing.T) { closeTestServer(t, svr) resp, err := svr.Flush(context.Background(), req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -437,8 +435,7 @@ func TestGetSegmentStates(t *testing.T) { SegmentIDs: []int64{0}, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -503,7 +500,7 @@ func TestGetInsertBinlogPaths(t *testing.T) { } resp, err := svr.GetInsertBinlogPaths(svr.ctx, req) assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrSegmentNotFound) }) t.Run("with closed server", func(t *testing.T) { @@ -513,8 +510,7 @@ func TestGetInsertBinlogPaths(t *testing.T) { SegmentID: 0, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -537,8 +533,7 @@ func TestGetCollectionStatistics(t *testing.T) { CollectionID: 0, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -560,8 +555,7 @@ func TestGetPartitionStatistics(t *testing.T) { closeTestServer(t, svr) resp, err := svr.GetPartitionStatistics(context.Background(), &datapb.GetPartitionStatisticsRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -623,7 +617,7 @@ func TestGetSegmentInfo(t *testing.T) { } resp, err := svr.GetSegmentInfo(svr.ctx, req) assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrSegmentNotFound) }) t.Run("with closed server", func(t *testing.T) { svr := newTestServer(t, nil) @@ -632,8 +626,7 @@ func TestGetSegmentInfo(t *testing.T) { SegmentIDs: []int64{}, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) t.Run("with dropped segment", func(t *testing.T) { svr := newTestServer(t, nil) @@ -826,8 +819,7 @@ func TestGetFlushedSegments(t *testing.T) { closeTestServer(t, svr) resp, err := svr.GetFlushedSegments(context.Background(), &datapb.GetFlushedSegmentsRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, 
serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) }) } @@ -931,8 +923,7 @@ func TestGetSegmentsByStates(t *testing.T) { closeTestServer(t, svr) resp, err := svr.GetSegmentsByStates(context.Background(), &datapb.GetSegmentsByStatesRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) }) } @@ -1137,7 +1128,7 @@ func TestServer_ShowConfigurations(t *testing.T) { svr.stateCode.Store(commonpb.StateCode_Initializing) resp, err := svr.ShowConfigurations(svr.ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) // normal case svr.stateCode.Store(stateSave) @@ -1486,7 +1477,7 @@ func TestSaveBinlogPaths(t *testing.T) { Flushed: false, }) assert.NoError(t, err) - assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_SegmentNotFound) + assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound) }) t.Run("SaveNotExistSegment", func(t *testing.T) { @@ -1540,7 +1531,7 @@ func TestSaveBinlogPaths(t *testing.T) { Flushed: false, }) assert.NoError(t, err) - assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_SegmentNotFound) + assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound) }) t.Run("with channel not matched", func(t *testing.T) { @@ -1562,7 +1553,7 @@ func TestSaveBinlogPaths(t *testing.T) { Channel: "test", }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrChannelNotFound) }) t.Run("with closed server", func(t *testing.T) { @@ -1570,8 +1561,7 @@ func TestSaveBinlogPaths(t *testing.T) { closeTestServer(t, svr) resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetReason()) + assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) }) /* t.Run("test save dropped segment and remove channel", func(t *testing.T) { @@ -1760,7 +1750,7 @@ func TestDropVirtualChannel(t *testing.T) { ChannelName: "ch2", }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_MetaFailed, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrChannelNotFound) }) t.Run("with closed server", func(t *testing.T) { @@ -1768,8 +1758,7 @@ func TestDropVirtualChannel(t *testing.T) { closeTestServer(t, svr) resp, err := svr.DropVirtualChannel(context.Background(), &datapb.DropVirtualChannelRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -2426,13 +2415,13 @@ func TestShouldDropChannel(t *testing.T) { } myRoot := &myRootCoord{} myRoot.EXPECT().AllocTimestamp(mock.Anything, mock.Anything).Return(&rootcoordpb.AllocTimestampResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Timestamp: tsoutil.ComposeTSByTime(time.Now(), 0), Count: 1, }, nil) myRoot.EXPECT().AllocID(mock.Anything, 
mock.Anything).Return(&rootcoordpb.AllocIDResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), ID: int64(tsoutil.ComposeTSByTime(time.Now(), 0)), Count: 1, }, nil) @@ -2479,7 +2468,7 @@ func TestShouldDropChannel(t *testing.T) { t.Run("channel name not in kv, collection exist", func(t *testing.T) { myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), CollectionID: 0, }, nil).Once() assert.False(t, svr.handler.CheckShouldDropChannel("ch99", 0)) @@ -2488,7 +2477,7 @@ func TestShouldDropChannel(t *testing.T) { t.Run("collection name in kv, collection exist", func(t *testing.T) { myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), CollectionID: 0, }, nil).Once() assert.False(t, svr.handler.CheckShouldDropChannel("ch1", 0)) @@ -2508,7 +2497,7 @@ func TestShouldDropChannel(t *testing.T) { require.NoError(t, err) myRoot.EXPECT().DescribeCollection(mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), CollectionID: 0, }, nil).Once() assert.True(t, svr.handler.CheckShouldDropChannel("ch1", 0)) @@ -3011,8 +3000,7 @@ func TestGetRecoveryInfo(t *testing.T) { closeTestServer(t, svr) resp, err := svr.GetRecoveryInfo(context.TODO(), &datapb.GetRecoveryInfoRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -3077,8 +3065,7 @@ func TestGetCompactionState(t *testing.T) { resp, err := svr.GetCompactionState(context.Background(), &milvuspb.GetCompactionStateRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -3139,8 +3126,7 @@ func TestManualCompaction(t *testing.T) { Timetravel: 1, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -3190,8 +3176,7 @@ func TestGetCompactionStateWithPlans(t *testing.T) { CompactionID: 1, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -3347,7 +3332,7 @@ type rootCoordSegFlushComplete struct { // SegmentFlushCompleted, override default behavior func (rc *rootCoordSegFlushComplete) SegmentFlushCompleted(ctx context.Context, req *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { if rc.flag { - return &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, nil + return merr.Status(nil), nil } return &commonpb.Status{ErrorCode: 
commonpb.ErrorCode_UnexpectedError}, nil } @@ -3424,7 +3409,7 @@ func TestGetFlushState(t *testing.T) { resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}}) assert.NoError(t, err) assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Flushed: true, }, resp) }) @@ -3472,7 +3457,7 @@ func TestGetFlushState(t *testing.T) { resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}}) assert.NoError(t, err) assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Flushed: false, }, resp) }) @@ -3520,7 +3505,7 @@ func TestGetFlushState(t *testing.T) { resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}}) assert.NoError(t, err) assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Flushed: true, }, resp) }) @@ -3556,7 +3541,7 @@ func TestGetFlushState(t *testing.T) { }) assert.NoError(t, err) assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Flushed: true, }, resp) }) @@ -3592,7 +3577,7 @@ func TestGetFlushState(t *testing.T) { }) assert.NoError(t, err) assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Flushed: false, }, resp) }) @@ -3611,7 +3596,7 @@ func TestGetFlushState(t *testing.T) { }) assert.NoError(t, err) assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Flushed: true, }, resp) }) @@ -3680,7 +3665,7 @@ func TestGetFlushAllState(t *testing.T) { svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything). Return(&milvuspb.ListDatabasesResponse{ DbNames: []string{"db1"}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), }, nil).Maybe() } @@ -3692,7 +3677,7 @@ func TestGetFlushAllState(t *testing.T) { } else { svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ShowCollections(mock.Anything, mock.Anything). Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), CollectionIds: []int64{collection}, }, nil).Maybe() } @@ -3705,7 +3690,7 @@ func TestGetFlushAllState(t *testing.T) { } else { svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). 
Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), VirtualChannelNames: vchannels, }, nil).Maybe() } @@ -3723,8 +3708,10 @@ func TestGetFlushAllState(t *testing.T) { assert.NoError(t, err) if test.ExpectedSuccess { assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - } else { + } else if test.ServerIsHealthy { assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) + } else { + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) } assert.Equal(t, test.ExpectedFlushed, resp.GetFlushed()) }) @@ -3760,25 +3747,25 @@ func TestGetFlushAllStateWithDB(t *testing.T) { svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything). Return(&milvuspb.ListDatabasesResponse{ DbNames: []string{dbName}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), }, nil).Maybe() } else { svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ListDatabases(mock.Anything, mock.Anything). Return(&milvuspb.ListDatabasesResponse{ DbNames: []string{}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), }, nil).Maybe() } svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().ShowCollections(mock.Anything, mock.Anything). Return(&milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), CollectionIds: []int64{collectionID}, }, nil).Maybe() svr.rootCoordClient.(*mocks.MockRootCoordClient).EXPECT().DescribeCollectionInternal(mock.Anything, mock.Anything). Return(&milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), VirtualChannelNames: vchannels, CollectionID: collectionID, CollectionName: collectionName, @@ -3882,8 +3869,7 @@ func TestDataCoordServer_SetSegmentState(t *testing.T) { NewState: commonpb.SegmentState_Flushed, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) } @@ -3958,8 +3944,7 @@ func TestDataCoord_Import(t *testing.T) { }, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) - assert.Equal(t, msgDataCoordIsUnhealthy(paramtable.GetNodeID()), resp.Status.GetReason()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) }) t.Run("test update segment stat", func(t *testing.T) { @@ -3987,7 +3972,7 @@ func TestDataCoord_Import(t *testing.T) { }}, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady) }) } @@ -4119,7 +4104,7 @@ func TestDataCoord_SaveImportSegment(t *testing.T) { status, err := svr.SaveImportSegment(context.TODO(), &datapb.SaveImportSegmentRequest{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_DataCoordNA, status.GetErrorCode()) + assert.ErrorIs(t, merr.Error(status), merr.ErrServiceNotReady) }) } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index bc22315d94..8e3292bc52 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -23,6 +23,7 @@ import ( "strconv" "sync" + 
"github.com/cockroachdb/errors" "github.com/samber/lo" "go.opentelemetry.io/otel" "go.uber.org/zap" @@ -47,11 +48,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) -// checks whether server in Healthy State -func (s *Server) isClosed() bool { - return s.stateCode.Load() != commonpb.StateCode_Healthy -} - // GetTimeTickChannel legacy API, returns time tick channel name func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ @@ -63,10 +59,7 @@ func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTime // GetStatisticsChannel legacy API, returns statistics channel name func (s *Server) GetStatisticsChannel(ctx context.Context, req *internalpb.GetStatisticsChannelRequest) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "no statistics channel", - }, + Status: merr.Status(merr.WrapErrChannelNotFound("no statistics channel")), }, nil } @@ -81,18 +74,11 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F zap.Bool("isImporting", req.GetIsImport())) ctx, sp := otel.Tracer(typeutil.DataCoordRole).Start(ctx, "DataCoord-Flush") defer sp.End() - resp := &datapb.FlushResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - }, - DbID: 0, - CollectionID: 0, - SegmentIDs: []int64{}, - } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.FlushResponse{ + Status: merr.Status(err), + }, nil } // generate a timestamp timeOfSeal, all data before timeOfSeal is guaranteed to be sealed or flushed @@ -107,8 +93,10 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F sealedSegmentIDs, err := s.segmentManager.SealAllSegments(ctx, req.GetCollectionID(), req.GetSegmentIDs(), req.GetIsImport()) if err != nil { - resp.Status.Reason = fmt.Sprintf("failed to flush %d, %s", req.CollectionID, err) - return resp, nil + return &datapb.FlushResponse{ + Status: merr.Status(errors.Wrapf(err, "failed to flush colleciont %d", + req.GetCollectionID())), + }, nil } sealedSegmentsIDDict := make(map[UniqueID]bool) @@ -143,8 +131,9 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F return nil }, retry.Attempts(60)) // about 3min if err != nil { - resp.Status = merr.Status(err) - return resp, nil + return &datapb.FlushResponse{ + Status: merr.Status(err), + }, nil } log.Info("flush response with segments", @@ -153,25 +142,24 @@ func (s *Server) Flush(ctx context.Context, req *datapb.FlushRequest) (*datapb.F zap.Int64s("flushSegments", flushSegmentIDs), zap.Time("timeOfSeal", timeOfSeal), zap.Time("flushTs", tsoutil.PhysicalTime(ts))) - resp.Status.ErrorCode = commonpb.ErrorCode_Success - resp.DbID = req.GetDbID() - resp.CollectionID = req.GetCollectionID() - resp.SegmentIDs = sealedSegmentIDs - resp.TimeOfSeal = timeOfSeal.Unix() - resp.FlushSegmentIDs = flushSegmentIDs - resp.FlushTs = ts - return resp, nil + + return &datapb.FlushResponse{ + Status: merr.Status(nil), + DbID: req.GetDbID(), + CollectionID: req.GetCollectionID(), + SegmentIDs: sealedSegmentIDs, + TimeOfSeal: timeOfSeal.Unix(), + FlushSegmentIDs: flushSegmentIDs, + FlushTs: ts, + }, nil } // AssignSegmentID applies for segment ids and make allocation for records. 
func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentIDRequest) (*datapb.AssignSegmentIDResponse, error) { log := log.Ctx(ctx) - if s.isClosed() { + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return &datapb.AssignSegmentIDResponse{ - Status: &commonpb.Status{ - Reason: serverNotServingErrMsg, - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(err), }, nil } @@ -240,16 +228,15 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI // GetSegmentStates returns segments state func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentStatesRequest) (*datapb.GetSegmentStatesResponse, error) { - resp := &datapb.GetSegmentStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetSegmentStatesResponse{ + Status: merr.Status(err), + }, nil } + resp := &datapb.GetSegmentStatesResponse{ + Status: merr.Status(nil), + } for _, segmentID := range req.SegmentIDs { state := &datapb.SegmentStateInfo{ SegmentID: segmentID, @@ -263,26 +250,26 @@ func (s *Server) GetSegmentStates(ctx context.Context, req *datapb.GetSegmentSta } resp.States = append(resp.States, state) } - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } // GetInsertBinlogPaths returns binlog paths info for requested segments func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsertBinlogPathsRequest) (*datapb.GetInsertBinlogPathsResponse, error) { - resp := &datapb.GetInsertBinlogPathsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetInsertBinlogPathsResponse{ + Status: merr.Status(err), + }, nil } segment := s.meta.GetHealthySegment(req.GetSegmentID()) if segment == nil { - resp.Status.Reason = "segment not found" - return resp, nil + return &datapb.GetInsertBinlogPathsResponse{ + Status: merr.Status(merr.WrapErrSegmentNotFound(req.GetSegmentID())), + }, nil + } + + resp := &datapb.GetInsertBinlogPathsResponse{ + Status: merr.Status(nil), } binlogs := segment.GetBinlogs() fids := make([]UniqueID, 0, len(binlogs)) @@ -296,7 +283,6 @@ func (s *Server) GetInsertBinlogPaths(ctx context.Context, req *datapb.GetInsert } paths = append(paths, &internalpb.StringList{Values: p}) } - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.FieldIDs = fids resp.Paths = paths return resp, nil @@ -309,17 +295,16 @@ func (s *Server) GetCollectionStatistics(ctx context.Context, req *datapb.GetCol zap.Int64("collectionID", req.GetCollectionID()), ) log.Info("received request to get collection statistics") - resp := &datapb.GetCollectionStatisticsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetCollectionStatisticsResponse{ + Status: merr.Status(err), + }, nil } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + + resp := &datapb.GetCollectionStatisticsResponse{ + Status: merr.Status(nil), } nums := s.meta.GetNumRowsOfCollection(req.CollectionID) - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Stats = append(resp.Stats, 
&commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)}) log.Info("success to get collection statistics", zap.Any("response", resp)) return resp, nil @@ -334,13 +319,12 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart zap.Int64s("partitionIDs", req.GetPartitionIDs()), ) resp := &datapb.GetPartitionStatisticsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetPartitionStatisticsResponse{ + Status: merr.Status(err), + }, nil } nums := int64(0) if len(req.GetPartitionIDs()) == 0 { @@ -350,7 +334,6 @@ func (s *Server) GetPartitionStatistics(ctx context.Context, req *datapb.GetPart num := s.meta.GetNumRowsOfPartition(req.CollectionID, partID) nums += num } - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Stats = append(resp.Stats, &commonpb.KeyValuePair{Key: "row_count", Value: strconv.FormatInt(nums, 10)}) log.Info("success to get partition statistics", zap.Any("response", resp)) return resp, nil @@ -369,13 +352,12 @@ func (s *Server) GetSegmentInfoChannel(ctx context.Context, req *datapb.GetSegme func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoRequest) (*datapb.GetSegmentInfoResponse, error) { log := log.Ctx(ctx) resp := &datapb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetSegmentInfoResponse{ + Status: merr.Status(err), + }, nil } infos := make([]*datapb.SegmentInfo, 0, len(req.GetSegmentIDs())) channelCPs := make(map[string]*msgpb.MsgPosition) @@ -386,7 +368,8 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR if info == nil { log.Warn("failed to get segment, this may have been cleaned", zap.Int64("segmentID", id)) - resp.Status.Reason = msgSegmentNotFound(id) + err := merr.WrapErrSegmentNotFound(id) + resp.Status = merr.Status(err) return resp, nil } @@ -401,7 +384,8 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR } else { info = s.meta.GetHealthySegment(id) if info == nil { - resp.Status.Reason = msgSegmentNotFound(id) + err := merr.WrapErrSegmentNotFound(id) + resp.Status = merr.Status(err) return resp, nil } clonedInfo := info.Clone() @@ -413,7 +397,6 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR channelCPs[vchannel] = s.meta.GetChannelCheckpoint(vchannel) } } - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Infos = infos resp.ChannelCheckpoint = channelCPs return resp, nil @@ -422,11 +405,8 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR // SaveBinlogPaths updates segment related binlog path // works for Checkpoints and Flush func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPathsRequest) (*commonpb.Status, error) { - resp := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} - - if s.isClosed() { - resp.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return merr.Status(err), nil } log := log.Ctx(ctx).With( @@ -449,10 +429,9 @@ func (s *Server) 
SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath // Also avoid to handle segment not found error if not the owner of shard if !req.GetImporting() && len(channelName) != 0 { if !s.channelManager.Match(nodeID, channelName) { - failResponse(resp, fmt.Sprintf("channel %s is not watched on node %d", channelName, nodeID)) - resp.ErrorCode = commonpb.ErrorCode_MetaFailed - log.Warn("node is not matched with channel", zap.String("channel", channelName)) - return resp, nil + err := merr.WrapErrChannelNotFound(channelName, fmt.Sprintf("for node %d", nodeID)) + log.Warn("node is not matched with channel", zap.String("channel", channelName), zap.Error(err)) + return merr.Status(err), nil } } @@ -461,19 +440,18 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath segment := s.meta.GetSegment(segmentID) if segment == nil { - log.Error("failed to get segment") - failResponseWithCode(resp, commonpb.ErrorCode_SegmentNotFound, fmt.Sprintf("failed to get segment %d", segmentID)) - return resp, nil + err := merr.WrapErrSegmentNotFound(segmentID) + log.Warn("failed to get segment", zap.Error(err)) + return merr.Status(err), nil } if segment.State == commonpb.SegmentState_Dropped { log.Info("save to dropped segment, ignore this request") - resp.ErrorCode = commonpb.ErrorCode_Success - return resp, nil + return merr.Status(nil), nil } else if !isSegmentHealthy(segment) { - log.Error("failed to get segment") - failResponseWithCode(resp, commonpb.ErrorCode_SegmentNotFound, fmt.Sprintf("failed to get segment %d", segmentID)) - return resp, nil + err := merr.WrapErrSegmentNotFound(segmentID) + log.Warn("failed to get segment, the segment not healthy", zap.Error(err)) + return merr.Status(err), nil } if req.GetDropped() { @@ -493,8 +471,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath req.GetStartPositions()) if err != nil { log.Error("save binlog and checkpoints failed", zap.Error(err)) - resp.Reason = err.Error() - return resp, nil + return merr.Status(err), nil } log.Info("flush segment with meta", zap.Any("meta", req.GetField2BinlogPaths())) @@ -513,8 +490,7 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath } } } - resp.ErrorCode = commonpb.ErrorCode_Success - return resp, nil + return merr.Status(nil), nil } // DropVirtualChannel notifies vchannel dropped @@ -522,13 +498,12 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtualChannelRequest) (*datapb.DropVirtualChannelResponse, error) { log := log.Ctx(ctx) resp := &datapb.DropVirtualChannelResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.DropVirtualChannelResponse{ + Status: merr.Status(err), + }, nil } channel := req.GetChannelName() @@ -538,8 +513,8 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual // validate nodeID := req.GetBase().GetSourceID() if !s.channelManager.Match(nodeID, channel) { - failResponse(resp.Status, fmt.Sprintf("channel %s is not watched on node %d", channel, nodeID)) - resp.Status.ErrorCode = commonpb.ErrorCode_MetaFailed + err := merr.WrapErrChannelNotFound(channel, fmt.Sprintf("for node %d", nodeID)) + resp.Status = merr.Status(err) log.Warn("node 
is not matched with channel", zap.String("channel", channel), zap.Int64("nodeID", nodeID)) return resp, nil } @@ -566,7 +541,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual err := s.meta.UpdateDropChannelSegmentInfo(channel, segments) if err != nil { log.Error("Update Drop Channel segment info failed", zap.String("channel", channel), zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } @@ -580,19 +555,15 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual metrics.CleanupDataCoordNumStoredRows(collectionID) // no compaction triggered in Drop procedure - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } // SetSegmentState reset the state of the given segment. func (s *Server) SetSegmentState(ctx context.Context, req *datapb.SetSegmentStateRequest) (*datapb.SetSegmentStateResponse, error) { log := log.Ctx(ctx) - if s.isClosed() { + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return &datapb.SetSegmentStateResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: serverNotServingErrMsg, - }, + Status: merr.Status(err), }, nil } err := s.meta.SetState(req.GetSegmentId(), req.GetNewState()) @@ -648,20 +619,19 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf ) log.Info("get recovery info request received") resp := &datapb.GetRecoveryInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetRecoveryInfoResponse{ + Status: merr.Status(err), + }, nil } dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID) if err != nil { log.Error("get collection info from rootcoord failed", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } channels := dresp.GetVirtualChannelNames() @@ -688,9 +658,9 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf for id := range flushedIDs { segment := s.meta.GetSegment(id) if segment == nil { - errMsg := fmt.Sprintf("failed to get segment %d", id) - log.Error(errMsg) - resp.Status.Reason = errMsg + err := merr.WrapErrSegmentNotFound(id) + log.Warn("failed to get segment", zap.Int64("segmentID", id)) + resp.Status = merr.Status(err) return resp, nil } // Skip non-flushing, non-flushed and dropped segments. 
@@ -766,7 +736,6 @@ func (s *Server) GetRecoveryInfo(ctx context.Context, req *datapb.GetRecoveryInf resp.Channels = channelInfos resp.Binlogs = binlogs - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } @@ -782,13 +751,12 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI ) log.Info("get recovery info request received") resp := &datapb.GetRecoveryInfoResponseV2{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetRecoveryInfoResponseV2{ + Status: merr.Status(err), + }, nil } dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID) @@ -796,7 +764,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI log.Error("get collection info from rootcoord failed", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } channels := dresp.GetVirtualChannelNames() @@ -819,9 +787,9 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI for id := range flushedIDs { segment := s.meta.GetSegment(id) if segment == nil { - errMsg := fmt.Sprintf("failed to get segment %d", id) - log.Error(errMsg) - resp.Status.Reason = errMsg + err := merr.WrapErrSegmentNotFound(id) + log.Warn("failed to get segment", zap.Int64("segmentID", id)) + resp.Status = merr.Status(err) return resp, nil } // Skip non-flushing, non-flushed and dropped segments. @@ -861,7 +829,6 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI resp.Channels = channelInfos resp.Segments = segmentInfos - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } @@ -870,9 +837,7 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedSegmentsRequest) (*datapb.GetFlushedSegmentsResponse, error) { log := log.Ctx(ctx) resp := &datapb.GetFlushedSegmentsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } collectionID := req.GetCollectionID() partitionID := req.GetPartitionID() @@ -880,9 +845,10 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), ) - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetFlushedSegmentsResponse{ + Status: merr.Status(err), + }, nil } var segmentIDs []UniqueID if partitionID < 0 { @@ -907,7 +873,6 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS } resp.Segments = ret - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } @@ -916,9 +881,7 @@ func (s *Server) GetFlushedSegments(ctx context.Context, req *datapb.GetFlushedS func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegmentsByStatesRequest) (*datapb.GetSegmentsByStatesResponse, error) { log := log.Ctx(ctx) resp := &datapb.GetSegmentsByStatesResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } collectionID := req.GetCollectionID() partitionID := req.GetPartitionID() @@ -927,9 +890,10 @@ func (s *Server) 
GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Any("states", states)) - if s.isClosed() { - resp.Status.Reason = serverNotServingErrMsg - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GetSegmentsByStatesResponse{ + Status: merr.Status(err), + }, nil } var segmentIDs []UniqueID if partitionID < 0 { @@ -951,25 +915,14 @@ func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment } resp.Segments = ret - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } // ShowConfigurations returns the configurations of DataCoord matching req.Pattern func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { - log := log.Ctx(ctx) - if s.isClosed() { - log.Warn("DataCoord.ShowConfigurations failed", - zap.Int64("nodeId", paramtable.GetNodeID()), - zap.String("req", req.Pattern), - zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID()))) - + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return &internalpb.ShowConfigurationsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataCoordIsUnhealthy(paramtable.GetNodeID()), - }, - Configuations: nil, + Status: merr.Status(err), }, nil } @@ -992,19 +945,9 @@ func (s *Server) ShowConfigurations(ctx context.Context, req *internalpb.ShowCon // it may include SystemMetrics, Topology metrics, etc. func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { log := log.Ctx(ctx) - if s.isClosed() { - log.Warn("DataCoord.GetMetrics failed", - zap.Int64("nodeID", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID()))) - + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { return &milvuspb.GetMetricsResponse{ - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataCoordIsUnhealthy(paramtable.GetNodeID()), - }, - Response: "", + Status: merr.Status(err), }, nil } @@ -1048,11 +991,7 @@ func (s *Server) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest return &milvuspb.GetMetricsResponse{ ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, paramtable.GetNodeID()), - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, - Response: "", + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), }, nil } @@ -1064,31 +1003,28 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa log.Info("received manual compaction") resp := &milvuspb.ManualCompactionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - log.Warn("failed to execute manual compaction", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID()))) - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &milvuspb.ManualCompactionResponse{ + Status: merr.Status(err), + }, nil } if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { - resp.Status.Reason = "compaction disabled" + 
resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID) if err != nil { log.Error("failed to trigger manual compaction", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } log.Info("success to trigger manual compaction", zap.Int64("compactionID", id)) - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.CompactionID = id return resp, nil } @@ -1100,19 +1036,17 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac ) log.Info("received get compaction state request") resp := &milvuspb.GetCompactionStateResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - log.Warn("failed to get compaction state", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID()))) - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &milvuspb.GetCompactionStateResponse{ + Status: merr.Status(err), + }, nil } if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { - resp.Status.Reason = "compaction disabled" + resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } @@ -1124,7 +1058,6 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac resp.CompletedPlanNo = int64(completedCnt) resp.TimeoutPlanNo = int64(timeoutCnt) resp.FailedPlanNo = int64(failedCnt) - resp.Status.ErrorCode = commonpb.ErrorCode_Success log.Info("success to get compaction state", zap.Any("state", state), zap.Int("executing", executingCnt), zap.Int("completed", completedCnt), zap.Int("failed", failedCnt), zap.Int("timeout", timeoutCnt), zap.Int64s("plans", lo.Map(tasks, func(t *compactionTask, _ int) int64 { @@ -1143,18 +1076,17 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb. ) log.Info("received the request to get compaction state with plans") + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &milvuspb.GetCompactionPlansResponse{ + Status: merr.Status(err), + }, nil + } + resp := &milvuspb.GetCompactionPlansResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, + Status: merr.Status(nil), } - - if s.isClosed() { - log.Warn("failed to get compaction state with plans", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID()))) - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil - } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { - resp.Status.Reason = "compaction disabled" + resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } @@ -1165,7 +1097,6 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb. 
state, _, _, _, _ := getCompactionState(tasks) - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.State = state log.Info("success to get state with plans", zap.Any("state", state), zap.Any("merge infos", resp.MergeInfos), zap.Int64s("plans", lo.Map(tasks, func(t *compactionTask, _ int) int64 { @@ -1226,15 +1157,13 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq ) log.Info("receive watch channels request") resp := &datapb.WatchChannelsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - log.Warn("failed to watch channels request", zap.Error(errDataCoordIsUnhealthy(paramtable.GetNodeID()))) - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.WatchChannelsResponse{ + Status: merr.Status(err), + }, nil } for _, channelName := range req.GetChannelNames() { ch := &channel{ @@ -1247,17 +1176,16 @@ func (s *Server) WatchChannels(ctx context.Context, req *datapb.WatchChannelsReq err := s.channelManager.Watch(ch) if err != nil { log.Warn("fail to watch channelName", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } if err := s.meta.catalog.MarkChannelAdded(ctx, ch.Name); err != nil { // TODO: add background task to periodically cleanup the orphaned channel add marks. log.Error("failed to mark channel added", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } } - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } @@ -1267,13 +1195,13 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq log := log.Ctx(ctx).With(zap.Int64("collection", req.GetCollectionID()), zap.Time("flushTs", tsoutil.PhysicalTime(req.GetFlushTs()))). 
WithRateGroup("dc.GetFlushState", 1, 60) - resp := &milvuspb.GetFlushStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}} - if s.isClosed() { - log.Warn("DataCoord receive GetFlushState request, server closed") - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &milvuspb.GetFlushStateResponse{ + Status: merr.Status(err), + }, nil } + resp := &milvuspb.GetFlushStateResponse{Status: merr.Status(nil)} if len(req.GetSegmentIDs()) > 0 { var unflushed []UniqueID for _, sid := range req.GetSegmentIDs() { @@ -1287,7 +1215,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq if len(unflushed) != 0 { log.RatedInfo(10, "DataCoord receive GetFlushState request, Flushed is false", zap.Int64s("unflushed", unflushed), zap.Int("len", len(unflushed))) resp.Flushed = false - resp.Status.ErrorCode = commonpb.ErrorCode_Success + return resp, nil } } @@ -1305,7 +1233,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq if len(channels) == 0 { // For compatibility with old client resp.Flushed = true - resp.Status.ErrorCode = commonpb.ErrorCode_Success + log.Info("GetFlushState all flushed without checking flush ts") return resp, nil } @@ -1314,7 +1242,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq cp := s.meta.GetChannelCheckpoint(channel) if cp == nil || cp.GetTimestamp() < req.GetFlushTs() { resp.Flushed = false - resp.Status.ErrorCode = commonpb.ErrorCode_Success + log.RatedInfo(10, "GetFlushState failed, channel unflushed", zap.String("channel", channel), zap.Time("CP", tsoutil.PhysicalTime(cp.GetTimestamp())), zap.Duration("lag", tsoutil.PhysicalTime(req.GetFlushTs()).Sub(tsoutil.PhysicalTime(cp.GetTimestamp())))) @@ -1323,7 +1251,6 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq } resp.Flushed = true - resp.Status.ErrorCode = commonpb.ErrorCode_Success log.Info("GetFlushState all flushed") return resp, nil @@ -1332,17 +1259,18 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq // GetFlushAllState checks if all DML messages before `FlushAllTs` have been flushed. 
func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAllStateRequest) (*milvuspb.GetFlushAllStateResponse, error) { log := log.Ctx(ctx) - resp := &milvuspb.GetFlushAllStateResponse{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}} - if s.isClosed() { - log.Warn("DataCoord receive GetFlushAllState request, server closed") - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &milvuspb.GetFlushAllStateResponse{ + Status: merr.Status(err), + }, nil } + resp := &milvuspb.GetFlushAllStateResponse{Status: merr.Status(nil)} + dbsRsp, err := s.broker.ListDatabases(ctx) if err != nil { log.Warn("failed to ListDatabases", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } dbNames := dbsRsp.DbNames @@ -1351,7 +1279,7 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll return dbName == req.GetDbName() }) if len(dbNames) == 0 { - resp.Status.Reason = merr.WrapErrDatabaseNotFound(req.GetDbName()).Error() + resp.Status = merr.Status(merr.WrapErrDatabaseNotFound(req.GetDbName())) return resp, nil } } @@ -1360,7 +1288,7 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll showColRsp, err := s.broker.ShowCollections(ctx, dbName) if err != nil { log.Warn("failed to ShowCollections", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } @@ -1368,80 +1296,69 @@ func (s *Server) GetFlushAllState(ctx context.Context, req *milvuspb.GetFlushAll describeColRsp, err := s.broker.DescribeCollectionInternal(ctx, collection) if err != nil { log.Warn("failed to DescribeCollectionInternal", zap.Error(err)) - resp.Status.Reason = err.Error() + resp.Status = merr.Status(err) return resp, nil } for _, channel := range describeColRsp.GetVirtualChannelNames() { channelCP := s.meta.GetChannelCheckpoint(channel) if channelCP == nil || channelCP.GetTimestamp() < req.GetFlushAllTs() { resp.Flushed = false - resp.Status.ErrorCode = commonpb.ErrorCode_Success + return resp, nil } } } } resp.Flushed = true - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } -// Import distributes the import tasks to dataNodes. -// It returns a failed status if no dataNode is available or if any error occurs. -func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { +// Import distributes the import tasks to DataNodes. +// It returns a failed status if no DataNode is available or if any error occurs. 
+func (s *Server) Import(ctx context.Context, req *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { log := log.Ctx(ctx) - log.Info("DataCoord receives import request", zap.Any("import task request", itr)) + log.Info("DataCoord receives import request", zap.Any("req", req)) resp := &datapb.ImportTaskResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } - if s.isClosed() { - log.Error("failed to import for closed DataCoord service") - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.ImportTaskResponse{ + Status: merr.Status(err), + }, nil } nodes := s.sessionManager.getLiveNodeIDs() if len(nodes) == 0 { - log.Error("import failed as all DataNodes are offline") - resp.Status.Reason = "no data node available" + log.Warn("import failed as all DataNodes are offline") + resp.Status = merr.Status(merr.WrapErrNodeLackAny("no live DataNode")) return resp, nil } - log.Info("available DataNodes are", zap.Int64s("node ID", nodes)) + log.Info("available DataNodes are", zap.Int64s("nodeIDs", nodes)) - avaNodes := getDiff(nodes, itr.GetWorkingNodes()) + avaNodes := getDiff(nodes, req.GetWorkingNodes()) if len(avaNodes) > 0 { // If there exists available DataNodes, pick one at random. resp.DatanodeId = avaNodes[rand.Intn(len(avaNodes))] - log.Info("picking a free dataNode", - zap.Any("all dataNodes", nodes), - zap.Int64("picking free dataNode with ID", resp.GetDatanodeId())) - s.cluster.Import(s.ctx, resp.GetDatanodeId(), itr) + log.Info("picking a free DataNode", + zap.Any("all DataNodes", nodes), + zap.Int64("picking free DataNode with ID", resp.GetDatanodeId())) + s.cluster.Import(s.ctx, resp.GetDatanodeId(), req) } else { - // No dataNode is available, reject the import request. - msg := "all DataNodes are busy working on data import, the task has been rejected and wait for idle datanode" - log.Info(msg, zap.Int64("task ID", itr.GetImportTask().GetTaskId())) - resp.Status.Reason = msg + // No DataNode is available, reject the import request. + msg := "all DataNodes are busy working on data import, the task has been rejected and wait for idle DataNode" + log.Info(msg, zap.Int64("taskID", req.GetImportTask().GetTaskId())) + resp.Status = merr.Status(merr.WrapErrNodeLackAny("no available DataNode")) return resp, nil } - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } // UpdateSegmentStatistics updates a segment's stats. func (s *Server) UpdateSegmentStatistics(ctx context.Context, req *datapb.UpdateSegmentStatisticsRequest) (*commonpb.Status, error) { - log := log.Ctx(ctx) - resp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn("failed to update segment stat for closed server") - resp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return merr.Status(err), nil } s.updateSegmentStatistics(req.GetStats()) return merr.Status(nil), nil @@ -1450,20 +1367,14 @@ func (s *Server) UpdateSegmentStatistics(ctx context.Context, req *datapb.Update // UpdateChannelCheckpoint updates channel checkpoint in dataCoord. 
func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.UpdateChannelCheckpointRequest) (*commonpb.Status, error) { log := log.Ctx(ctx) - resp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } - if s.isClosed() { - log.Warn("failed to update channel position for closed server") - resp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return merr.Status(err), nil } err := s.meta.UpdateChannelCheckpoint(req.GetVChannel(), req.GetPosition()) if err != nil { log.Warn("failed to UpdateChannelCheckpoint", zap.String("vChannel", req.GetVChannel()), zap.Error(err)) - resp.Reason = err.Error() - return resp, nil + return merr.Status(err), nil } return merr.Status(nil), nil @@ -1472,9 +1383,8 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update // ReportDataNodeTtMsgs send datenode timetick messages to dataCoord. func (s *Server) ReportDataNodeTtMsgs(ctx context.Context, req *datapb.ReportDataNodeTtMsgsRequest) (*commonpb.Status, error) { log := log.Ctx(ctx) - if s.isClosed() { - log.Warn("failed to report dataNode ttMsgs on closed server") - return merr.Status(merr.WrapErrServiceUnavailable("Datacoord not ready")), nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return merr.Status(err), nil } for _, ttMsg := range req.GetMsgs() { @@ -1569,22 +1479,15 @@ func (s *Server) SaveImportSegment(ctx context.Context, req *datapb.SaveImportSe zap.Int64("partitionID", req.GetPartitionId()), zap.String("channelName", req.GetChannelName()), zap.Int64("# of rows", req.GetRowNum())) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - if s.isClosed() { - log.Warn("failed to add segment for closed server") - errResp.ErrorCode = commonpb.ErrorCode_DataCoordNA - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return errResp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return merr.Status(err), nil } // Look for the DataNode that watches the channel. ok, nodeID := s.channelManager.getNodeIDByChannelName(req.GetChannelName()) if !ok { - log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName())) - errResp.Reason = fmt.Sprint("no DataNode found for channel ", req.GetChannelName()) - return errResp, nil + err := merr.WrapErrChannelNotFound(req.GetChannelName(), "no DataNode watches this channel") + log.Error("no DataNode found for channel", zap.String("channelName", req.GetChannelName()), zap.Error(err)) + return merr.Status(err), nil } // Call DataNode to add the new segment to its own flow graph. cli, err := s.sessionManager.getClient(ctx, nodeID) @@ -1649,33 +1552,20 @@ func (s *Server) UnsetIsImportingState(ctx context.Context, req *datapb.UnsetIsI // Deprecated, do not use it func (s *Server) MarkSegmentsDropped(ctx context.Context, req *datapb.MarkSegmentsDroppedRequest) (*commonpb.Status, error) { log.Info("marking segments dropped", zap.Int64s("segments", req.GetSegmentIds())) - failure := false + var err error for _, segID := range req.GetSegmentIds() { - if err := s.meta.SetState(segID, commonpb.SegmentState_Dropped); err != nil { + if err = s.meta.SetState(segID, commonpb.SegmentState_Dropped); err != nil { // Fail-open. 
log.Error("failed to set segment state as dropped", zap.Int64("segmentID", segID)) - failure = true + break } } - if failure { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, nil - } - return merr.Status(nil), nil + return merr.Status(err), nil } func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.AlterCollectionRequest) (*commonpb.Status, error) { - log := log.Ctx(ctx) - errResp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "", - } - - if s.isClosed() { - log.Warn("failed to broadcast collection information for closed server") - errResp.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return errResp, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return merr.Status(err), nil } // get collection info from cache @@ -1705,9 +1595,11 @@ func (s *Server) BroadcastAlteredCollection(ctx context.Context, req *datapb.Alt } func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { - if s.isClosed() { - reason := errorutil.UnHealthReason("datacoord", paramtable.GetNodeID(), "datacoord is closed") - return &milvuspb.CheckHealthResponse{Status: s.UnhealthyStatus(), IsHealthy: false, Reasons: []string{reason}}, nil + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil } mu := &sync.Mutex{} @@ -1722,12 +1614,12 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque if err != nil { mu.Lock() defer mu.Unlock() - errReasons = append(errReasons, errorutil.UnHealthReason("datanode", nodeID, err.Error())) + errReasons = append(errReasons, errorutil.UnHealthReason("DataNode", nodeID, err.Error())) return err } sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - isHealthy, reason := errorutil.UnHealthReasonWithComponentStatesOrErr("datanode", nodeID, sta, err) + isHealthy, reason := errorutil.UnHealthReasonWithComponentStatesOrErr("DataNode", nodeID, sta, err) if !isHealthy { mu.Lock() defer mu.Unlock() @@ -1746,19 +1638,15 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque } func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { + if err := merr.CheckHealthy(s.GetStateCode()); err != nil { + return &datapb.GcConfirmResponse{ + Status: merr.Status(err), + }, nil + } + resp := &datapb.GcConfirmResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, - GcFinished: false, + Status: merr.Status(nil), } - - if s.isClosed() { - resp.Status.Reason = msgDataCoordIsUnhealthy(paramtable.GetNodeID()) - return resp, nil - } - resp.GcFinished = s.meta.GcConfirm(ctx, request.GetCollectionId(), request.GetPartitionId()) - resp.Status.ErrorCode = commonpb.ErrorCode_Success return resp, nil } diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index d67ea5172b..8d509a9358 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -17,6 +17,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" ) @@ -586,7 +587,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { 
closeTestServer(t, svr) resp, err := svr.GetRecoveryInfoV2(context.TODO(), &datapb.GetRecoveryInfoRequestV2{}) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - assert.Equal(t, serverNotServingErrMsg, resp.GetStatus().GetReason()) + err = merr.Error(resp.GetStatus()) + assert.ErrorIs(t, err, merr.ErrServiceNotReady) }) } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 3ab2330a60..37bafea790 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -22,7 +22,6 @@ import ( "strings" "time" - "github.com/cockroachdb/errors" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -30,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" ) // Response response interface for verification @@ -51,32 +51,16 @@ func VerifyResponse(response interface{}, err error) error { if resp.GetStatus() == nil { return errNilStatusResponse } - if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return errors.New(resp.GetStatus().GetReason()) - } + return merr.Error(resp.GetStatus()) + case *commonpb.Status: if resp == nil { return errNilResponse } - if resp.ErrorCode != commonpb.ErrorCode_Success { - return errors.New(resp.GetReason()) - } + return merr.Error(resp) default: return errUnknownResponseType } - return nil -} - -// failResponse sets status to failed with unexpected error and reason. -func failResponse(status *commonpb.Status, reason string) { - status.ErrorCode = commonpb.ErrorCode_UnexpectedError - status.Reason = reason -} - -// failResponseWithCode sets status to failed with error code and reason. 
-func failResponseWithCode(status *commonpb.Status, errCode commonpb.ErrorCode, reason string) { - status.ErrorCode = errCode - status.Reason = reason } func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo) []*SegmentInfo { diff --git a/internal/datacoord/util_test.go b/internal/datacoord/util_test.go index 16b59fcfa5..0b9b564a5d 100644 --- a/internal/datacoord/util_test.go +++ b/internal/datacoord/util_test.go @@ -110,7 +110,7 @@ func (suite *UtilSuite) TestVerifyResponse() { for _, c := range cases { r := VerifyResponse(c.resp, c.err) if c.equalValue { - suite.EqualValues(c.expected.Error(), r.Error()) + suite.Contains(r.Error(), c.expected.Error()) } else { suite.Equal(c.expected, r) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 5e0458b911..48bd910985 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -59,10 +59,7 @@ func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmCha log.Warn("DataNode WatchDmChannels is not in use") // TODO ERROR OF GRPC NOT IN USE - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "watchDmChannels do nothing", - }, nil + return merr.Status(nil), nil } // GetComponentStates will return current state of DataNode diff --git a/internal/distributed/utils/util.go b/internal/distributed/utils/util.go index 1b08708d7b..f2cc161ead 100644 --- a/internal/distributed/utils/util.go +++ b/internal/distributed/utils/util.go @@ -3,9 +3,10 @@ package utils import ( "time" + "google.golang.org/grpc" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/paramtable" - "google.golang.org/grpc" ) func GracefulStopGRPCServer(s *grpc.Server) { diff --git a/internal/distributed/utils/util_test.go b/internal/distributed/utils/util_test.go index 4469518489..cf65f3feb1 100644 --- a/internal/distributed/utils/util_test.go +++ b/internal/distributed/utils/util_test.go @@ -3,8 +3,9 @@ package utils import ( "testing" - "github.com/milvus-io/milvus/pkg/util/paramtable" "google.golang.org/grpc" + + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestGracefulStopGrpcServer(t *testing.T) { diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index 2bd0f84eb3..1431356629 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -116,8 +116,7 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil { log.Ctx(ctx).Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", req.GetBuildID()), zap.String("clusterID", req.GetClusterID()), zap.Error(err)) - ret.ErrorCode = commonpb.ErrorCode_UnexpectedError - ret.Reason = err.Error() + ret = merr.Status(err) metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc() return ret, nil } diff --git a/internal/metastore/kv/rootcoord/kv_catalog.go b/internal/metastore/kv/rootcoord/kv_catalog.go index 44a7e3b47d..0039028786 100644 --- a/internal/metastore/kv/rootcoord/kv_catalog.go +++ b/internal/metastore/kv/rootcoord/kv_catalog.go @@ -594,7 +594,7 @@ func (kc *Catalog) GetCollectionByName(ctx context.Context, dbID int64, collecti } } - return nil, merr.WrapErrCollectionNotFoundWithDB(dbID, collectionName, fmt.Sprintf("timestample = %d", ts)) + return nil, merr.WrapErrCollectionNotFoundWithDB(dbID, collectionName, fmt.Sprintf("timestamp = %d", 
ts)) } func (kc *Catalog) ListCollections(ctx context.Context, dbID int64, ts typeutil.Timestamp) ([]*model.Collection, error) { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 47a496f6e3..a460dca3f9 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -2338,22 +2338,6 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) chTicker: node.chTicker, } - constructFailedResponse := func(err error, errCode commonpb.ErrorCode) *milvuspb.MutationResult { - numRows := request.NumRows - errIndex := make([]uint32, numRows) - for i := uint32(0); i < numRows; i++ { - errIndex[i] = i - } - - return &milvuspb.MutationResult{ - Status: &commonpb.Status{ - ErrorCode: errCode, - Reason: err.Error(), - }, - ErrIndex: errIndex, - } - } - log.Debug("Enqueue upsert request in Proxy", zap.Int("len(FieldsData)", len(request.FieldsData)), zap.Int("len(HashKeys)", len(request.HashKeys))) @@ -2380,9 +2364,19 @@ func (node *Proxy) Upsert(ctx context.Context, request *milvuspb.UpsertRequest) // Not every error case changes the status internally // change status there to handle it if it.result.GetStatus().GetErrorCode() == commonpb.ErrorCode_Success { - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + it.result.Status = merr.Status(err) } - return constructFailedResponse(err, it.result.GetStatus().GetErrorCode()), nil + + numRows := request.NumRows + errIndex := make([]uint32, numRows) + for i := uint32(0); i < numRows; i++ { + errIndex[i] = i + } + + return &milvuspb.MutationResult{ + Status: merr.Status(err), + ErrIndex: errIndex, + }, nil } if it.result.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { @@ -2538,7 +2532,7 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* }, } if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - resp.Status.Reason = "proxy is not healthy" + resp.Status = merr.Status(err) return resp, nil } @@ -2588,7 +2582,6 @@ func (node *Proxy) Flush(ctx context.Context, request *milvuspb.FlushRequest) (* metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status = merr.Status(err) return resp, nil } @@ -2945,10 +2938,10 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) log := log.With(zap.String("db", req.GetDbName())) resp := &milvuspb.FlushAllResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}, + Status: merr.Status(nil), } if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { - resp.Status.Reason = "proxy is not healthy" + resp.Status = merr.Status(err) return resp, nil } log.Info(rpcReceived("FlushAll")) @@ -2979,7 +2972,7 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) return dbName == req.GetDbName() }) if len(dbNames) == 0 { - resp.Status.Reason = fmt.Sprintf("failed to get db %s", req.GetDbName()) + resp.Status = merr.Status(merr.WrapErrDatabaseNotFound(req.GetDbName())) return resp, nil } } @@ -3027,7 +3020,6 @@ func (node *Proxy) FlushAll(ctx context.Context, req *milvuspb.FlushAllRequest) } resp.FlushAllTs = ts - resp.Status.ErrorCode = commonpb.ErrorCode_Success log.Info(rpcDone("FlushAll"), zap.Uint64("FlushAllTs", ts), zap.Time("FlushAllTime", tsoutil.PhysicalTime(ts))) @@ -3057,9 +3049,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req 
*milvuspb.G zap.Any("collection", req.CollectionName)) resp := &milvuspb.GetPersistentSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { resp.Status = merr.Status(err) @@ -3074,7 +3064,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - resp.Status.Reason = fmt.Errorf("getCollectionID failed, err:%w", err).Error() + resp.Status = merr.Status(err) return resp, nil } @@ -3086,7 +3076,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G }) if err != nil { metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() - resp.Status.Reason = fmt.Errorf("getSegmentsOfCollection, err:%w", err).Error() + resp.Status = merr.Status(err) return resp, nil } @@ -3104,7 +3094,7 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G metrics.FailLabel).Inc() log.Warn("GetPersistentSegmentInfo fail", zap.Error(err)) - resp.Status.Reason = fmt.Errorf("dataCoord:GetSegmentInfo, err:%w", err).Error() + resp.Status = merr.Status(err) return resp, nil } err = merr.Error(infoResp.GetStatus()) @@ -3130,7 +3120,6 @@ func (node *Proxy) GetPersistentSegmentInfo(ctx context.Context, req *milvuspb.G metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Infos = persistentInfos return resp, nil } @@ -3148,9 +3137,7 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue zap.Any("collection", req.CollectionName)) resp := &milvuspb.GetQuerySegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + Status: merr.Status(nil), } if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { resp.Status = merr.Status(err) @@ -3206,7 +3193,6 @@ func (node *Proxy) GetQuerySegmentInfo(ctx context.Context, req *milvuspb.GetQue metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.SuccessLabel).Inc() metrics.ProxyReqLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method).Observe(float64(tr.ElapseSpan().Milliseconds())) - resp.Status.ErrorCode = commonpb.ErrorCode_Success resp.Infos = queryInfos return resp, nil } @@ -3448,16 +3434,14 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq return unhealthyStatus(), nil } - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } + status := merr.Status(nil) collectionID, err := globalMetaCache.GetCollectionID(ctx, req.GetDbName(), req.GetCollectionName()) if err != nil { log.Warn("failed to get collection id", zap.String("collectionName", req.GetCollectionName()), zap.Error(err)) - status.Reason = err.Error() + status = merr.Status(err) return status, nil } infoResp, err := node.queryCoord.LoadBalance(ctx, &querypb.LoadBalanceRequest{ @@ 
-3476,19 +3460,18 @@ func (node *Proxy) LoadBalance(ctx context.Context, req *milvuspb.LoadBalanceReq log.Warn("Failed to LoadBalance from Query Coordinator", zap.Any("req", req), zap.Error(err)) - status.Reason = err.Error() + status = merr.Status(err) return status, nil } if infoResp.ErrorCode != commonpb.ErrorCode_Success { log.Warn("Failed to LoadBalance from Query Coordinator", zap.String("errMsg", infoResp.Reason)) - status.Reason = infoResp.Reason + status = infoResp return status, nil } log.Debug("LoadBalance Done", zap.Any("req", req), zap.Any("status", infoResp)) - status.ErrorCode = commonpb.ErrorCode_Success return status, nil } @@ -3710,8 +3693,7 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi if err != nil { log.Error("failed to execute import request", zap.Error(err)) - resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - resp.Status.Reason = "request options is not illegal \n" + err.Error() + " \nIllegal option format \n" + importutil.OptionFormat + resp.Status = merr.Status(err) return resp, nil } @@ -3726,7 +3708,6 @@ func (node *Proxy) Import(ctx context.Context, req *milvuspb.ImportRequest) (*mi metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() log.Error("failed to execute bulk insert request", zap.Error(err)) - resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status = merr.Status(err) return resp, nil } @@ -3745,7 +3726,9 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt log.Debug("received get import state request", zap.Int64("taskID", req.GetTask())) - resp := &milvuspb.GetImportStateResponse{} + resp := &milvuspb.GetImportStateResponse{ + Status: merr.Status(nil), + } if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { resp.Status = merr.Status(err) return resp, nil @@ -3760,7 +3743,6 @@ func (node *Proxy) GetImportState(ctx context.Context, req *milvuspb.GetImportSt metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() log.Error("failed to execute get import state", zap.Error(err)) - resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status = merr.Status(err) return resp, nil } @@ -3781,7 +3763,9 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport log := log.Ctx(ctx) log.Debug("received list import tasks request") - resp := &milvuspb.ListImportTasksResponse{} + resp := &milvuspb.ListImportTasksResponse{ + Status: merr.Status(nil), + } if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { resp.Status = merr.Status(err) return resp, nil @@ -3795,7 +3779,6 @@ func (node *Proxy) ListImportTasks(ctx context.Context, req *milvuspb.ListImport metrics.ProxyFunctionCall.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), method, metrics.FailLabel).Inc() log.Error("failed to execute list import tasks", zap.Error(err)) - resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status = merr.Status(err) return resp, nil } @@ -4363,9 +4346,7 @@ func (node *Proxy) RefreshPolicyInfoCache(ctx context.Context, req *proxypb.Refr // SetRates limits the rates of requests. 
func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesRequest) (*commonpb.Status, error) { - resp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } + resp := merr.Status(nil) if err := merr.CheckHealthy(node.stateCode.Load().(commonpb.StateCode)); err != nil { resp = unhealthyStatus() return resp, nil @@ -4374,10 +4355,10 @@ func (node *Proxy) SetRates(ctx context.Context, request *proxypb.SetRatesReques err := node.multiRateLimiter.SetRates(request.GetRates()) // TODO: set multiple rate limiter rates if err != nil { - resp.Reason = err.Error() + resp = merr.Status(err) return resp, nil } - resp.ErrorCode = commonpb.ErrorCode_Success + return resp, nil } diff --git a/internal/proxy/impl_test.go b/internal/proxy/impl_test.go index c702178e99..6b7938a54e 100644 --- a/internal/proxy/impl_test.go +++ b/internal/proxy/impl_test.go @@ -486,7 +486,7 @@ func TestProxy_FlushAll(t *testing.T) { node.stateCode.Store(commonpb.StateCode_Abnormal) resp, err := node.FlushAll(ctx, &milvuspb.FlushAllRequest{}) assert.NoError(t, err) - assert.Equal(t, resp.GetStatus().GetErrorCode(), commonpb.ErrorCode_UnexpectedError) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) node.stateCode.Store(commonpb.StateCode_Healthy) }) diff --git a/internal/proxy/multi_rate_limiter_test.go b/internal/proxy/multi_rate_limiter_test.go index a41176af57..db80be17ca 100644 --- a/internal/proxy/multi_rate_limiter_test.go +++ b/internal/proxy/multi_rate_limiter_test.go @@ -302,7 +302,7 @@ func TestRateLimiter(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() // avoid production precision issues when comparing 0-terminated numbers - newRate := fmt.Sprintf("%.3f1", rand.Float64()) + newRate := fmt.Sprintf("%.2f1", rand.Float64()) etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/collectionRate", newRate) etcdCli.KV.Put(ctx, "by-dev/config/quotaAndLimits/ddl/partitionRate", "invalid") diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index ba457da05f..159d5bf879 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -1518,7 +1518,7 @@ func TestProxy(t *testing.T) { Base: nil, }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode) + assert.ErrorIs(t, merr.Error(resp), merr.ErrCollectionNotFound) }) // TODO(dragondriver): dummy @@ -2047,7 +2047,7 @@ func TestProxy(t *testing.T) { resp, err := proxy.Upsert(ctx, req) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_UpsertAutoIDTrue, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid) assert.Equal(t, 0, len(resp.SuccIndex)) assert.Equal(t, rowNum, len(resp.ErrIndex)) assert.Equal(t, int64(0), resp.UpsertCnt) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 287fdb0ece..ea9fdf0323 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -536,7 +536,9 @@ func (dct *describeCollectionTask) Execute(ctx context.Context) error { // compatibility with PyMilvus existing implementation err := merr.Error(dct.result.GetStatus()) if errors.Is(err, merr.ErrCollectionNotFound) { + // nolint dct.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError + // nolint dct.result.Status.Reason = "can't find collection " + dct.result.GetStatus().GetReason() } } else { diff --git a/internal/proxy/task_insert.go b/internal/proxy/task_insert.go index 7d54d7905c..2a6359cf32 100644 --- 
a/internal/proxy/task_insert.go +++ b/internal/proxy/task_insert.go @@ -233,8 +233,7 @@ func (it *insertTask) Execute(ctx context.Context) error { channelNames, err := it.chMgr.getVChannels(collID) if err != nil { log.Warn("get vChannels failed", zap.Int64("collectionID", collID), zap.Error(err)) - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } @@ -255,8 +254,7 @@ func (it *insertTask) Execute(ctx context.Context) error { } if err != nil { log.Warn("assign segmentID and repack insert data failed", zap.Error(err)) - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } assignSegmentIDDur := tr.RecordSpan() @@ -266,8 +264,7 @@ func (it *insertTask) Execute(ctx context.Context) error { err = stream.Produce(msgPack) if err != nil { log.Warn("fail to produce insert msg", zap.Error(err)) - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } sendMsgDur := tr.RecordSpan() diff --git a/internal/proxy/task_upsert.go b/internal/proxy/task_upsert.go index 64aec5660b..65f0611ce7 100644 --- a/internal/proxy/task_upsert.go +++ b/internal/proxy/task_upsert.go @@ -389,8 +389,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP if err != nil { log.Warn("get vChannels failed when insertExecute", zap.Error(err)) - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } @@ -413,8 +412,7 @@ func (it *upsertTask) insertExecute(ctx context.Context, msgPack *msgstream.MsgP if err != nil { log.Warn("assign segmentID and repack insert data failed when insertExecute", zap.Error(err)) - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } assignSegmentIDDur := tr.RecordSpan() @@ -438,8 +436,7 @@ func (it *upsertTask) deleteExecute(ctx context.Context, msgPack *msgstream.MsgP channelNames, err := it.chMgr.getVChannels(collID) if err != nil { log.Warn("get vChannels failed when deleteExecute", zap.Error(err)) - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } it.upsertMsg.DeleteMsg.PrimaryKeys = it.result.IDs @@ -539,8 +536,7 @@ func (it *upsertTask) Execute(ctx context.Context) (err error) { tr.RecordSpan() err = stream.Produce(msgPack) if err != nil { - it.result.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError - it.result.Status.Reason = err.Error() + it.result.Status = merr.Status(err) return err } sendMsgDur := tr.RecordSpan() diff --git a/internal/proxy/util.go b/internal/proxy/util.go index 010227deac..3fec75050a 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -1192,8 +1192,9 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M // upsert has not supported when autoID == true log.Info("can not upsert when auto id enabled", zap.String("primaryFieldSchemaName", primaryFieldSchema.Name)) - result.Status.ErrorCode = commonpb.ErrorCode_UpsertAutoIDTrue - return nil, fmt.Errorf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name) + err := 
merr.WrapErrParameterInvalidMsg(fmt.Sprintf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.GetName())) + result.Status = merr.Status(err) + return nil, err } primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema) if err != nil { diff --git a/internal/proxy/util_test.go b/internal/proxy/util_test.go index 5036c2455a..1b93e88eed 100644 --- a/internal/proxy/util_test.go +++ b/internal/proxy/util_test.go @@ -1701,7 +1701,7 @@ func Test_UpsertTaskCheckPrimaryFieldData(t *testing.T) { case4.schema.Fields[0].IsPrimaryKey = true case4.schema.Fields[0].AutoID = true _, err = checkPrimaryFieldData(case4.schema, case4.result, case4.insertMsg, false) - assert.Equal(t, commonpb.ErrorCode_UpsertAutoIDTrue, case4.result.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(case4.result.GetStatus()), merr.ErrParameterInvalid) assert.NotEqual(t, nil, err) // primary field data is nil, GetPrimaryFieldData fail diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index a681957f2f..35e90d8175 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -212,7 +212,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID return nil, err } if resp.GetSegmentInfo() == nil { - err = merr.WrapErrCollectionNotFound(segmentID) + err = merr.WrapErrIndexNotFoundForSegment(segmentID) log.Warn("failed to get segment index info", zap.Error(err)) return nil, err @@ -220,7 +220,7 @@ func (broker *CoordinatorBroker) GetIndexInfo(ctx context.Context, collectionID segmentInfo, ok := resp.GetSegmentInfo()[segmentID] if !ok || len(segmentInfo.GetIndexInfos()) == 0 { - return nil, merr.WrapErrIndexNotFound() + return nil, merr.WrapErrIndexNotFoundForSegment(segmentID) } indexes := make([]*querypb.FieldIndexInfo, 0) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 82aa287d27..0923ccb366 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -674,7 +674,7 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques srcNode := req.GetSourceNodeIDs()[0] replica := s.meta.ReplicaManager.GetByCollectionAndNode(req.GetCollectionID(), srcNode) if replica == nil { - err := merr.WrapErrReplicaNotFound(-1, fmt.Sprintf("replica not found for collection %d and node %d", req.GetCollectionID(), srcNode)) + err := merr.WrapErrNodeNotFound(srcNode, fmt.Sprintf("source node not found in any replica of collection %d", req.GetCollectionID())) msg := "source node not found in any replica" log.Warn(msg) return merr.Status(err), nil @@ -685,9 +685,8 @@ func (s *Server) LoadBalance(ctx context.Context, req *querypb.LoadBalanceReques } for _, dstNode := range req.GetDstNodeIDs() { if !replica.Contains(dstNode) { - err := merr.WrapErrParameterInvalid("destination node in the same replica as source node", fmt.Sprintf("destination node %d not in replica %d", dstNode, replica.GetID())) - msg := "destination nodes have to be in the same replica of source node" - log.Warn(msg) + err := merr.WrapErrNodeNotFound(dstNode, "destination node not found in the same replica") + log.Warn("failed to balance to the destination node", zap.Error(err)) return merr.Status(err), nil } if err := s.isStoppingNode(dstNode); err != nil { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 
57d5750ab2..3ef4238a4b 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1241,7 +1241,7 @@ func (suite *ServiceSuite) TestLoadBalanceFailed() { } resp, err := server.LoadBalance(ctx, req) suite.NoError(err) - suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid) + suite.ErrorIs(merr.Error(resp), merr.ErrNodeNotFound) } // Test balance task failed diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go index 088b385afb..70af9f1263 100644 --- a/internal/querycoordv2/task/task_test.go +++ b/internal/querycoordv2/task/task_test.go @@ -502,7 +502,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() { }, }, }, nil) - suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFound()) + suite.broker.EXPECT().GetIndexInfo(mock.Anything, suite.collection, segment).Return(nil, merr.WrapErrIndexNotFoundForSegment(segment)) } suite.cluster.EXPECT().LoadSegments(mock.Anything, targetNode, mock.Anything).Return(merr.Status(nil), nil) diff --git a/internal/querynodev2/handlers.go b/internal/querynodev2/handlers.go index 4bd0f01fbe..3ad2304139 100644 --- a/internal/querynodev2/handlers.go +++ b/internal/querynodev2/handlers.go @@ -33,7 +33,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/delegator" "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" - "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -100,10 +99,10 @@ func (node *QueryNode) loadDeltaLogs(ctx context.Context, req *querypb.LoadSegme if finalErr != nil { log.Warn("failed to load delta logs", zap.Error(finalErr)) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to load delta logs", finalErr) + return merr.Status(finalErr) } - return util.SuccessStatus() + return merr.Status(nil) } func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsRequest) *commonpb.Status { @@ -112,7 +111,7 @@ func (node *QueryNode) loadIndex(ctx context.Context, req *querypb.LoadSegmentsR zap.Int64s("segmentIDs", lo.Map(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) int64 { return info.GetSegmentID() })), ) - status := util.SuccessStatus() + status := merr.Status(nil) log.Info("start to load index") for _, info := range req.GetInfos() { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index a9abda2a4f..dea1c7a4d1 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -42,7 +42,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -234,7 +233,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm if !node.subscribingChannels.Insert(channel.GetChannelName()) { msg := "channel subscribing..." 
log.Warn(msg) - return util.SuccessStatus(msg), nil + return merr.Status(nil), nil } defer node.subscribingChannels.Remove(channel.GetChannelName()) @@ -248,7 +247,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm _, exist := node.delegators.Get(channel.GetChannelName()) if exist { log.Info("channel already subscribed") - return util.SuccessStatus(), nil + return merr.Status(nil), nil } node.manager.Collection.PutOrRef(req.GetCollectionID(), req.GetSchema(), @@ -259,7 +258,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm node.clusterManager, node.manager, node.tSafeManager, node.loader, node.factory, channel.GetSeekPosition().GetTimestamp()) if err != nil { log.Warn("failed to create shard delegator", zap.Error(err)) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to create shard delegator", err), nil + return merr.Status(err), nil } node.delegators.Insert(channel.GetChannelName(), delegator) defer func() { @@ -280,7 +279,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm if err != nil { msg := "failed to create pipeline" log.Warn(msg, zap.Error(err)) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil + return merr.Status(err), nil } defer func() { if err != nil { @@ -316,7 +315,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm if err != nil { msg := "failed to load growing segments" log.Warn(msg, zap.Error(err)) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg, err), nil + return merr.Status(err), nil } position := &msgpb.MsgPosition{ ChannelName: channel.SeekPosition.ChannelName, @@ -338,7 +337,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm // delegator after all steps done delegator.Start() log.Info("watch dml channel success") - return util.SuccessStatus(), nil + return merr.Status(nil), nil } func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmChannelRequest) (*commonpb.Status, error) { @@ -382,7 +381,7 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC } log.Info("unsubscribed channel") - return util.SuccessStatus(), nil + return merr.Status(nil), nil } func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) { @@ -434,9 +433,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen node.lifetime.Done() // check target matches - if req.GetBase().GetTargetID() != paramtable.GetNodeID() { - return util.WrapStatus(commonpb.ErrorCode_NodeIDNotMatch, - common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), paramtable.GetNodeID())), nil + if err := merr.CheckTargetID(req.GetBase()); err != nil { + return merr.Status(err), nil } // Delegates request to workers @@ -445,17 +443,18 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen if !ok { msg := "failed to load segments, delegator not found" log.Warn(msg) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrChannelNotFound(segment.GetInsertChannel()) + return merr.Status(err), nil } req.NeedTransfer = false err := delegator.LoadSegments(ctx, req) if err != nil { log.Warn("delegator failed to load segments", zap.Error(err)) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } - return util.SuccessStatus(), nil + return 
merr.Status(nil), nil } if req.GetLoadScope() == querypb.LoadScope_Delta { @@ -486,7 +485,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen log.Info("load segments done...", zap.Int64s("segments", lo.Map(loaded, func(s segments.Segment, _ int) int64 { return s.ID() }))) - return util.SuccessStatus(), nil + return merr.Status(nil), nil } // ReleaseCollection clears all data related to this collection on the querynode @@ -498,7 +497,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.Releas } defer node.lifetime.Done() - return util.SuccessStatus(), nil + return merr.Status(nil), nil } // ReleasePartitions clears all data related to this partition on the querynode @@ -526,7 +525,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea } log.Info("release partitions done") - return util.SuccessStatus(), nil + return merr.Status(nil), nil } // ReleaseSegments remove the specified segments from query node according segmentIDs, partitionIDs, and collectionID @@ -565,7 +564,8 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release if !ok { msg := "failed to release segment, delegator not found" log.Warn(msg) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrChannelNotFound(req.GetShard()) + return merr.Status(err), nil } // when we try to release a segment, add it to pipeline's exclude list first @@ -587,10 +587,10 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release err := delegator.ReleaseSegments(ctx, req, false) if err != nil { log.Warn("delegator failed to release segment", zap.Error(err)) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + return merr.Status(err), nil } - return util.SuccessStatus(), nil + return merr.Status(nil), nil } log.Info("start to release segments") @@ -601,7 +601,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release } node.manager.Collection.Unref(req.GetCollectionID(), uint32(sealedCount)) - return util.SuccessStatus(), nil + return merr.Status(nil), nil } // GetSegmentInfo returns segment information of the collection on the queryNode, and the information includes memSize, numRow, indexName, indexID ... 
@@ -831,7 +831,6 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( result, err := segments.ReduceSearchResults(ctx, toReduceResults, req.Req.GetNq(), req.Req.GetTopk(), req.Req.GetMetricType()) if err != nil { log.Warn("failed to reduce search results", zap.Error(err)) - failRet.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError failRet.Status = merr.Status(err) return failRet, nil } @@ -852,10 +851,8 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( // only used for delegator query segments from worker func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequest) (*internalpb.RetrieveResults, error) { - failRet := &internalpb.RetrieveResults{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - }, + resp := &internalpb.RetrieveResults{ + Status: merr.Status(nil), } msgID := req.Req.Base.GetMsgID() traceID := trace.SpanFromContext(ctx).SpanContext().TraceID() @@ -869,14 +866,14 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ if !node.lifetime.Add(commonpbutil.IsHealthy) { err := merr.WrapErrServiceUnavailable(fmt.Sprintf("node id: %d is unhealthy", paramtable.GetNodeID())) - failRet.Status = merr.Status(err) - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } defer node.lifetime.Done() metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.TotalLabel, metrics.FromLeader).Inc() defer func() { - if failRet.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { metrics.QueryNodeSQCount.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FailLabel, metrics.FromLeader).Inc() } }() @@ -892,22 +889,22 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ tr := timerecord.NewTimeRecorder("querySegments") collection := node.manager.Collection.Get(req.Req.GetCollectionID()) if collection == nil { - failRet.Status = merr.Status(merr.WrapErrCollectionNotLoaded(req.Req.GetCollectionID())) - return failRet, nil + resp.Status = merr.Status(merr.WrapErrCollectionNotLoaded(req.Req.GetCollectionID())) + return resp, nil } // Send task to scheduler and wait until it finished. 
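QuerySegments above renames `failRet` to `resp` and initializes its status to success, so only failure paths assign a new status and the deferred metrics hook keys off the final error code rather than a trailing `ErrorCode_Success` assignment. A small sketch of that structure, with hypothetical `run`/`onFail` stand-ins for the scheduler call and the fail-counter:

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

	"github.com/milvus-io/milvus/internal/proto/internalpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

func querySketch(run func() error, onFail func()) *internalpb.RetrieveResults {
	resp := &internalpb.RetrieveResults{
		Status: merr.Status(nil), // success by default
	}
	defer func() {
		// the deferred metrics hook inspects the final status, not a local flag
		if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
			onFail()
		}
	}()
	if err := run(); err != nil {
		resp.Status = merr.Status(err)
	}
	return resp
}
```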
task := tasks.NewQueryTask(queryCtx, collection, node.manager, req) if err := node.scheduler.Add(task); err != nil { log.Warn("failed to add query task into scheduler", zap.Error(err)) - failRet.Status = merr.Status(err) - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } err := task.Wait() if err != nil { log.Warn("failed to query channel", zap.Error(err)) - failRet.Status = merr.Status(err) - return failRet, nil + resp.Status = merr.Status(err) + return resp, nil } tr.CtxElapse(ctx, fmt.Sprintf("do query done, traceID = %s, fromSharedLeader = %t, vChannel = %s, segmentIDs = %v", @@ -917,7 +914,6 @@ func (node *QueryNode) QuerySegments(ctx context.Context, req *querypb.QueryRequ req.GetSegmentIDs(), )) - failRet.Status.ErrorCode = commonpb.ErrorCode_Success // TODO QueryNodeSQLatencyInQueue QueryNodeReduceLatency latency := tr.ElapseSpan() metrics.QueryNodeSQReqLatency.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.QueryLabel, metrics.FromLeader).Observe(float64(latency.Milliseconds())) @@ -1143,7 +1139,7 @@ func (node *QueryNode) QueryStreamSegments(req *querypb.QueryRequest, srv queryp // SyncReplicaSegments syncs replica node & segments states func (node *QueryNode) SyncReplicaSegments(ctx context.Context, req *querypb.SyncReplicaSegmentsRequest) (*commonpb.Status, error) { - return util.SuccessStatus(), nil + return merr.Status(nil), nil } // ShowConfigurations returns the configurations of queryNode matching req.Pattern @@ -1416,7 +1412,7 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi LoadScope: querypb.LoadScope_Delta, }) if err != nil { - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, "failed to sync(load) segment", err), nil + return merr.Status(err), nil } } diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index a893345527..c5eb1895cd 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -780,20 +780,19 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() { // Delegator not found status, err := suite.node.LoadSegments(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) - suite.Contains(status.GetReason(), "failed to load segments, delegator not found") + suite.ErrorIs(merr.Error(status), merr.ErrChannelNotFound) // target not match req.Base.TargetID = -1 status, err = suite.node.LoadSegments(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode()) + suite.ErrorIs(merr.Error(status), merr.ErrNodeNotMatch) // node not healthy suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.LoadSegments(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) + suite.ErrorIs(merr.Error(status), merr.ErrServiceNotReady) } func (suite *ServiceSuite) TestLoadSegments_Transfer() { diff --git a/internal/querynodev2/tasks/task.go b/internal/querynodev2/tasks/task.go index 545de27bbf..6c396f093b 100644 --- a/internal/querynodev2/tasks/task.go +++ b/internal/querynodev2/tasks/task.go @@ -16,7 +16,6 @@ import ( "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/querynodev2/collector" "github.com/milvus-io/milvus/internal/querynodev2/segments" - "github.com/milvus-io/milvus/internal/util" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -220,7 +219,7 
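The services_test changes above stop comparing `ErrorCode` values and instead recover the typed error from the returned status with `merr.Error`, then match it against the sentinel (`merr.ErrChannelNotFound`, `merr.ErrNodeNotMatch`, `merr.ErrServiceNotReady`). A sketch of a small assertion helper built from those two calls; the helper itself is hypothetical:

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// assertStatusIs converts a response status back into a Go error and matches
// it against a merr sentinel, e.g. assertStatusIs(t, status, merr.ErrNodeNotMatch).
func assertStatusIs(t *testing.T, status *commonpb.Status, target error) {
	err := merr.Error(status) // nil when the status reports success
	assert.ErrorIs(t, err, target)
}
```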
@@ func (t *SearchTask) Execute() error { Base: &commonpb.MsgBase{ SourceID: paramtable.GetNodeID(), }, - Status: util.WrapStatus(commonpb.ErrorCode_Success, ""), + Status: merr.Status(nil), MetricType: req.GetReq().GetMetricType(), NumQueries: t.originNqs[i], TopK: t.originTopks[i], diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 3ffc8c7d73..21aaee2063 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1941,7 +1941,6 @@ func (c *Core) ListImportTasks(ctx context.Context, req *milvuspb.ListImportTask err = fmt.Errorf("failed to find collection ID from its name: '%s', error: %w", req.GetCollectionName(), err) log.Error("ListImportTasks failed", zap.Error(err)) status := merr.Status(err) - status.ErrorCode = commonpb.ErrorCode_IllegalCollectionName return &milvuspb.ListImportTasksResponse{ Status: status, }, nil @@ -2231,7 +2230,6 @@ func (c *Core) ListCredUsers(ctx context.Context, in *milvuspb.ListCredUsersRequ metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.FailLabel).Inc() status := merr.Status(err) - status.ErrorCode = commonpb.ErrorCode_ListCredUsersFailure return &milvuspb.ListCredUsersResponse{Status: status}, nil } ctxLog.Debug("ListCredUsers success") diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 3ba037c6aa..cfaa2d14a8 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -19,7 +19,6 @@ package rootcoord import ( "context" "fmt" - "github.com/milvus-io/milvus/pkg/common" "math/rand" "os" "sync" @@ -47,6 +46,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/sessionutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" @@ -1216,6 +1216,7 @@ func TestCore_Import(t *testing.T) { }, }, }) + assert.NoError(t, err) assert.ErrorIs(t, merr.Error(resp2.GetStatus()), merr.ErrBulkInsertPartitionNotFound) }) } @@ -1336,7 +1337,7 @@ func TestCore_ListImportTasks(t *testing.T) { CollectionID: ti3.CollectionId, }, nil } - return nil, errors.New("GetCollectionByName error") + return nil, merr.WrapErrCollectionNotFound(collectionName) } ctx := context.Background() @@ -1374,7 +1375,7 @@ func TestCore_ListImportTasks(t *testing.T) { }) assert.NoError(t, err) assert.Equal(t, 0, len(resp.GetTasks())) - assert.Equal(t, commonpb.ErrorCode_IllegalCollectionName, resp.GetStatus().GetErrorCode()) + assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrCollectionNotFound) // list the latest 2 tasks resp, err = c.ListImportTasks(ctx, &milvuspb.ListImportTasksRequest{ diff --git a/internal/util/utils.go b/internal/util/utils.go deleted file mode 100644 index 7fa8aef322..0000000000 --- a/internal/util/utils.go +++ /dev/null @@ -1,50 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
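The root_coord changes above drop the manual `status.ErrorCode = ...` overrides that used to follow `merr.Status(err)`: the status is now derived entirely from the wrapped error, and the rewritten test asserts the sentinel rather than a specific legacy code. A hedged sketch of the resulting failure path (the function name is a placeholder, and which legacy code the status ends up carrying is left to merr's mapping):

```go
package example

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func listImportTasksFailure(collectionName string) *commonpb.Status {
	// wrap a typed error; no ErrorCode override afterwards
	err := merr.WrapErrCollectionNotFound(collectionName)
	// Reason comes from err.Error(), ErrorCode from merr's legacy mapping;
	// tests assert: assert.ErrorIs(t, merr.Error(status), merr.ErrCollectionNotFound)
	return merr.Status(err)
}
```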
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "fmt" - "strings" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -) - -// WrapStatus wraps status with given error code, message and errors -func WrapStatus(code commonpb.ErrorCode, msg string, errs ...error) *commonpb.Status { - status := &commonpb.Status{ - ErrorCode: code, - Reason: msg, - } - - for _, err := range errs { - status.Reason = fmt.Sprintf("%s, err=%v", status.Reason, err) - } - - return status -} - -// SuccessStatus returns a success status with given message -func SuccessStatus(msgs ...string) *commonpb.Status { - return &commonpb.Status{ - Reason: strings.Join(msgs, "; "), - } -} - -// WrapError wraps error with given message -func WrapError(msg string, err error) error { - return fmt.Errorf("%s[%w]", msg, err) -} diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 7e9edcadad..ac31f5061d 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -74,7 +74,9 @@ var ( ErrSegmentReduplicate = newMilvusError("segment reduplicates", 603, false) // Index related - ErrIndexNotFound = newMilvusError("index not found", 700, false) + ErrIndexNotFound = newMilvusError("index not found", 700, false) + ErrIndexNotSupported = newMilvusError("index type not supported", 701, false) + ErrIndexDuplicate = newMilvusError("index duplicates", 702, false) // Database related ErrDatabaseNotFound = newMilvusError("database not found", 800, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 6226658533..d70153b079 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -27,12 +27,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/paramtable" ) -// For compatibility -var oldErrCodes = map[int32]commonpb.ErrorCode{ - ErrServiceNotReady.code(): commonpb.ErrorCode_NotReadyServe, - ErrCollectionNotFound.code(): commonpb.ErrorCode_CollectionNotExists, -} - // Code returns the error code of the given error, // WARN: DO NOT use this for now func Code(err error) int32 { @@ -125,6 +119,12 @@ func oldCode(code int32) commonpb.ErrorCode { case ErrServiceForceDeny.code(): return commonpb.ErrorCode_ForceDeny + case ErrIndexNotFound.code(): + return commonpb.ErrorCode_IndexNotExist + + case ErrSegmentNotFound.code(): + return commonpb.ErrorCode_SegmentNotFound + default: return commonpb.ErrorCode_UnexpectedError } @@ -156,6 +156,12 @@ func OldCodeToMerr(code commonpb.ErrorCode) error { case commonpb.ErrorCode_ForceDeny: return ErrServiceForceDeny + case commonpb.ErrorCode_IndexNotExist: + return ErrIndexNotFound + + case commonpb.ErrorCode_SegmentNotFound: + return ErrSegmentNotFound + default: return errUnexpected } @@ -450,8 +456,32 @@ func WrapErrSegmentReduplicate(id int64, msg ...string) error { } // Index related -func WrapErrIndexNotFound(msg ...string) error { - err := error(ErrIndexNotFound) +func WrapErrIndexNotFound(indexName string, msg ...string) error { + err := wrapWithField(ErrIndexNotFound, "indexName", indexName) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + +func WrapErrIndexNotFoundForSegment(segmentID int64, msg ...string) error 
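All of the new wrappers in merr/utils.go share one shape: bind the offending field to the sentinel with `wrapWithField`, then append optional free-form messages with `errors.Wrap`. As an illustration, a wrapper that does not exist in this diff but follows the same pattern (the name and field are hypothetical; it would sit inside package merr next to the others):

```go
// WrapErrIndexNotFoundForCollection mirrors WrapErrIndexNotFound /
// WrapErrIndexNotFoundForSegment, keyed by collection instead.
func WrapErrIndexNotFoundForCollection(collectionID int64, msg ...string) error {
	err := wrapWithField(ErrIndexNotFound, "collectionID", collectionID)
	if len(msg) > 0 {
		err = errors.Wrap(err, strings.Join(msg, "; "))
	}
	return err
}
```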
{ + err := wrapWithField(ErrIndexNotFound, "segmentID", segmentID) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + +func WrapErrIndexNotSupported(indexType string, msg ...string) error { + err := wrapWithField(ErrIndexNotSupported, "indexType", indexType) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + +func WrapErrIndexDuplicate(indexName string, msg ...string) error { + err := wrapWithField(ErrIndexDuplicate, "indexName", indexName) if len(msg) > 0 { err = errors.Wrap(err, strings.Join(msg, "; ")) } @@ -483,6 +513,14 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error { return err } +func WrapErrNodeLackAny(msg ...string) error { + err := error(ErrNodeLack) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + func WrapErrNodeNotAvailable(id int64, msg ...string) error { err := wrapWithField(ErrNodeNotAvailable, "node", id) if len(msg) > 0 { diff --git a/tests/python_client/testcases/test_utility.py b/tests/python_client/testcases/test_utility.py index 1c5a25c6fd..1a183716d2 100644 --- a/tests/python_client/testcases/test_utility.py +++ b/tests/python_client/testcases/test_utility.py @@ -1924,7 +1924,7 @@ class TestUtilityAdvanced(TestcaseBase): # load balance self.utility_wrap.load_balance(collection_w.name, src_node_id, dst_node_ids, sealed_segment_ids, check_task=CheckTasks.err_res, - check_items={ct.err_code: 1, ct.err_msg: "no available queryNode to allocate"}) + check_items={ct.err_code: 1, ct.err_msg: "destination node not found in the same replica"}) @pytest.mark.tags(CaseLabel.L1) @pytest.mark.xfail(reason="issue: https://github.com/milvus-io/milvus/issues/19441")
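Wrapping with a field or an extra message keeps sentinel matching intact, which is what the rewritten Go tests rely on, and it is also why the expected reason string in the Python test changes: the message now comes from the wrapped error rather than a hand-written constant. A quick sketch, assuming a placeholder index type string:

```go
package example

import (
	"fmt"

	"github.com/cockroachdb/errors"

	"github.com/milvus-io/milvus/pkg/util/merr"
)

func main() {
	err := merr.WrapErrIndexNotSupported("some-disk-index", "no IndexNode supports disk index")
	fmt.Println(errors.Is(err, merr.ErrIndexNotSupported)) // true: wrapping preserves the sentinel
	fmt.Println(err)                                       // sentinel text plus the wrapped detail
}
```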