mirror of https://github.com/milvus-io/milvus.git
Fix some wrong ut (#23990)
Signed-off-by: lixinguo <xinguo.li@zilliz.com> Co-authored-by: lixinguo <xinguo.li@zilliz.com>pull/23913/head
parent
f7c7df33cb
commit
146050db82
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
"github.com/blang/semver/v4"
|
||||
"github.com/cockroachdb/errors"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/tsoutil"
|
||||
clientv3 "go.etcd.io/etcd/client/v3"
|
||||
"go.uber.org/zap"
|
||||
|
@ -880,7 +881,7 @@ func (s *Server) startFlushLoop(ctx context.Context) {
|
|||
func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
|
||||
segment := s.meta.GetHealthySegment(segmentID)
|
||||
if segment == nil {
|
||||
return errors.New("segment not found, might be a faked segment, ignore post flush")
|
||||
return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
|
||||
}
|
||||
// set segment to SegmentState_Flushed
|
||||
if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
|
||||
|
|
|
@ -57,6 +57,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/mq/msgstream"
|
||||
"github.com/milvus-io/milvus/pkg/util/etcd"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
|
@ -3305,7 +3306,7 @@ func TestPostFlush(t *testing.T) {
|
|||
defer closeTestServer(t, svr)
|
||||
|
||||
err := svr.postFlush(context.Background(), 1)
|
||||
assert.EqualValues(t, errors.New("segment not found, might be a faked segment, ignore post flush"), err)
|
||||
assert.ErrorIs(t, err, merr.ErrSegmentNotFound)
|
||||
})
|
||||
|
||||
t.Run("success post flush", func(t *testing.T) {
|
||||
|
|
|
@ -1783,8 +1783,9 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe
|
|||
// GetIndexStatistics get the information of index.
|
||||
func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.GetIndexStatisticsRequest) (*milvuspb.GetIndexStatisticsResponse, error) {
|
||||
if !node.checkHealthy() {
|
||||
err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID()))
|
||||
return &milvuspb.GetIndexStatisticsResponse{
|
||||
Status: unhealthyStatus(),
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4440,7 +4440,8 @@ func TestProxy_GetLoadState(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestProxy_GetIndexStatistics(t *testing.T) {
|
||||
func TestUnhealthProxy_GetIndexStatistics(t *testing.T) {
|
||||
// check GetIndexStatistics when proxy is unhealthy
|
||||
factory := dependency.NewDefaultFactory(true)
|
||||
ctx := context.Background()
|
||||
proxy, err := NewProxy(ctx, factory)
|
||||
|
@ -4454,6 +4455,6 @@ func TestProxy_GetIndexStatistics(t *testing.T) {
|
|||
IndexName: "",
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.Status.ErrorCode)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -101,12 +101,6 @@ func TestStatisticTask_all(t *testing.T) {
|
|||
collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName)
|
||||
assert.NoError(t, err)
|
||||
|
||||
qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{
|
||||
Status: &successStatus,
|
||||
CollectionIDs: []int64{collectionID},
|
||||
InMemoryPercentages: []int64{100},
|
||||
}, nil)
|
||||
|
||||
status, err := qc.LoadCollection(ctx, &querypb.LoadCollectionRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_LoadCollection,
|
||||
|
|
|
@ -2343,7 +2343,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) {
|
|||
|
||||
dct.DropCollectionRequest.CollectionName = "c2"
|
||||
err = dct.Execute(ctx)
|
||||
assert.NoError(t, err)
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, dct.result.GetErrorCode())
|
||||
}
|
||||
|
||||
|
|
|
@ -361,7 +361,7 @@ func (suite *ServiceSuite) TestResourceGroup() {
|
|||
|
||||
resp, err = server.CreateResourceGroup(ctx, createRG)
|
||||
suite.NoError(err)
|
||||
suite.True(merr.Ok(resp))
|
||||
suite.False(merr.Ok(resp))
|
||||
|
||||
listRG := &milvuspb.ListResourceGroupsRequest{}
|
||||
resp1, err := server.ListResourceGroups(ctx, listRG)
|
||||
|
|
|
@ -109,10 +109,10 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis
|
|||
zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp()))
|
||||
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return &internalpb.GetStatisticsResponse{
|
||||
Status: util.WrapStatus(commonpb.ErrorCode_UnexpectedError,
|
||||
fmt.Sprintf("query node %d is not healthy", paramtable.GetNodeID()),
|
||||
),
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
@ -194,8 +194,8 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
|
|||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
log.Warn(msg)
|
||||
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -331,12 +331,10 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC
|
|||
|
||||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID())
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, nil
|
||||
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -375,7 +373,8 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart
|
|||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -415,8 +414,9 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
|||
)
|
||||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID())
|
||||
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
node.lifetime.Done()
|
||||
|
||||
|
@ -477,7 +477,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen
|
|||
func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -495,12 +496,9 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea
|
|||
|
||||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID())
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, nil
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -531,7 +529,8 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
|
|||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -574,11 +573,9 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release
|
|||
func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return &querypb.GetSegmentInfoResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: msg,
|
||||
},
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
@ -650,7 +647,10 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) (
|
|||
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
return WrapSearchResult(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return &internalpb.SearchResults{
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -744,7 +744,10 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i
|
|||
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
return WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return &internalpb.RetrieveResults{
|
||||
Status: merr.Status(err),
|
||||
}, nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -1004,12 +1007,9 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
|
|||
log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel()))
|
||||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID())
|
||||
status := &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}
|
||||
return status, nil
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
@ -1101,8 +1101,8 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
|
|||
// check node healthy
|
||||
if !node.lifetime.Add(commonpbutil.IsHealthy) {
|
||||
msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID())
|
||||
log.Warn(msg)
|
||||
return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil
|
||||
err := merr.WrapErrServiceNotReady(msg)
|
||||
return merr.Status(err), nil
|
||||
}
|
||||
defer node.lifetime.Done()
|
||||
|
||||
|
|
|
@ -216,7 +216,7 @@ func (suite *ServiceSuite) TestGetStatistics_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
resp, err = suite.node.GetStatistics(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestWatchDmChannelsInt64() {
|
||||
|
@ -362,7 +362,7 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err = suite.node.WatchDmChannels(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestUnsubDmChannels_Normal() {
|
||||
|
@ -415,7 +415,7 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err = suite.node.UnsubDmChannel(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema) []*querypb.SegmentLoadInfo {
|
||||
|
@ -592,7 +592,7 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err = suite.node.LoadSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestLoadSegments_Transfer() {
|
||||
|
@ -688,7 +688,7 @@ func (suite *ServiceSuite) TestReleaseCollection_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err := suite.node.ReleaseCollection(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestReleasePartitions_Normal() {
|
||||
|
@ -715,7 +715,7 @@ func (suite *ServiceSuite) TestReleasePartitions_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err := suite.node.ReleasePartitions(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestReleaseSegments_Normal() {
|
||||
|
@ -759,7 +759,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err = suite.node.ReleaseSegments(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestReleaseSegments_Transfer() {
|
||||
|
@ -858,7 +858,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
rsp, err := suite.node.GetSegmentInfo(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode())
|
||||
}
|
||||
|
||||
// Test Search
|
||||
|
@ -957,7 +957,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {
|
|||
resp, err := suite.node.Search(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
|
||||
suite.Contains(resp.GetStatus().GetReason(), "GetShardDelefatorFailed")
|
||||
suite.Contains(resp.GetStatus().GetReason(), "failed to get query shard delegator")
|
||||
|
||||
suite.TestWatchDmChannelsInt64()
|
||||
suite.TestLoadSegments_Int64()
|
||||
|
@ -972,7 +972,7 @@ func (suite *ServiceSuite) TestSearch_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
resp, err = suite.node.Search(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
// Test Query
|
||||
|
@ -1035,7 +1035,7 @@ func (suite *ServiceSuite) TestQuery_Failed() {
|
|||
resp, err := suite.node.Query(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode())
|
||||
suite.Contains(resp.GetStatus().GetReason(), "GetShardDelefatorFailed")
|
||||
suite.Contains(resp.GetStatus().GetReason(), "failed to get query shard delegator")
|
||||
|
||||
suite.TestWatchDmChannelsInt64()
|
||||
suite.TestLoadSegments_Int64()
|
||||
|
@ -1050,7 +1050,7 @@ func (suite *ServiceSuite) TestQuery_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
resp, err = suite.node.Query(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSyncReplicaSegments_Normal() {
|
||||
|
@ -1092,7 +1092,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
resp, err := suite.node.ShowConfigurations(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestGetMetric_Normal() {
|
||||
|
@ -1146,7 +1146,7 @@ func (suite *ServiceSuite) TestGetMetric_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
resp, err = suite.node.GetMetrics(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.ErrorCode)
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestGetDataDistribution_Normal() {
|
||||
|
@ -1185,7 +1185,7 @@ func (suite *ServiceSuite) TestGetDataDistribution_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
resp, err = suite.node.GetDataDistribution(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestSyncDistribution_Normal() {
|
||||
|
@ -1301,7 +1301,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err = suite.node.SyncDistribution(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestDelete_Int64() {
|
||||
|
@ -1403,7 +1403,7 @@ func (suite *ServiceSuite) TestDelete_Failed() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err = suite.node.Delete(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
}
|
||||
|
||||
func (suite *ServiceSuite) TestLoadPartition() {
|
||||
|
@ -1421,7 +1421,7 @@ func (suite *ServiceSuite) TestLoadPartition() {
|
|||
suite.node.UpdateStateCode(commonpb.StateCode_Abnormal)
|
||||
status, err := suite.node.LoadPartitions(ctx, req)
|
||||
suite.NoError(err)
|
||||
suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode())
|
||||
suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode())
|
||||
suite.node.UpdateStateCode(commonpb.StateCode_Healthy)
|
||||
|
||||
// collection not exist and schema is nil
|
||||
|
|
|
@ -205,6 +205,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref
|
|||
pos: i,
|
||||
}
|
||||
d.pool.Store(name, dms)
|
||||
log.Info("lxg", zap.Any("name", name))
|
||||
d.channelsHeap = append(d.channelsHeap, dms)
|
||||
}
|
||||
|
||||
|
|
|
@ -187,13 +187,13 @@ func TestTimetickSyncWithExistChannels(t *testing.T) {
|
|||
// int64(1): {"rootcoord-dml_0"},
|
||||
//}
|
||||
|
||||
var baseParams = paramtable.BaseTable{}
|
||||
baseParams.Save("msgChannel.chanNamePrefix.rootCoordDml", "common.chanNamePrefix.rootCoordDml")
|
||||
baseParams.Save("msgChannel.chanNamePrefix.rootCoordDelta", "common.chanNamePrefix.rootCoordDelta")
|
||||
paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, "rootcoord-dml")
|
||||
paramtable.Get().Save(Params.CommonCfg.RootCoordDelta.Key, "rootcoord-delta")
|
||||
paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "5")
|
||||
chans := map[UniqueID][]string{}
|
||||
|
||||
chans[UniqueID(100)] = []string{"rootcoord-dml_4", "rootcoord-dml_8"}
|
||||
chans[UniqueID(102)] = []string{"rootcoord-dml_2", "rootcoord-dml_9"}
|
||||
chans[UniqueID(100)] = []string{"by-dev-rootcoord-dml_4", "by-dev-rootcoord-dml_8"}
|
||||
chans[UniqueID(102)] = []string{"by-dev-rootcoord-dml_2", "by-dev-rootcoord-dml_9"}
|
||||
ttSync := newTimeTickSync(ctx, sourceID, factory, chans)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
@ -218,10 +218,10 @@ func TestTimetickSyncWithExistChannels(t *testing.T) {
|
|||
t.Run("assign channels", func(t *testing.T) {
|
||||
defer wg.Done()
|
||||
channels := ttSync.getDmlChannelNames(int(4))
|
||||
assert.Equal(t, channels, []string{"rootcoord-dml_0", "rootcoord-dml_1", "rootcoord-dml_3", "rootcoord-dml_5"})
|
||||
assert.Equal(t, channels, []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1", "by-dev-rootcoord-dml_3", "by-dev-rootcoord-dml_5"})
|
||||
|
||||
channels = ttSync.getDmlChannelNames(int(4))
|
||||
assert.Equal(t, channels, []string{"rootcoord-dml_6", "rootcoord-dml_7", "rootcoord-dml_0", "rootcoord-dml_1"})
|
||||
assert.Equal(t, channels, []string{"by-dev-rootcoord-dml_6", "by-dev-rootcoord-dml_7", "by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"})
|
||||
})
|
||||
|
||||
// test get new channels
|
||||
|
@ -238,9 +238,8 @@ func TestTimetickSyncInvalidName(t *testing.T) {
|
|||
// int64(1): {"rootcoord-dml_0"},
|
||||
//}
|
||||
|
||||
var baseParams = paramtable.BaseTable{}
|
||||
baseParams.Save("msgChannel.chanNamePrefix.rootCoordDml", "common.chanNamePrefix.rootCoordDml")
|
||||
baseParams.Save("msgChannel.chanNamePrefix.rootCoordDelta", "common.chanNamePrefix.rootCoordDelta")
|
||||
paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, "rootcoord-dml")
|
||||
paramtable.Get().Save(Params.CommonCfg.RootCoordDelta.Key, "rootcoord-delta")
|
||||
chans := map[UniqueID][]string{}
|
||||
chans[UniqueID(100)] = []string{"rootcoord-dml4"}
|
||||
assert.Panics(t, func() {
|
||||
|
|
Loading…
Reference in New Issue