Refine errors of datanode (#22852)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
aoiasd 2023-03-28 19:04:00 +08:00 committed by GitHub
parent e3c3c949c4
commit 5d172d0f4f
9 changed files with 120 additions and 256 deletions


@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
@ -206,7 +207,7 @@ func (c *ChannelMeta) addSegment(req addSegmentReq) error {
log.Warn("failed to addSegment, collection mismatch",
zap.Int64("current collection ID", req.collID),
zap.Int64("expected collection ID", c.collectionID))
return fmt.Errorf("failed to addSegment, mismatch collection, ID=%d", req.collID)
return merr.WrapErrParameterInvalid(c.collectionID, req.collID, "collection not match")
}
log.Info("adding segment",
zap.String("type", req.segType.String()),
@ -533,7 +534,7 @@ func (c *ChannelMeta) getCollectionSchema(collID UniqueID, ts Timestamp) (*schem
log.Warn("failed to getCollectionSchema, collection mismatch",
zap.Int64("current collection ID", collID),
zap.Int64("expected collection ID", c.collectionID))
return nil, fmt.Errorf("failed to getCollectionSchema, mismatch collection, want %d, actual %d", c.collectionID, collID)
return nil, merr.WrapErrParameterInvalid(c.collectionID, collID, "collection not match")
}
c.schemaMut.RLock()
@ -569,7 +570,7 @@ func (c *ChannelMeta) mergeFlushedSegments(ctx context.Context, seg *Segment, pl
log.Warn("failed to mergeFlushedSegments, collection mismatch",
zap.Int64("current collection ID", seg.collectionID),
zap.Int64("expected collection ID", c.collectionID))
return errors.Newf("failed to mergeFlushedSegments, mismatch collection, ID=%d", seg.collectionID)
return merr.WrapErrParameterInvalid(c.collectionID, seg.collectionID, "collection not match")
}
var inValidSegments []UniqueID
@ -619,7 +620,7 @@ func (c *ChannelMeta) addFlushedSegmentWithPKs(segID, collID, partID UniqueID, n
log.Warn("failed to addFlushedSegmentWithPKs, collection mismatch",
zap.Int64("current collection ID", collID),
zap.Int64("expected collection ID", c.collectionID))
return fmt.Errorf("failed to addFlushedSegmentWithPKs, mismatch collection, ID=%d", collID)
return merr.WrapErrParameterInvalid(c.collectionID, collID, "collection not match")
}
log.Info("Add Flushed segment",


@ -1,36 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"fmt"
"github.com/cockroachdb/errors"
)
var (
// errSegmentStatsNotChanged error stands for segment stats not changed.
errSegmentStatsNotChanged = errors.New("segment stats not changed")
)
func msgDataNodeIsUnhealthy(nodeID UniqueID) string {
return fmt.Sprintf("DataNode %d is not ready", nodeID)
}
func errDataNodeIsUnhealthy(nodeID UniqueID) error {
return errors.New(msgDataNodeIsUnhealthy(nodeID))
}
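Editor's note: this file's ad-hoc helpers (errSegmentStatsNotChanged, msgDataNodeIsUnhealthy, errDataNodeIsUnhealthy) are deleted; the handlers in services.go below switch to merr.WrapErrServiceNotReady. Roughly, the guard that replaces errDataNodeIsUnhealthy looks like the sketch below. The method name is hypothetical (the commit repeats the check inline in each handler); it assumes GetStateCode returns a commonpb.StateCode, as the calls in the diff do.

// hypothetical consolidation of the guard repeated in FlushSegments, GetMetrics,
// Compaction, GetCompactionState, SyncSegments and Import below
func (node *DataNode) notReadyStatus() *commonpb.Status {
	if node.isHealthy() {
		return nil
	}
	err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
	log.Warn("DataNode is not ready", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err))
	return merr.Status(err)
}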


@ -1,40 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datanode
import (
"testing"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
func TestMsgDataNodeIsUnhealthy(t *testing.T) {
nodeIDList := []typeutil.UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestMsgDataNodeIsUnhealthy", zap.String("msg", msgDataNodeIsUnhealthy(nodeID)))
}
}
func TestErrDataNodeIsUnhealthy(t *testing.T) {
nodeIDList := []typeutil.UniqueID{1, 2, 3}
for _, nodeID := range nodeIDList {
log.Info("TestErrDataNodeIsUnhealthy", zap.Error(errDataNodeIsUnhealthy(nodeID)))
}
}


@ -29,6 +29,7 @@ import (
"github.com/milvus-io/milvus/internal/metrics"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/util/hardware"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/paramtable"
)
@ -147,7 +148,7 @@ func (fm *flowgraphManager) getFlushCh(segID UniqueID) (chan<- flushMsg, error)
return flushCh, nil
}
return nil, fmt.Errorf("cannot find segment %d in all flowgraphs", segID)
return nil, merr.WrapErrSegmentNotFound(segID, "failed to get flush channel, no flowgraph has this segment")
}
func (fm *flowgraphManager) getChannel(segID UniqueID) (Channel, error) {
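Editor's note: getFlushCh now reports a missing segment with a typed error rather than a bare fmt.Errorf, so FlushSegments below can forward it through merr.Status unchanged. A caller that needs to distinguish this case could use something like the sketch below; the helper name is invented, and it assumes an exported merr.ErrSegmentNotFound sentinel behind WrapErrSegmentNotFound and that merr.Code unwraps the wrapped error.

// hypothetical: true when getFlushCh failed because no flowgraph on this node owns the segment
func isSegmentMissing(err error) bool {
	return err != nil && merr.Code(err) == merr.Code(merr.ErrSegmentNotFound)
}

FlushSegments itself does not need this distinction and simply forwards the error with merr.Status(err).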


@ -18,12 +18,12 @@ package datanode
import (
"context"
"fmt"
"reflect"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/paramtable"
"go.uber.org/zap"
@ -74,11 +74,14 @@ func (mService *metaService) getCollectionInfo(ctx context.Context, collID Uniqu
response, err := mService.rootCoord.DescribeCollectionInternal(ctx, req)
if err != nil {
return nil, fmt.Errorf("grpc error when describe collection %v from rootcoord: %s", collID, err.Error())
log.Error("grpc error when describe", zap.Int64("collectionID", collID), zap.Error(err))
return nil, err
}
if response.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
return nil, fmt.Errorf("describe collection %v from rootcoord wrong: %s", collID, response.GetStatus().GetReason())
err := merr.Error(response.Status)
log.Error("describe collection from rootcoord failed", zap.Int64("collectionID", collID), zap.Error(err))
return nil, err
}
return response, nil
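Editor's note: instead of flattening the RootCoord failure into a new fmt.Errorf, the handler now logs and returns the gRPC error directly, and a non-success status is turned back into an error with merr.Error. The round trip is roughly the sketch below. It assumes merr.Error returns nil for a success status and that the error code travels in the status, as the GetCode assertions in the service tests further down suggest; the code above only calls merr.Error in the failure branch, so the nil-on-success assumption is not exercised by the diff itself.

// illustrative round trip: an error wrapped on one side keeps its merr code
// after crossing the RPC boundary as a commonpb.Status
func roundTrip(err error) error {
	status := merr.Status(err) // producer side: error -> status
	return merr.Error(status)  // consumer side: status -> error (assumed nil on success)
}

Under those assumptions, merr.Code(roundTrip(merr.WrapErrNodeNotMatch(1, 2))) should match merr.Code(merr.ErrNodeNotMatch), which is the shape of check the tests below perform against GetStatus().GetCode().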


@ -42,6 +42,7 @@ import (
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/commonpbutil"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
@ -55,6 +56,7 @@ import (
func (node *DataNode) WatchDmChannels(ctx context.Context, in *datapb.WatchDmChannelsRequest) (*commonpb.Status, error) {
log.Warn("DataNode WatchDmChannels is not in use")
// TODO ERROR OF GRPC NOT IN USE
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "watchDmChannels do nothing",
@ -76,7 +78,7 @@ func (node *DataNode) GetComponentStates(ctx context.Context) (*milvuspb.Compone
StateCode: node.stateCode.Load().(commonpb.StateCode),
},
SubcomponentStates: make([]*milvuspb.ComponentInfo, 0),
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Status: merr.Status(nil),
}
return states, nil
}
@ -91,13 +93,11 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
fmt.Sprint(paramtable.GetNodeID()),
MetricRequestsTotal).Inc()
errStatus := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}
if !node.isHealthy() {
errStatus.Reason = "dataNode not in HEALTHY state"
return errStatus, nil
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.FlushSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
}
serverID := node.GetSession().ServerID
@ -106,11 +106,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
zap.Int64("targetID", req.GetBase().GetTargetID()),
zap.Int64("serverID", serverID),
)
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_NodeIDNotMatch,
Reason: common.WrapNodeIDNotMatchMsg(req.GetBase().GetTargetID(), serverID),
}
return status, nil
return merr.Status(merr.WrapErrNodeNotMatch(req.GetBase().GetTargetID(), serverID)), nil
}
log.Info("receiving FlushSegments request",
@ -130,9 +127,8 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
// If no flush channel is found, report an error.
flushCh, err := node.flowgraphManager.getFlushCh(segID)
if err != nil {
errStatus.Reason = "no flush channel found for the segment, unable to flush"
log.Error(errStatus.Reason, zap.Int64("segmentID", segID), zap.Error(err))
return errStatus, nil
log.Error("no flush channel found for the segment, unable to flush", zap.Int64("segmentID", segID), zap.Error(err))
return merr.Status(err), nil
}
// Double check that the segment is still not cached.
@ -163,9 +159,7 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
metrics.DataNodeFlushReqCounter.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
MetricRequestsSuccess).Inc()
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
return merr.Status(nil), nil
}
// ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message.
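Editor's note: after the FlushSegments hunks above, the handler builds its response in exactly two ways, merr.Status(nil) on success and merr.Status(err) on failure, instead of mutating a shared errStatus. The behavioural sketch below records what the service tests further down expect from those two forms; the concrete field values are inferred from those assertions and from the oldCode mapping at the end of this diff, not from the merr implementation itself.

func statusShapes() {
	ok := merr.Status(nil)
	// merr.Ok(ok) == true; ok.ErrorCode == commonpb.ErrorCode_Success
	_ = ok

	bad := merr.Status(merr.WrapErrNodeNotMatch(10, 11))
	// merr.Ok(bad) == false; bad.GetCode() == merr.Code(merr.ErrNodeNotMatch);
	// bad.ErrorCode == commonpb.ErrorCode_NodeIDNotMatch through the oldCode mapping below
	_ = bad
}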
@ -177,9 +171,7 @@ func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.Resend
log.Info("found segment(s) with stats to resend",
zap.Int64s("segment IDs", segResent))
return &datapb.ResendSegmentStatsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: merr.Status(nil),
SegResent: segResent,
}, nil
}
@ -187,18 +179,14 @@ func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.Resend
// GetTimeTickChannel currently do nothing
func (node *DataNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: merr.Status(nil),
}, nil
}
// GetStatisticsChannel currently do nothing
func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: merr.Status(nil),
}, nil
}
@ -206,16 +194,11 @@ func (node *DataNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
log.Debug("DataNode.ShowConfigurations", zap.String("pattern", req.Pattern))
if !node.isHealthy() {
log.Warn("DataNode.ShowConfigurations failed",
zap.Int64("nodeId", paramtable.GetNodeID()),
zap.String("req", req.Pattern),
zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID())))
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.ShowConfigurations failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
Configuations: nil,
}, nil
}
@ -229,10 +212,7 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh
}
return &internalpb.ShowConfigurationsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Status: merr.Status(nil),
Configuations: configList,
}, nil
}
@ -240,16 +220,11 @@ func (node *DataNode) ShowConfigurations(ctx context.Context, req *internalpb.Sh
// GetMetrics return datanode metrics
func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if !node.isHealthy() {
log.Warn("DataNode.GetMetrics failed",
zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("req", req.Request),
zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID())))
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.GetMetrics failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()),
},
Status: merr.Status(err),
}, nil
}
@ -261,10 +236,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()),
},
Status: merr.Status(err),
}, nil
}
@ -273,10 +245,7 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
if err != nil {
log.Warn("DataNode GetMetrics failed", zap.Int64("nodeID", paramtable.GetNodeID()), zap.Error(err))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: fmt.Sprintf("datanode GetMetrics failed, nodeID=%d, err=%s", paramtable.GetNodeID(), err.Error()),
},
Status: merr.Status(err),
}, nil
}
@ -289,31 +258,28 @@ func (node *DataNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
zap.String("metric_type", metricType))
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Status: merr.Status(merr.WrapErrMetricNotFound(metricType)),
}, nil
}
// Compaction handles compaction request from DataCoord
// returns status as long as compaction task enqueued or invalid
func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan) (*commonpb.Status, error) {
status := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
if !node.isHealthy() {
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.Compaction failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
}
ds, ok := node.flowgraphManager.getFlowgraphService(req.GetChannel())
if !ok {
log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channel name", req.GetChannel()))
status.Reason = errIllegalCompactionPlan.Error()
return status, nil
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil
}
if !node.compactionExecutor.channelValidateForCompaction(req.GetChannel()) {
log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channel name", req.GetChannel()))
status.Reason = "channel marked invalid"
return status, nil
return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil
}
binlogIO := &binlogIO{node.chunkManager, ds.idAllocator}
@ -329,20 +295,17 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan
node.compactionExecutor.execute(task)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
return merr.Status(nil), nil
}
// GetCompactionState called by DataCoord
// return status of all compaction plans
func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.CompactionStateRequest) (*datapb.CompactionStateResponse, error) {
if !node.isHealthy() {
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.GetCompactionState failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return &datapb.CompactionStateResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "DataNode is unhealthy",
},
Status: merr.Status(err),
}, nil
}
results := make([]*datapb.CompactionStateResult, 0)
@ -366,7 +329,7 @@ func (node *DataNode) GetCompactionState(ctx context.Context, req *datapb.Compac
log.Info("Compaction results", zap.Any("results", results))
}
return &datapb.CompactionStateResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
Status: merr.Status(nil),
Results: results,
}, nil
}
@ -379,16 +342,15 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
zap.Int64s("compacted from", req.GetCompactedFrom()),
zap.Int64("numOfRows", req.GetNumOfRows()),
)
status := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
if !node.isHealthy() {
status.Reason = "DataNode is unhealthy"
return status, nil
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
}
if len(req.GetCompactedFrom()) <= 0 {
status.Reason = "invalid request, compacted from segments shouldn't be empty"
return status, nil
return merr.Status(merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")), nil
}
var (
@ -415,8 +377,7 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
}
if oneSegment == 0 {
log.Ctx(ctx).Warn("no valid segment, maybe the request is a retry")
status.ErrorCode = commonpb.ErrorCode_Success
return status, nil
return merr.Status(nil), nil
}
// oneSegment is definitely in the channel, guaranteed by the check before.
@ -430,20 +391,17 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
err = channel.InitPKstats(ctx, targetSeg, req.GetStatsLogs(), tsoutil.GetCurrentTime())
if err != nil {
status.Reason = fmt.Sprintf("init pk stats fail, err=%s", err.Error())
return status, nil
return merr.Status(err), nil
}
// block all flow graph so it's safe to remove segment
ds.fg.Blockall()
defer ds.fg.Unblock()
if err := channel.mergeFlushedSegments(ctx, targetSeg, req.GetPlanID(), req.GetCompactedFrom()); err != nil {
status.Reason = err.Error()
return status, nil
return merr.Status(err), nil
}
node.compactionExecutor.injectDone(req.GetPlanID())
status.ErrorCode = commonpb.ErrorCode_Success
return status, nil
return merr.Status(nil), nil
}
// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments
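Editor's note: the empty compacted-from check in SyncSegments above now produces a typed parameter error, with the expected and actual values passed as plain strings. The sketch below pulls that check into a hypothetical standalone validator (the name is invented); only merr.WrapErrParameterInvalid and the message come from the commit.

// hypothetical validator equivalent to the inline check in SyncSegments above
func validateCompactedFrom(segIDs []UniqueID) error {
	if len(segIDs) == 0 {
		return merr.WrapErrParameterInvalid(">0", "0", "compacted from segments shouldn't be empty")
	}
	return nil
}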
@ -459,9 +417,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
}()
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: merr.Status(nil),
TaskId: req.GetImportTask().TaskId,
DatanodeId: paramtable.GetNodeID(),
State: commonpb.ImportState_ImportStarted,
@ -488,16 +444,9 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
}
if !node.isHealthy() {
log.Warn("DataNode import failed",
zap.Int64("collection ID", req.GetImportTask().GetCollectionId()),
zap.Int64("partition ID", req.GetImportTask().GetPartitionId()),
zap.Int64("task ID", req.GetImportTask().GetTaskId()),
zap.Error(errDataNodeIsUnhealthy(paramtable.GetNodeID())))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msgDataNodeIsUnhealthy(paramtable.GetNodeID()),
}, nil
err := merr.WrapErrServiceNotReady(node.GetStateCode().String())
log.Warn("DataNode.SyncSegments failed", zap.Int64("nodeId", paramtable.GetNodeID()), zap.Error(err))
return merr.Status(err), nil
}
// get a timestamp for all the rows
@ -520,10 +469,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
log.Warn("fail to report import state to RootCoord", zap.Error(reportErr))
}
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: msg,
}, nil
return merr.Status(err), nil
}
}
@ -543,10 +489,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
if reportErr != nil {
log.Warn("fail to report import state to RootCoord", zap.Error(err))
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
}, nil
return merr.Status(err), nil
}
returnFailFunc := func(inputErr error) (*commonpb.Status, error) {
@ -559,10 +502,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
if reportErr != nil {
log.Warn("fail to report import state to RootCoord", zap.Error(inputErr))
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: inputErr.Error(),
}, nil
return merr.Status(err), nil
}
// parse files and generate segments
@ -585,10 +525,7 @@ func (node *DataNode) Import(ctx context.Context, req *datapb.ImportTaskRequest)
return returnFailFunc(err)
}
resp := &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}
return resp, nil
return merr.Status(nil), nil
}
// AddImportSegment adds the import segment to the current DataNode.
@ -606,7 +543,7 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
var ok bool
ds, ok = node.flowgraphManager.getFlowgraphService(req.GetChannelName())
if !ok {
return errors.New("channel not found")
return merr.WrapErrChannelNotFound(req.ChannelName)
}
return nil
}, retry.Attempts(getFlowGraphServiceAttempts))
@ -615,22 +552,14 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
zap.String("channel name", req.GetChannelName()),
zap.Int64("node ID", paramtable.GetNodeID()))
return &datapb.AddImportSegmentResponse{
Status: &commonpb.Status{
// TODO: Add specific error code.
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "channel not found in current DataNode",
},
Status: merr.Status(err),
}, nil
}
// Get the current dml channel position ID, that will be used in segments start positions and end positions.
posID, err := ds.getChannelLatestMsgID(context.Background(), req.GetChannelName(), req.GetSegmentId())
if err != nil {
return &datapb.AddImportSegmentResponse{
Status: &commonpb.Status{
// TODO: Add specific error code.
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: "failed to get channel position",
},
Status: merr.Status(merr.WrapErrChannelNotFound(req.ChannelName, "failed to get channel position")),
}, nil
}
// Add the new segment to the channel.
@ -663,19 +592,13 @@ func (node *DataNode) AddImportSegment(ctx context.Context, req *datapb.AddImpor
log.Error("failed to add segment to flow graph",
zap.Error(err))
return &datapb.AddImportSegmentResponse{
Status: &commonpb.Status{
// TODO: Add specific error code.
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Status: merr.Status(err),
}, nil
}
}
ds.flushingSegCache.Remove(req.GetSegmentId())
return &datapb.AddImportSegmentResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: merr.Status(nil),
ChannelPos: posID,
}, nil
}
@ -723,9 +646,7 @@ func assignSegmentFunc(node *DataNode, req *datapb.ImportTaskRequest) importutil
// ignore the returned error, since even report failed the segments still can be cleaned
retry.Do(context.Background(), func() error {
importResult := &rootcoordpb.ImportResult{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Status: merr.Status(nil),
TaskId: req.GetImportTask().TaskId,
DatanodeId: paramtable.GetNodeID(),
State: commonpb.ImportState_ImportStarted,


@ -44,6 +44,7 @@ import (
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/importutil"
"github.com/milvus-io/milvus/internal/util/merr"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/sessionutil"
@ -117,7 +118,7 @@ func (s *DataNodeServicesSuite) TestNotInUseAPIs() {
s.Run("WatchDmChannels", func() {
status, err := s.node.WatchDmChannels(s.ctx, &datapb.WatchDmChannelsRequest{})
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode)
s.Assert().True(merr.Ok(status))
})
s.Run("GetTimeTickChannel", func() {
_, err := s.node.GetTimeTickChannel(s.ctx)
@ -133,14 +134,14 @@ func (s *DataNodeServicesSuite) TestNotInUseAPIs() {
func (s *DataNodeServicesSuite) TestGetComponentStates() {
resp, err := s.node.GetComponentStates(s.ctx)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().Equal(common.NotRegisteredID, resp.State.NodeID)
s.node.SetSession(&sessionutil.Session{})
s.node.session.UpdateRegistered(true)
resp, err = s.node.GetComponentStates(context.Background())
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().True(merr.Ok(resp.GetStatus()))
}
func (s *DataNodeServicesSuite) TestGetCompactionState() {
@ -184,7 +185,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() {
node := &DataNode{}
node.UpdateStateCode(commonpb.StateCode_Abnormal)
resp, _ := node.GetCompactionState(s.ctx, nil)
s.Assert().Equal("DataNode is unhealthy", resp.GetStatus().GetReason())
s.Assert().Equal(merr.Code(merr.ErrServiceNotReady), resp.GetStatus().GetCode())
})
}
@ -231,7 +232,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
status, err := s.node.FlushSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode)
s.Assert().True(merr.Ok(status))
}()
go func() {
@ -273,7 +274,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
// dup call
status, err := s.node.FlushSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode)
s.Assert().True(merr.Ok(status))
// failure call
req = &datapb.FlushSegmentsRequest{
@ -286,7 +287,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
}
status, err = s.node.FlushSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_NodeIDNotMatch, status.ErrorCode)
s.Assert().Equal(merr.Code(merr.ErrNodeNotMatch), status.GetCode())
req = &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
@ -299,7 +300,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
status, err = s.node.FlushSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
s.Assert().False(merr.Ok(status))
req = &datapb.FlushSegmentsRequest{
Base: &commonpb.MsgBase{
@ -312,7 +313,7 @@ func (s *DataNodeServicesSuite) TestFlushSegments() {
status, err = s.node.FlushSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.ErrorCode)
s.Assert().True(merr.Ok(status))
}
func (s *DataNodeServicesSuite) TestShowConfigurations() {
@ -332,12 +333,12 @@ func (s *DataNodeServicesSuite) TestShowConfigurations() {
resp, err := node.ShowConfigurations(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
s.Assert().False(merr.Ok(resp.GetStatus()))
node.stateCode.Store(commonpb.StateCode_Healthy)
resp, err = node.ShowConfigurations(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().Equal(1, len(resp.Configuations))
s.Assert().Equal("datanode.port", resp.Configuations[0].Key)
}
@ -350,7 +351,7 @@ func (s *DataNodeServicesSuite) TestGetMetrics() {
node.stateCode.Store(commonpb.StateCode_Abnormal)
resp, err := node.GetMetrics(s.ctx, &milvuspb.GetMetricsRequest{})
s.Assert().NoError(err)
s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().False(merr.Ok(resp.GetStatus()))
node.stateCode.Store(commonpb.StateCode_Healthy)
@ -360,7 +361,7 @@ func (s *DataNodeServicesSuite) TestGetMetrics() {
Request: invalidRequest,
})
s.Assert().NoError(err)
s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().False(merr.Ok(resp.GetStatus()))
// unsupported metric type
unsupportedMetricType := "unsupported"
@ -368,14 +369,14 @@ func (s *DataNodeServicesSuite) TestGetMetrics() {
s.Assert().NoError(err)
resp, err = node.GetMetrics(s.ctx, req)
s.Assert().NoError(err)
s.Assert().NotEqual(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().False(merr.Ok(resp.GetStatus()))
// normal case
req, err = metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
s.Assert().NoError(err)
resp, err = node.GetMetrics(node.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode)
s.Assert().True(merr.Ok(resp.GetStatus()))
log.Info("Test DataNode.GetMetrics",
zap.String("name", resp.ComponentName),
zap.String("response", resp.Response))
@ -453,7 +454,7 @@ func (s *DataNodeServicesSuite) TestImport() {
stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetErrorCode())
s.Assert().True(merr.Ok(stat))
s.Assert().Equal("", stat.GetReason())
})
@ -504,7 +505,7 @@ func (s *DataNodeServicesSuite) TestImport() {
}
stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetErrorCode())
s.Assert().True(merr.Ok(stat))
s.Assert().Equal("", stat.GetReason())
})
s.Run("Test Import report import error", func() {
@ -537,7 +538,7 @@ func (s *DataNodeServicesSuite) TestImport() {
}
stat, err := s.node.Import(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
s.Assert().False(merr.Ok(stat))
})
s.Run("Test Import error", func() {
@ -550,16 +551,16 @@ func (s *DataNodeServicesSuite) TestImport() {
}
stat, err := s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.ErrorCode)
s.Assert().False(merr.Ok(stat))
stat, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, returnError), req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
s.Assert().False(merr.Ok(stat))
s.node.stateCode.Store(commonpb.StateCode_Abnormal)
stat, err = s.node.Import(context.WithValue(s.ctx, ctxKey{}, ""), req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetErrorCode())
s.Assert().False(merr.Ok(stat))
})
}
@ -592,7 +593,7 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
_, ok = s.node.flowgraphManager.getFlowgraphService(chName2)
s.Assert().True(ok)
stat, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{
resp, err := s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{
SegmentId: 100,
CollectionId: 100,
PartitionId: 100,
@ -600,12 +601,12 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
RowNum: 500,
})
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, stat.GetStatus().GetErrorCode())
s.Assert().Equal("", stat.GetStatus().GetReason())
s.Assert().NotEqual(nil, stat.GetChannelPos())
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().Equal("", resp.GetStatus().GetReason())
s.Assert().NotEqual(nil, resp.GetChannelPos())
getFlowGraphServiceAttempts = 3
stat, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{
resp, err = s.node.AddImportSegment(context.WithValue(s.ctx, ctxKey{}, ""), &datapb.AddImportSegmentRequest{
SegmentId: 100,
CollectionId: 100,
PartitionId: 100,
@ -613,7 +614,9 @@ func (s *DataNodeServicesSuite) TestAddImportSegment() {
RowNum: 500,
})
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, stat.GetStatus().GetErrorCode())
// TODO ASSERT COMBINE ERROR
s.Assert().False(merr.Ok(resp.GetStatus()))
// s.Assert().Equal(merr.Code(merr.ErrChannelNotFound), stat.GetStatus().GetCode())
})
}
@ -651,12 +654,12 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
req.CompactedFrom = []UniqueID{}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
s.Assert().False(merr.Ok(status))
req.CompactedFrom = []UniqueID{101, 201}
status, err = s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
s.Assert().True(merr.Ok(status))
})
s.Run("valid request numRows>0", func() {
@ -669,11 +672,11 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
cancel()
status, err := s.node.SyncSegments(cancelCtx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
s.Assert().False(merr.Ok(status))
status, err = s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
s.Assert().True(merr.Ok(status))
s.Assert().True(fg.channel.hasSegment(req.CompactedTo, true))
s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true))
@ -681,7 +684,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
status, err = s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
s.Assert().True(merr.Ok(status))
})
s.Run("valid request numRows=0", func() {
@ -702,7 +705,7 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
}
status, err := s.node.SyncSegments(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, status.GetErrorCode())
s.Assert().True(merr.Ok(status))
s.Assert().False(fg.channel.hasSegment(req.CompactedTo, true))
s.Assert().False(fg.channel.hasSegment(req.CompactedFrom[0], true))
@ -762,12 +765,12 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() {
resp, err := s.node.ResendSegmentStats(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent())
// Duplicate call.
resp, err = s.node.ResendSegmentStats(s.ctx, req)
s.Assert().NoError(err)
s.Assert().Equal(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent())
}
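Editor's note: the test changes drop assertions against concrete ErrorCode values in favour of merr.Ok for the pass/fail split and merr.Code for the specific failure. A minimal standalone sketch of that assertion style outside the suite (the suite above uses s.Assert(); plain testify assertions are used here, which the repository already depends on elsewhere):

package datanode

import (
	"testing"

	"github.com/stretchr/testify/assert"

	"github.com/milvus-io/milvus/internal/util/merr"
)

func TestMerrStatusAssertions(t *testing.T) {
	// success statuses are recognised without inspecting ErrorCode directly
	assert.True(t, merr.Ok(merr.Status(nil)))

	// failures are matched by merr code instead of reason strings
	bad := merr.Status(merr.WrapErrServiceNotReady("Abnormal"))
	assert.False(t, merr.Ok(bad))
	assert.Equal(t, merr.Code(merr.ErrServiceNotReady), bad.GetCode())
}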


@ -87,6 +87,7 @@ var (
ErrNodeNotFound = newMilvusError("node not found", 901, false)
ErrNodeOffline = newMilvusError("node offline", 902, false)
ErrNodeLack = newMilvusError("node lacks", 903, false)
ErrNodeNotMatch = newMilvusError("node not match", 904, false)
// IO related
ErrIoKeyNotFound = newMilvusError("key not found", 1000, false)


@ -83,6 +83,8 @@ func oldCode(code int32) commonpb.ErrorCode {
return commonpb.ErrorCode_NotReadyServe
case ErrCollectionNotFound.code():
return commonpb.ErrorCode_CollectionNotExists
case ErrNodeNotMatch.code():
return commonpb.ErrorCode_NodeIDNotMatch
default:
return commonpb.ErrorCode_UnexpectedError
}
@ -300,6 +302,14 @@ func WrapErrNodeLack(expectedNum, actualNum int64, msg ...string) error {
return err
}
func WrapErrNodeNotMatch(expectedNodeID, actualNodeID int64, msg ...string) error {
err := errors.Wrapf(ErrNodeNotMatch, "expectedNodeID=%d, actualNodeID=%d", expectedNodeID, actualNodeID)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "; "))
}
return err
}
// IO related
func WrapErrIoKeyNotFound(key string, msg ...string) error {
err := errors.Wrapf(ErrIoKeyNotFound, "key=%s", key)
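Editor's note: WrapErrNodeNotMatch follows the same shape as the other wrappers, annotating the sentinel with the expected and actual node IDs and then appending any caller-supplied context. A short caller-side usage sketch tying the new sentinel, the wrapper and the oldCode mapping together (the IDs and message are arbitrary; the commented expectations mirror the datanode test assertions and the oldCode case added above):

func nodeNotMatchExample() {
	err := merr.WrapErrNodeNotMatch(100, 101, "FlushSegments target mismatch")
	status := merr.Status(err)
	// status.ErrorCode == commonpb.ErrorCode_NodeIDNotMatch (through oldCode above)
	// status.GetCode() == merr.Code(merr.ErrNodeNotMatch), as asserted in the datanode service tests
	// merr.Ok(status)  == false
	_ = status
}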