diff --git a/internal/datanode/channel_meta.go b/internal/datanode/channel_meta.go index 9f3de5a55d..dc93d8dfe1 100644 --- a/internal/datanode/channel_meta.go +++ b/internal/datanode/channel_meta.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/tsoutil" "github.com/milvus-io/milvus/internal/util/typeutil" ) @@ -206,7 +207,7 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error { log.Warn("failed to addSegment, collection mismatch", zap.Int64("current collection ID", req.collID), zap.Int64("expected collection ID", c.collectionID)) - return fmt.Errorf("failed to addSegment, mismatch collection, ID=%d", req.collID) + return merr.WrapErrParameterInvalid(c.collectionID, req.collID, "collection not match") } log.Info("adding segment", zap.String("type", req.segType.String()), @@ -533,7 +534,7 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem log.Warn("failed to getCollectionSchema, collection mismatch", zap.Int64("current collection ID", collID), zap.Int64("expected collection ID", c.collectionID)) - return nil, fmt.Errorf("failed to getCollectionSchema, mismatch collection, want %d, actual %d", c.collectionID, collID) + return nil, merr.WrapErrParameterInvalid(c.collectionID, collID, "collection not match") } c.schemaMut.RLock() @@ -569,7 +570,7 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl log.Warn("failed to mergeFlushedSegments, collection mismatch", zap.Int64("current collection ID", seg.collectionID), zap.Int64("expected collection ID", c.collectionID)) - return errors.Newf("failed to mergeFlushedSegments, mismatch collection, ID=%d", seg.collectionID) + return merr.WrapErrParameterInvalid(c.collectionID, seg.collectionID, "collection not match") } var inValidSegments []UniqueID @@ -619,7 +620,7 @@ func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, n log.Warn("failed to addFlushedSegmentWithPKs, collection mismatch", zap.Int64("current collection ID", collID), zap.Int64("expected collection ID", c.collectionID)) - return fmt.Errorf("failed to addFlushedSegmentWithPKs, mismatch collection, ID=%d", collID) + return merr.WrapErrParameterInvalid(c.collectionID, collID, "collection not match") } log.Info("Add Flushed segment", diff --git a/internal/datanode/errors.go b/internal/datanode/errors.go deleted file mode 100644 index 525293da92..0000000000 --- a/internal/datanode/errors.go +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package datanode - -import ( - "fmt" - - "github.com/cockroachdb/errors" -) - -var ( - // errSegmentStatsNotChanged error stands for segment stats not changed. - errSegmentStatsNotChanged = errors.New("segment stats not changed") -) - -func msgDataNodeIsUnhealthy(nodeID UniqueID) string { - return fmt.Sprintf("DataNode %d is not ready", nodeID) -} - -func errDataNodeIsUnhealthy(nodeID UniqueID) error { - return errors.New(msgDataNodeIsUnhealthy(nodeID)) -} diff --git a/internal/datanode/errors_test.go b/internal/datanode/errors_test.go deleted file mode 100644 index 112a9f652e..0000000000 --- a/internal/datanode/errors_test.go +++ /dev/null @@ -1,40 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package datanode - -import ( - "testing" - - "github.com/milvus-io/milvus/internal/util/typeutil" - - "github.com/milvus-io/milvus/internal/log" - "go.uber.org/zap" -) - -func TestMsgDataNodeIsUnhealthy(t *testing.T) { - nodeIDList := []typeutil.UniqueID{1, 2, 3} - for _, nodeID := range nodeIDList { - log.Info("TestMsgDataNodeIsUnhealthy", zap.String("msg", msgDataNodeIsUnhealthy(nodeID))) - } -} - -func TestErrDataNodeIsUnhealthy(t *testing.T) { - nodeIDList := []typeutil.UniqueID{1, 2, 3} - for _, nodeID := range nodeIDList { - log.Info("TestErrDataNodeIsUnhealthy", zap.Error(errDataNodeIsUnhealthy(nodeID))) - } -} diff --git a/internal/datanode/flow_graph_manager.go b/internal/datanode/flow_graph_manager.go index 7c01e9ac20..9cd0184b33 100644 --- a/internal/datanode/flow_graph_manager.go +++ b/internal/datanode/flow_graph_manager.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/metrics" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/hardware" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/paramtable" ) @@ -147,7 +148,7 @@ func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error) return flushCh, nil } - return nil, fmt.Errorf("cannot find segment %d in all flowgraphs", segID) + return nil, merr.WrapErrSegmentNotFound(segID, "failed to get flush channel has this segment") } func (fm *flowgraphManager) getChannel(segID UniqueID) (Channel, error) { diff --git a/internal/datanode/meta_service.go b/internal/datanode/meta_service.go index 413d75161a..86c4f289eb 100644 --- a/internal/datanode/meta_service.go +++ b/internal/datanode/meta_service.go @@ -18,12 +18,12 @@ package datanode import ( "context" - "fmt" "reflect" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/paramtable" "go.uber.org/zap" @@ -74,11 +74,14 @@ func (mService 
*metaService) getCollectionInfo(ctx context.Context, collID Uniqu response, err := mService.rootCoord.DescribeCollectionInternal(ctx, req) if err != nil { - return nil, fmt.Errorf("grpc error when describe collection %v from rootcoord: %s", collID, err.Error()) + log.Error("grpc error when describe", zap.Int64("collectionID", collID), zap.Error(err)) + return nil, err } if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { - return nil, fmt.Errorf("describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason()) + err := merr.Error(response.Status) + log.Error("describe collection from rootcoord failed", zap.Int64("collectionID", collID), zap.Error(err)) + return nil, err } return response, nil diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 82eb298044..fac17c3938 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/importutil" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/metautil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" @@ -55,6 +56,7 @@ import ( func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) { log.Warn("DataNode WatchDmChannels is not in use") + // TODO ERROR OF GRPC NOT IN USE return &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, Reason: "watchDmChannels do nothing", @@ -76,7 +78,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*milvuspb.Compone StateCode: node.stateCode.Load().(commonpb.StateCode), }, SubcomponentStates: make([]*milvuspb.ComponentInfo, 0), - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), } return states, nil } @@ -91,13 +93,11 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen fmt.Sprint(paramtable.GetNodeID()), MetricRequestsTotal).Inc() - errStatus := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - } - if !node.isHealthy() { - errStatus.Reason = "dataNode not in HEALTHY state" - return errStatus, nil + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.FlushSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) + + return merr.Status(err), nil } serverID := node.GetSession().ServerID @@ -106,11 +106,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen zap.Int64("targetID", req.GetBase().GetTargetID()), zap.Int64("serverID", serverID), ) - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_NodeIDNotMatch, - Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), serverID), - } - return status, nil + + return merr.Status(merr.WrapErrNodeNotMatch(req.GetBase().GetTargetID(), serverID)), nil } log.Info("receiving FlushSegments request", @@ -130,9 +127,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen // If no flush channel is found, report an error. 
flushCh, err := node.flowgraphManager.getFlushCh(segID) if err != nil { - errStatus.Reason = "no flush channel found for the segment, unable to flush" - log.Error(errStatus.Reason, zap.Int64("segmentID", segID), zap.Error(err)) - return errStatus, nil + log.Error("no flush channel found for the segment, unable to flush", zap.Int64("segmentID", segID), zap.Error(err)) + return merr.Status(err), nil } // Double check that the segment is still not cached. @@ -163,9 +159,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen metrics.DataNodeFlushReqCounter.WithLabelValues( fmt.Sprint(paramtable.GetNodeID()), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil + return merr.Status(nil), nil } // ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message. @@ -177,9 +171,7 @@ func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.Resend log.Info("found segment(s) with stats to resend", zap.Int64s("segment IDs", segResent)) return &datapb.ResendSegmentStatsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Status(nil), SegResent: segResent, }, nil } @@ -187,18 +179,14 @@ func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.Resend // GetTimeTickChannel currently do nothing func (node *DataNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Status(nil), }, nil } // GetStatisticsChannel currently do nothing func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) { return &milvuspb.StringResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Status(nil), }, nil } @@ -206,16 +194,11 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) { log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern)) if !node.isHealthy() { - log.Warn("DataNode.ShowConfigurations failed", - zap.Int64("nodeId", paramtable.GetNodeID()), - zap.String("req", req.Pattern), - zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.ShowConfigurations failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) return &internalpb.ShowConfigurationsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), - }, + Status: merr.Status(err), Configuations: nil, }, nil } @@ -229,10 +212,7 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh } return &internalpb.ShowConfigurationsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, + Status: merr.Status(nil), Configuations: configList, }, nil } @@ -240,16 +220,11 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh // GetMetrics return datanode metrics func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { if !node.isHealthy() { - log.Warn("DataNode.GetMetrics failed", - 
zap.Int64("nodeID", paramtable.GetNodeID()), - zap.String("req", req.Request), - zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.GetMetrics failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), - }, + Status: merr.Status(err), }, nil } @@ -261,10 +236,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()), - }, + Status: merr.Status(err), }, nil } @@ -273,10 +245,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe if err != nil { log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()), - }, + Status: merr.Status(err), }, nil } @@ -289,31 +258,28 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe zap.String("metric_type", metricType)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, + Status: merr.Status(merr.WrapErrMetricNotFound(metricType)), }, nil } // Compaction handles compaction request from DataCoord // returns status as long as compaction task enqueued or invalid func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) { - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, + if !node.isHealthy() { + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.Compaction failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) + return merr.Status(err), nil } ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel()) if !ok { log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel())) - status.Reason = errIllegalCompactionPlan.Error() - return status, nil + return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil } if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) { log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel())) - status.Reason = "channel marked invalid" - return status, nil + return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil } binlogIO := &binlogIO{node.chunkManager, ds.idAllocator} @@ -329,20 +295,17 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan node.compactionExecutor.execute(task) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, nil + return merr.Status(nil), nil } // GetCompactionState called by DataCoord // return status of all compaction plans func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, 
error) { if !node.isHealthy() { + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.GetCompactionState failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) return &datapb.CompactionStateResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DataNode is unhealthy", - }, + Status: merr.Status(err), }, nil } results := make([]*datapb.CompactionStateResult, 0) @@ -366,7 +329,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac log.Info("Compaction results", zap.Any("results", results)) } return &datapb.CompactionStateResponse{ - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + Status: merr.Status(nil), Results: results, }, nil } @@ -379,16 +342,15 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments zap.Int64s("compacted from", req.GetCompactedFrom()), zap.Int64("numOfRows", req.GetNumOfRows()), ) - status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError} if !node.isHealthy() { - status.Reason = "DataNode is unhealthy" - return status, nil + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) + return merr.Status(err), nil } if len(req.GetCompactedFrom()) <= 0 { - status.Reason = "invalid request, compacted from segments shouldn't be empty" - return status, nil + return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil } var ( @@ -415,8 +377,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments } if oneSegment == 0 { log.Ctx(ctx).Warn("no valid segment, maybe the request is a retry") - status.ErrorCode = commonpb.ErrorCode_Success - return status, nil + return merr.Status(nil), nil } // oneSegment is definitely in the channel, guaranteed by the check before. @@ -430,20 +391,17 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime()) if err != nil { - status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error()) - return status, nil + return merr.Status(err), nil } // block all flow graph so it's safe to remove segment ds.fg.Blockall() defer ds.fg.Unblock() if err := channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil { - status.Reason = err.Error() - return status, nil + return merr.Status(err), nil } node.compactionExecutor.injectDone(req.GetPlanID()) - status.ErrorCode = commonpb.ErrorCode_Success - return status, nil + return merr.Status(nil), nil } // Import data files(json, numpy, etc.) 
on MinIO/S3 storage, read and parse them into sealed segments @@ -459,9 +417,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) }() importResult := &rootcoordpb.ImportResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Status(nil), TaskId: req.GetImportTask().TaskId, DatanodeId: paramtable.GetNodeID(), State: commonpb.ImportState_ImportStarted, @@ -488,16 +444,9 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) } if !node.isHealthy() { - log.Warn("DataNode import failed", - zap.Int64("collection ID", req.GetImportTask().GetCollectionId()), - zap.Int64("partition ID", req.GetImportTask().GetPartitionId()), - zap.Int64("task ID", req.GetImportTask().GetTaskId()), - zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID()))) - - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()), - }, nil + err := merr.WrapErrServiceNotReady(node.GetStateCode().String()) + log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err)) + return merr.Status(err), nil } // get a timestamp for all the rows @@ -520,10 +469,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) log.Warn("fail to report import state to RootCoord", zap.Error(reportErr)) } if err != nil { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msg, - }, nil + return merr.Status(err), nil } } @@ -543,10 +489,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) if reportErr != nil { log.Warn("fail to report import state to RootCoord", zap.Error(err)) } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, nil + return merr.Status(err), nil } returnFailFunc := func(inputErr error) (*commonpb.Status, error) { @@ -559,10 +502,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) if reportErr != nil { log.Warn("fail to report import state to RootCoord", zap.Error(inputErr)) } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: inputErr.Error(), - }, nil + return merr.Status(err), nil } // parse files and generate segments @@ -585,10 +525,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest) return returnFailFunc(err) } - resp := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - } - return resp, nil + return merr.Status(nil), nil } // AddImportSegment adds the import segment to the current DataNode. @@ -606,7 +543,7 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor var ok bool ds, ok = node.flowgraphManager.getFlowgraphService(req.GetChannelName()) if !ok { - return errors.New("channel not found") + return merr.WrapErrChannelNotFound(req.ChannelName) } return nil }, retry.Attempts(getFlowGraphServiceAttempts)) @@ -615,22 +552,14 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor zap.String("channel name", req.GetChannelName()), zap.Int64("node ID", paramtable.GetNodeID())) return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - // TODO: Add specific error code. 
- ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "channel not found in current DataNode", - }, + Status: merr.Status(err), }, nil } // Get the current dml channel position ID, that will be used in segments start positions and end positions. posID, err := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId()) if err != nil { return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - // TODO: Add specific error code. - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "failed to get channel position", - }, + Status: merr.Status(merr.WrapErrChannelNotFound(req.ChannelName, "failed to get channel position")), }, nil } // Add the new segment to the channel. @@ -663,19 +592,13 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor log.Error("failed to add segment to flow graph", zap.Error(err)) return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - // TODO: Add specific error code. - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: merr.Status(err), }, nil } } ds.flushingSegCache.Remove(req.GetSegmentId()) return &datapb.AddImportSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Status(nil), ChannelPos: posID, }, nil } @@ -723,9 +646,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil // ignore the returned error, since even report failed the segments still can be cleaned retry.Do(context.Background(), func() error { importResult := &rootcoordpb.ImportResult{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - }, + Status: merr.Status(nil), TaskId: req.GetImportTask().TaskId, DatanodeId: paramtable.GetNodeID(), State: commonpb.ImportState_ImportStarted, diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 5e47771b28..04f6de7d17 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -44,6 +44,7 @@ import ( "github.com/milvus-io/milvus/internal/util/dependency" "github.com/milvus-io/milvus/internal/util/etcd" "github.com/milvus-io/milvus/internal/util/importutil" + "github.com/milvus-io/milvus/internal/util/merr" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/paramtable" "github.com/milvus-io/milvus/internal/util/sessionutil" @@ -117,7 +118,7 @@ func (s *DataNodeServicesSuite) TestNotInUseAPIs() { s.Run("WatchDmChannels", func() { status, err := s.node.WatchDmChannels(s.ctx, &datapb.WatchDmChannelsRequest{}) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + s.Assert().True(merr.Ok(status)) }) s.Run("GetTimeTickChannel", func() { _, err := s.node.GetTimeTickChannel(s.ctx) @@ -133,14 +134,14 @@ func (s *DataNodeServicesSuite) TestNotInUseAPIs() { func (s *DataNodeServicesSuite) TestGetComponentStates() { resp, err := s.node.GetComponentStates(s.ctx) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().True(merr.Ok(resp.GetStatus())) s.Assert().Equal(common.NotRegisteredID, resp.State.NodeID) s.node.SetSession(&sessionutil.Session{}) s.node.session.UpdateRegistered(true) resp, err = s.node.GetComponentStates(context.Background()) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().True(merr.Ok(resp.GetStatus())) } func (s *DataNodeServicesSuite) TestGetCompactionState() { @@ 
-184,7 +185,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { node := &DataNode{} node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, _ := node.GetCompactionState(s.ctx, nil) - s.Assert().Equal("DataNode is unhealthy", resp.GetStatus().GetReason()) + s.Assert().Equal(merr.Code(merr.ErrServiceNotReady), resp.GetStatus().GetCode()) }) } @@ -231,7 +232,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { status, err := s.node.FlushSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + s.Assert().True(merr.Ok(status)) }() go func() { @@ -273,7 +274,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { // dup call status, err := s.node.FlushSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + s.Assert().True(merr.Ok(status)) // failure call req = &datapb.FlushSegmentsRequest{ @@ -286,7 +287,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { } status, err = s.node.FlushSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_NodeIDNotMatch, status.ErrorCode) + s.Assert().Equal(merr.Code(merr.ErrNodeNotMatch), status.GetCode()) req = &datapb.FlushSegmentsRequest{ Base: &commonpb.MsgBase{ @@ -299,7 +300,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { status, err = s.node.FlushSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.ErrorCode) + s.Assert().False(merr.Ok(status)) req = &datapb.FlushSegmentsRequest{ Base: &commonpb.MsgBase{ @@ -312,7 +313,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() { status, err = s.node.FlushSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode) + s.Assert().True(merr.Ok(status)) } func (s *DataNodeServicesSuite) TestShowConfigurations() { @@ -332,12 +333,12 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() { resp, err := node.ShowConfigurations(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + s.Assert().False(merr.Ok(resp.GetStatus())) node.stateCode.Store(commonpb.StateCode_Healthy) resp, err = node.ShowConfigurations(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().True(merr.Ok(resp.GetStatus())) s.Assert().Equal(1, len(resp.Configuations)) s.Assert().Equal("datanode.port", resp.Configuations[0].Key) } @@ -350,7 +351,7 @@ func (s *DataNodeServicesSuite) TestGetMetrics() { node.stateCode.Store(commonpb.StateCode_Abnormal) resp, err := node.GetMetrics(s.ctx, &milvuspb.GetMetricsRequest{}) s.Assert().NoError(err) - s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().False(merr.Ok(resp.GetStatus())) node.stateCode.Store(commonpb.StateCode_Healthy) @@ -360,7 +361,7 @@ func (s *DataNodeServicesSuite) TestGetMetrics() { Request: invalidRequest, }) s.Assert().NoError(err) - s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().False(merr.Ok(resp.GetStatus())) // unsupported metric type unsupportedMetricType := "unsupported" @@ -368,14 +369,14 @@ func (s *DataNodeServicesSuite) TestGetMetrics() { s.Assert().NoError(err) resp, err = node.GetMetrics(s.ctx, req) s.Assert().NoError(err) - s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().False(merr.Ok(resp.GetStatus())) // normal case req, err = 
metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) s.Assert().NoError(err) resp, err = node.GetMetrics(node.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) + s.Assert().True(merr.Ok(resp.GetStatus())) log.Info("Test DataNode.GetMetrics", zap.String("name", resp.ComponentName), zap.String("response", resp.Response)) @@ -453,7 +454,7 @@ func (s *DataNodeServicesSuite) TestImport() { stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetErrorCode()) + s.Assert().True(merr.Ok(stat)) s.Assert().Equal("", stat.GetReason()) }) @@ -504,7 +505,7 @@ func (s *DataNodeServicesSuite) TestImport() { } stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetErrorCode()) + s.Assert().True(merr.Ok(stat)) s.Assert().Equal("", stat.GetReason()) }) s.Run("Test Import report import error", func() { @@ -537,7 +538,7 @@ func (s *DataNodeServicesSuite) TestImport() { } stat, err := s.node.Import(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) + s.Assert().False(merr.Ok(stat)) }) s.Run("Test Import error", func() { @@ -550,16 +551,16 @@ func (s *DataNodeServicesSuite) TestImport() { } stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.ErrorCode) + s.Assert().False(merr.Ok(stat)) stat, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, returnError), req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) + s.Assert().False(merr.Ok(stat)) s.node.stateCode.Store(commonpb.StateCode_Abnormal) stat, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode()) + s.Assert().False(merr.Ok(stat)) }) } @@ -592,7 +593,7 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { _, ok = s.node.flowgraphManager.getFlowgraphService(chName2) s.Assert().True(ok) - stat, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ + resp, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ SegmentId: 100, CollectionId: 100, PartitionId: 100, @@ -600,12 +601,12 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { RowNum: 500, }) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetStatus().GetErrorCode()) - s.Assert().Equal("", stat.GetStatus().GetReason()) - s.Assert().NotEqual(nil, stat.GetChannelPos()) + s.Assert().True(merr.Ok(resp.GetStatus())) + s.Assert().Equal("", resp.GetStatus().GetReason()) + s.Assert().NotEqual(nil, resp.GetChannelPos()) getFlowGraphServiceAttempts = 3 - stat, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ + resp, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{ SegmentId: 100, CollectionId: 100, PartitionId: 100, @@ -613,7 +614,9 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() { RowNum: 500, }) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetStatus().GetErrorCode()) + // TODO ASSERT COMBINE ERROR + 
s.Assert().False(merr.Ok(resp.GetStatus())) + // s.Assert().Equal(merr.Code(merr.ErrChannelNotFound), stat.GetStatus().GetCode()) }) } @@ -651,12 +654,12 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { req.CompactedFrom = []UniqueID{} status, err := s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + s.Assert().False(merr.Ok(status)) req.CompactedFrom = []UniqueID{101, 201} status, err = s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + s.Assert().True(merr.Ok(status)) }) s.Run("valid request numRows>0", func() { @@ -669,11 +672,11 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { cancel() status, err := s.node.SyncSegments(cancelCtx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + s.Assert().False(merr.Ok(status)) status, err = s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + s.Assert().True(merr.Ok(status)) s.Assert().True(fg.channel.hasSegment(req.CompactedTo, true)) s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true)) @@ -681,7 +684,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { status, err = s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + s.Assert().True(merr.Ok(status)) }) s.Run("valid request numRows=0", func() { @@ -702,7 +705,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { } status, err := s.node.SyncSegments(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode()) + s.Assert().True(merr.Ok(status)) s.Assert().False(fg.channel.hasSegment(req.CompactedTo, true)) s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true)) @@ -762,12 +765,12 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() { resp, err := s.node.ResendSegmentStats(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.Assert().True(merr.Ok(resp.GetStatus())) s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent()) // Duplicate call. 
resp, err = s.node.ResendSegmentStats(s.ctx, req) s.Assert().NoError(err) - s.Assert().Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.Assert().True(merr.Ok(resp.GetStatus())) s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent()) } diff --git a/internal/util/merr/errors.go b/internal/util/merr/errors.go index 89305f2756..1b2e235ce8 100644 --- a/internal/util/merr/errors.go +++ b/internal/util/merr/errors.go @@ -87,6 +87,7 @@ var ( ErrNodeNotFound = newMilvusError("node not found", 901, false) ErrNodeOffline = newMilvusError("node offline", 902, false) ErrNodeLack = newMilvusError("node lacks", 903, false) + ErrNodeNotMatch = newMilvusError("node not match", 904, false) // IO related ErrIoKeyNotFound = newMilvusError("key not found", 1000, false) diff --git a/internal/util/merr/utils.go b/internal/util/merr/utils.go index 36e2c5f59c..e15fdcb06b 100644 --- a/internal/util/merr/utils.go +++ b/internal/util/merr/utils.go @@ -83,6 +83,8 @@ func oldCode(code int32) commonpb.ErrorCode { return commonpb.ErrorCode_NotReadyServe case ErrCollectionNotFound.code(): return commonpb.ErrorCode_CollectionNotExists + case ErrNodeNotMatch.code(): + return commonpb.ErrorCode_NodeIDNotMatch default: return commonpb.ErrorCode_UnexpectedError } @@ -300,6 +302,14 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error { return err } +func WrapErrNodeNotMatch(expectedNodeID, actualNodeID int64, msg ...string) error { + err := errors.Wrapf(ErrNodeNotMatch, "expectedNodeID=%d, actualNodeID=%d", expectedNodeID, actualNodeID) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "; ")) + } + return err +} + // IO related func WrapErrIoKeyNotFound(key string, msg ...string) error { err := errors.Wrapf(ErrIoKeyNotFound, "key=%s", key)
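
Taken together, the services.go hunks converge on a single handler shape: guard on node health with merr.WrapErrServiceNotReady, carry any failure back in the response via merr.Status(err) while leaving the gRPC error nil, and return merr.Status(nil) on success instead of a hand-built commonpb.Status. The sketch below condenses that shape for reference; exampleRPC is a hypothetical method (not part of the patch), it reuses only calls that appear above (isHealthy, GetStateCode, GetSession, paramtable.GetNodeID, merr.WrapErrNodeNotMatch), and it assumes the imports services.go already declares.

// Hypothetical sketch of the handler pattern the patch applies to
// FlushSegments, Compaction, SyncSegments and Import.
func (node *DataNode) exampleRPC(ctx context.Context, req *datapb.FlushSegmentsRequest) (*commonpb.Status, error) {
	if !node.isHealthy() {
		// Unhealthy node: report ErrServiceNotReady in the status; the gRPC error stays nil.
		err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
		log.Warn("DataNode.exampleRPC failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
		return merr.Status(err), nil
	}

	// Requests routed to the wrong node are rejected with the new ErrNodeNotMatch,
	// which oldCode maps back to commonpb.ErrorCode_NodeIDNotMatch for legacy callers.
	serverID := node.GetSession().ServerID
	if req.GetBase().GetTargetID() != serverID {
		return merr.Status(merr.WrapErrNodeNotMatch(req.GetBase().GetTargetID(), serverID)), nil
	}

	// ... actual handler work; any error from it would be returned as merr.Status(err) ...

	// Success: merr.Status(nil) yields a success status.
	return merr.Status(nil), nil
}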
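
On the consuming side, services_test.go stops comparing commonpb.ErrorCode constants and checks statuses with merr.Ok and merr.Code. The same idiom works for any caller of these RPCs; checkFlushStatus below is a hypothetical helper, and it uses only merr.Ok, merr.Code, merr.Error and ErrNodeNotMatch, all of which appear elsewhere in this patch.

// Hypothetical caller-side check mirroring the updated assertions in services_test.go.
func checkFlushStatus(ctx context.Context, node *DataNode, req *datapb.FlushSegmentsRequest) error {
	status, err := node.FlushSegments(ctx, req)
	if err != nil {
		return err // transport-level failure
	}
	if merr.Ok(status) {
		return nil
	}
	if status.GetCode() == merr.Code(merr.ErrNodeNotMatch) {
		log.Warn("request sent to the wrong DataNode, refresh target before retrying",
			zap.Int64("targetID", req.GetBase().GetTargetID()))
	}
	// merr.Error rebuilds an error from a failed status, the same helper
	// getCollectionInfo now uses for RootCoord responses.
	return merr.Error(status)
}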
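
The merr/utils.go hunk also registers ErrNodeNotMatch in oldCode, so the legacy ErrorCode seen by pre-merr callers stays commonpb.ErrorCode_NodeIDNotMatch, which is what FlushSegments returned before this change. Below is a small hedged test sketch (not in the patch) of that round trip; it assumes merr.Status derives the legacy ErrorCode through oldCode, which is what the new switch case exists for, and it assumes the usual testing and testify/assert imports.

// Hypothetical compatibility check for the new ErrNodeNotMatch mapping.
func TestNodeNotMatchStatusCompat(t *testing.T) {
	status := merr.Status(merr.WrapErrNodeNotMatch(1, 2))

	// New-style callers match on the numeric merr code, as the updated
	// FlushSegments assertion in services_test.go does.
	assert.Equal(t, merr.Code(merr.ErrNodeNotMatch), status.GetCode())

	// Legacy callers should still observe the pre-patch error code,
	// assuming merr.Status fills ErrorCode via oldCode.
	assert.Equal(t, commonpb.ErrorCode_NodeIDNotMatch, status.GetErrorCode())
}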