mirror of https://github.com/milvus-io/milvus.git
Remove querynode grpc error (#12532)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>pull/12576/head
parent
e6d9be3dfa
commit
3f060f4028
|
@ -13,7 +13,6 @@ package querynode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"go.uber.org/zap"
|
||||
|
@ -42,7 +41,7 @@ func (node *QueryNode) GetComponentStates(ctx context.Context) (*internalpb.Comp
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: errMsg,
|
||||
}
|
||||
return stats, errors.New(errMsg)
|
||||
return stats, nil
|
||||
}
|
||||
nodeID := common.NotRegisteredID
|
||||
if node.session != nil && node.session.Registered() {
|
||||
|
@ -90,7 +89,7 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
dct := &addQueryChannelTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -108,7 +107,7 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("addQueryChannelTask Enqueue done", zap.Any("collectionID", in.CollectionID))
|
||||
|
||||
|
@ -120,7 +119,7 @@ func (node *QueryNode) AddQueryChannel(ctx context.Context, in *queryPb.AddQuery
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("addQueryChannelTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))
|
||||
return &commonpb.Status{
|
||||
|
@ -191,7 +190,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
dct := &watchDmChannelsTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -209,7 +208,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("watchDmChannelsTask Enqueue done", zap.Any("collectionID", in.CollectionID))
|
||||
|
||||
|
@ -221,7 +220,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, in *queryPb.WatchDmC
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("watchDmChannelsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))
|
||||
return &commonpb.Status{
|
||||
|
@ -241,7 +240,7 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
dct := &watchDeltaChannelsTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -259,7 +258,7 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("watchDeltaChannelsTask Enqueue done", zap.Any("collectionID", in.CollectionID))
|
||||
|
||||
|
@ -271,7 +270,7 @@ func (node *QueryNode) WatchDeltaChannels(ctx context.Context, in *queryPb.Watch
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("watchDeltaChannelsTask WaitToFinish done", zap.Any("collectionID", in.CollectionID))
|
||||
return &commonpb.Status{
|
||||
|
@ -291,7 +290,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
dct := &loadSegmentsTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -309,7 +308,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
segmentIDs := make([]UniqueID, 0)
|
||||
for _, info := range in.Infos {
|
||||
|
@ -325,7 +324,7 @@ func (node *QueryNode) LoadSegments(ctx context.Context, in *queryPb.LoadSegment
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("loadSegmentsTask WaitToFinish done", zap.Int64s("segmentIDs", segmentIDs))
|
||||
return &commonpb.Status{
|
||||
|
@ -345,7 +344,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
dct := &releaseCollectionTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -363,7 +362,7 @@ func (node *QueryNode) ReleaseCollection(ctx context.Context, in *queryPb.Releas
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("releaseCollectionTask Enqueue done", zap.Any("collectionID", in.CollectionID))
|
||||
|
||||
|
@ -391,7 +390,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
dct := &releasePartitionsTask{
|
||||
baseTask: baseTask{
|
||||
|
@ -409,7 +408,7 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, in *queryPb.Releas
|
|||
Reason: err.Error(),
|
||||
}
|
||||
log.Error(err.Error())
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
log.Debug("releasePartitionsTask Enqueue done", zap.Any("collectionID", in.CollectionID))
|
||||
|
||||
|
@ -437,7 +436,7 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, in *queryPb.ReleaseS
|
|||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, err
|
||||
return status, nil
|
||||
}
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -470,7 +469,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
|
|||
Reason: err.Error(),
|
||||
},
|
||||
}
|
||||
return res, err
|
||||
return res, nil
|
||||
}
|
||||
infos := make([]*queryPb.SegmentInfo, 0)
|
||||
|
||||
|
@ -485,7 +484,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
|
|||
Reason: err.Error(),
|
||||
},
|
||||
}
|
||||
return res, err
|
||||
return res, nil
|
||||
}
|
||||
infos = append(infos, historicalSegmentInfos...)
|
||||
|
||||
|
@ -500,7 +499,7 @@ func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *queryPb.GetSegmen
|
|||
Reason: err.Error(),
|
||||
},
|
||||
}
|
||||
return res, err
|
||||
return res, nil
|
||||
}
|
||||
infos = append(infos, streamingSegmentInfos...)
|
||||
// log.Debug("GetSegmentInfo: get segment info from query node", zap.Int64("nodeID", node.session.ServerID), zap.Any("segment infos", infos))
|
||||
|
@ -569,7 +568,7 @@ func (node *QueryNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsR
|
|||
zap.Any("metrics", metrics), // TODO(dragondriver): necessary? may be very large
|
||||
zap.Error(err))
|
||||
|
||||
return metrics, err
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
log.Debug("QueryNode.GetMetrics failed, request metric type is not implemented yet",
|
||||
|
|
|
@ -48,7 +48,7 @@ func TestImpl_GetComponentStates(t *testing.T) {
|
|||
node.stateCode = atomic.Value{}
|
||||
node.stateCode.Store("invalid")
|
||||
rsp, err = node.GetComponentStates(ctx)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ func TestImpl_AddQueryChannel(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err = node.AddQueryChannel(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
}
|
||||
|
||||
|
@ -137,7 +137,7 @@ func TestImpl_WatchDmChannels(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err = node.WatchDmChannels(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
}
|
||||
|
||||
|
@ -165,7 +165,7 @@ func TestImpl_LoadSegments(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err = node.LoadSegments(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
}
|
||||
|
||||
|
@ -190,7 +190,7 @@ func TestImpl_ReleaseCollection(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err = node.ReleaseCollection(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
}
|
||||
|
||||
|
@ -216,7 +216,7 @@ func TestImpl_ReleasePartitions(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
status, err = node.ReleasePartitions(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
|
||||
}
|
||||
|
||||
|
@ -243,7 +243,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
rsp, err = node.GetSegmentInfo(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
|
@ -360,7 +360,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
rsp, err = node.GetSegmentInfo(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
|
@ -379,7 +379,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
|
|||
|
||||
node.streaming.replica.(*collectionReplica).partitions = make(map[UniqueID]*Partition)
|
||||
rsp, err := node.GetSegmentInfo(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
|
@ -398,7 +398,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
|
|||
|
||||
node.streaming.replica.(*collectionReplica).segments = make(map[UniqueID]*Segment)
|
||||
rsp, err := node.GetSegmentInfo(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
|
@ -417,7 +417,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
|
|||
|
||||
node.historical.replica.(*collectionReplica).partitions = make(map[UniqueID]*Partition)
|
||||
rsp, err := node.GetSegmentInfo(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
|
@ -436,7 +436,7 @@ func TestImpl_GetSegmentInfo(t *testing.T) {
|
|||
|
||||
node.historical.replica.(*collectionReplica).segments = make(map[UniqueID]*Segment)
|
||||
rsp, err := node.GetSegmentInfo(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, rsp.Status.ErrorCode)
|
||||
})
|
||||
}
|
||||
|
@ -530,7 +530,7 @@ func TestImpl_ReleaseSegments(t *testing.T) {
|
|||
|
||||
node.UpdateStateCode(internalpb.StateCode_Abnormal)
|
||||
_, err = node.ReleaseSegments(ctx, req)
|
||||
assert.Error(t, err)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("test segment not exists", func(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue