From 146050db82810358b33cb5bd0fcb1dae9a100c30 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Wed, 10 May 2023 09:31:19 +0800 Subject: [PATCH] Fix some wrong ut (#23990) Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/datacoord/server.go | 3 +- internal/datacoord/server_test.go | 3 +- internal/proxy/impl.go | 3 +- internal/proxy/proxy_test.go | 5 +- internal/proxy/task_statistic_test.go | 6 --- internal/proxy/task_test.go | 2 +- internal/querycoordv2/services_test.go | 2 +- internal/querynodev2/services.go | 72 ++++++++++++------------- internal/querynodev2/services_test.go | 36 ++++++------- internal/rootcoord/dml_channels.go | 1 + internal/rootcoord/timeticksync_test.go | 19 ++++--- 11 files changed, 75 insertions(+), 77 deletions(-) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 063438f32c..e67261d124 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -28,6 +28,7 @@ import ( "github.com/blang/semver/v4" "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/tsoutil" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" @@ -880,7 +881,7 @@ func (s *Server) startFlushLoop(ctx context.Context) { func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { segment := s.meta.GetHealthySegment(segmentID) if segment == nil { - return errors.New("segment not found, might be a faked segment, ignore post flush") + return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush") } // set segment to SegmentState_Flushed if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 9e2afb4838..111b253143 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -57,6 +57,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -3305,7 +3306,7 @@ func TestPostFlush(t *testing.T) { defer closeTestServer(t, svr) err := svr.postFlush(context.Background(), 1) - assert.EqualValues(t, errors.New("segment not found, might be a faked segment, ignore post flush"), err) + assert.ErrorIs(t, err, merr.ErrSegmentNotFound) }) t.Run("success post flush", func(t *testing.T) { diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 52efbbdf1d..48bf3ed395 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -1783,8 +1783,9 @@ func (node *Proxy) DescribeIndex(ctx context.Context, request *milvuspb.Describe // GetIndexStatistics get the information of index. func (node *Proxy) GetIndexStatistics(ctx context.Context, request *milvuspb.GetIndexStatisticsRequest) (*milvuspb.GetIndexStatisticsResponse, error) { if !node.checkHealthy() { + err := merr.WrapErrServiceNotReady(fmt.Sprintf("proxy %d is unhealthy", paramtable.GetNodeID())) return &milvuspb.GetIndexStatisticsResponse{ - Status: unhealthyStatus(), + Status: merr.Status(err), }, nil } diff --git a/internal/proxy/proxy_test.go b/internal/proxy/proxy_test.go index 599c98ab98..d81e87e284 100644 --- a/internal/proxy/proxy_test.go +++ b/internal/proxy/proxy_test.go @@ -4440,7 +4440,8 @@ func TestProxy_GetLoadState(t *testing.T) { }) } -func TestProxy_GetIndexStatistics(t *testing.T) { +func TestUnhealthProxy_GetIndexStatistics(t *testing.T) { + // check GetIndexStatistics when proxy is unhealthy factory := dependency.NewDefaultFactory(true) ctx := context.Background() proxy, err := NewProxy(ctx, factory) @@ -4454,6 +4455,6 @@ func TestProxy_GetIndexStatistics(t *testing.T) { IndexName: "", }) assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode) + assert.Equal(t, commonpb.ErrorCode_NotReadyServe, resp.Status.ErrorCode) }) } diff --git a/internal/proxy/task_statistic_test.go b/internal/proxy/task_statistic_test.go index 0b8f1af164..bd3bdf5cba 100644 --- a/internal/proxy/task_statistic_test.go +++ b/internal/proxy/task_statistic_test.go @@ -101,12 +101,6 @@ func TestStatisticTask_all(t *testing.T) { collectionID, err := globalMetaCache.GetCollectionID(ctx, collectionName) assert.NoError(t, err) - qc.EXPECT().ShowCollections(mock.Anything, mock.Anything).Return(&querypb.ShowCollectionsResponse{ - Status: &successStatus, - CollectionIDs: []int64{collectionID}, - InMemoryPercentages: []int64{100}, - }, nil) - status, err := qc.LoadCollection(ctx, &querypb.LoadCollectionRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadCollection, diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index f0a998b970..fe818afb39 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -2343,7 +2343,7 @@ func Test_dropCollectionTask_Execute(t *testing.T) { dct.DropCollectionRequest.CollectionName = "c2" err = dct.Execute(ctx) - assert.NoError(t, err) + assert.Error(t, err) assert.Equal(t, commonpb.ErrorCode_Success, dct.result.GetErrorCode()) } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 7dca34010b..09ea4ebbd9 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -361,7 +361,7 @@ func (suite *ServiceSuite) TestResourceGroup() { resp, err = server.CreateResourceGroup(ctx, createRG) suite.NoError(err) - suite.True(merr.Ok(resp)) + suite.False(merr.Ok(resp)) listRG := &milvuspb.ListResourceGroupsRequest{} resp1, err := server.ListResourceGroups(ctx, listRG) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index e1dadc0b84..7526c18d70 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -109,10 +109,10 @@ func (node *QueryNode) GetStatistics(ctx context.Context, req *querypb.GetStatis zap.Uint64("timeTravel", req.GetReq().GetTravelTimestamp())) if !node.lifetime.Add(commonpbutil.IsHealthy) { + msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) + err := merr.WrapErrServiceNotReady(msg) return &internalpb.GetStatisticsResponse{ - Status: util.WrapStatus(commonpb.ErrorCode_UnexpectedError, - fmt.Sprintf("query node %d is not healthy", paramtable.GetNodeID()), - ), + Status: merr.Status(err), }, nil } defer node.lifetime.Done() @@ -194,8 +194,8 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - log.Warn(msg) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -331,12 +331,10 @@ func (node *QueryNode) UnsubDmChannel(ctx context.Context, req *querypb.UnsubDmC // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { - err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID()) - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - } - return status, nil + + msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -375,7 +373,8 @@ func (node *QueryNode) LoadPartitions(ctx context.Context, req *querypb.LoadPart // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -415,8 +414,9 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen ) // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { - err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID()) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, err.Error()), nil + msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } node.lifetime.Done() @@ -477,7 +477,8 @@ func (node *QueryNode) LoadSegments(ctx context.Context, req *querypb.LoadSegmen func (node *QueryNode) ReleaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) { if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -495,12 +496,9 @@ func (node *QueryNode) ReleasePartitions(ctx context.Context, req *querypb.Relea // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { - err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID()) - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - } - return status, nil + msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -531,7 +529,8 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthyOrStopping) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -574,11 +573,9 @@ func (node *QueryNode) ReleaseSegments(ctx context.Context, req *querypb.Release func (node *QueryNode) GetSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) { if !node.lifetime.Add(commonpbutil.IsHealthy) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) + err := merr.WrapErrServiceNotReady(msg) return &querypb.GetSegmentInfoResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: msg, - }, + Status: merr.Status(err), }, nil } defer node.lifetime.Done() @@ -650,7 +647,10 @@ func (node *QueryNode) Search(ctx context.Context, req *querypb.SearchRequest) ( if !node.lifetime.Add(commonpbutil.IsHealthy) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - return WrapSearchResult(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return &internalpb.SearchResults{ + Status: merr.Status(err), + }, nil } defer node.lifetime.Done() @@ -744,7 +744,10 @@ func (node *QueryNode) Query(ctx context.Context, req *querypb.QueryRequest) (*i if !node.lifetime.Add(commonpbutil.IsHealthy) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - return WrapRetrieveResult(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return &internalpb.RetrieveResults{ + Status: merr.Status(err), + }, nil } defer node.lifetime.Done() @@ -1004,12 +1007,9 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi log := log.Ctx(ctx).With(zap.Int64("collectionID", req.GetCollectionID()), zap.String("channel", req.GetChannel())) // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { - err := fmt.Errorf("query node %d is not ready", paramtable.GetNodeID()) - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - } - return status, nil + msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() @@ -1101,8 +1101,8 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( // check node healthy if !node.lifetime.Add(commonpbutil.IsHealthy) { msg := fmt.Sprintf("query node %d is not ready", paramtable.GetNodeID()) - log.Warn(msg) - return util.WrapStatus(commonpb.ErrorCode_UnexpectedError, msg), nil + err := merr.WrapErrServiceNotReady(msg) + return merr.Status(err), nil } defer node.lifetime.Done() diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 447d894675..9ed27da18a 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -216,7 +216,7 @@ func (suite *ServiceSuite) TestGetStatistics_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.node.GetStatistics(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode()) } func (suite *ServiceSuite) TestWatchDmChannelsInt64() { @@ -362,7 +362,7 @@ func (suite *ServiceSuite) TestWatchDmChannels_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.WatchDmChannels(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestUnsubDmChannels_Normal() { @@ -415,7 +415,7 @@ func (suite *ServiceSuite) TestUnsubDmChannels_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.UnsubDmChannel(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) genSegmentLoadInfos(schema *schemapb.CollectionSchema) []*querypb.SegmentLoadInfo { @@ -592,7 +592,7 @@ func (suite *ServiceSuite) TestLoadSegments_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.LoadSegments(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestLoadSegments_Transfer() { @@ -688,7 +688,7 @@ func (suite *ServiceSuite) TestReleaseCollection_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err := suite.node.ReleaseCollection(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestReleasePartitions_Normal() { @@ -715,7 +715,7 @@ func (suite *ServiceSuite) TestReleasePartitions_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err := suite.node.ReleasePartitions(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestReleaseSegments_Normal() { @@ -759,7 +759,7 @@ func (suite *ServiceSuite) TestReleaseSegments_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.ReleaseSegments(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestReleaseSegments_Transfer() { @@ -858,7 +858,7 @@ func (suite *ServiceSuite) TestGetSegmentInfo_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) rsp, err := suite.node.GetSegmentInfo(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, rsp.GetStatus().GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, rsp.GetStatus().GetErrorCode()) } // Test Search @@ -957,7 +957,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { resp, err := suite.node.Search(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - suite.Contains(resp.GetStatus().GetReason(), "GetShardDelefatorFailed") + suite.Contains(resp.GetStatus().GetReason(), "failed to get query shard delegator") suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() @@ -972,7 +972,7 @@ func (suite *ServiceSuite) TestSearch_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.node.Search(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode()) } // Test Query @@ -1035,7 +1035,7 @@ func (suite *ServiceSuite) TestQuery_Failed() { resp, err := suite.node.Query(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.GetStatus().GetErrorCode()) - suite.Contains(resp.GetStatus().GetReason(), "GetShardDelefatorFailed") + suite.Contains(resp.GetStatus().GetReason(), "failed to get query shard delegator") suite.TestWatchDmChannelsInt64() suite.TestLoadSegments_Int64() @@ -1050,7 +1050,7 @@ func (suite *ServiceSuite) TestQuery_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.node.Query(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode()) } func (suite *ServiceSuite) TestSyncReplicaSegments_Normal() { @@ -1092,7 +1092,7 @@ func (suite *ServiceSuite) TestShowConfigurations_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err := suite.node.ShowConfigurations(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode()) } func (suite *ServiceSuite) TestGetMetric_Normal() { @@ -1146,7 +1146,7 @@ func (suite *ServiceSuite) TestGetMetric_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.node.GetMetrics(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.ErrorCode) } func (suite *ServiceSuite) TestGetDataDistribution_Normal() { @@ -1185,7 +1185,7 @@ func (suite *ServiceSuite) TestGetDataDistribution_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) resp, err = suite.node.GetDataDistribution(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, resp.Status.GetErrorCode()) } func (suite *ServiceSuite) TestSyncDistribution_Normal() { @@ -1301,7 +1301,7 @@ func (suite *ServiceSuite) TestSyncDistribution_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.SyncDistribution(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestDelete_Int64() { @@ -1403,7 +1403,7 @@ func (suite *ServiceSuite) TestDelete_Failed() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err = suite.node.Delete(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) } func (suite *ServiceSuite) TestLoadPartition() { @@ -1421,7 +1421,7 @@ func (suite *ServiceSuite) TestLoadPartition() { suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) status, err := suite.node.LoadPartitions(ctx, req) suite.NoError(err) - suite.Equal(commonpb.ErrorCode_UnexpectedError, status.GetErrorCode()) + suite.Equal(commonpb.ErrorCode_NotReadyServe, status.GetErrorCode()) suite.node.UpdateStateCode(commonpb.StateCode_Healthy) // collection not exist and schema is nil diff --git a/internal/rootcoord/dml_channels.go b/internal/rootcoord/dml_channels.go index 78d6cb00b3..e744ad42f9 100644 --- a/internal/rootcoord/dml_channels.go +++ b/internal/rootcoord/dml_channels.go @@ -205,6 +205,7 @@ func newDmlChannels(ctx context.Context, factory msgstream.Factory, chanNamePref pos: i, } d.pool.Store(name, dms) + log.Info("lxg", zap.Any("name", name)) d.channelsHeap = append(d.channelsHeap, dms) } diff --git a/internal/rootcoord/timeticksync_test.go b/internal/rootcoord/timeticksync_test.go index dad8e95b48..30f7a138ce 100644 --- a/internal/rootcoord/timeticksync_test.go +++ b/internal/rootcoord/timeticksync_test.go @@ -187,13 +187,13 @@ func TestTimetickSyncWithExistChannels(t *testing.T) { // int64(1): {"rootcoord-dml_0"}, //} - var baseParams = paramtable.BaseTable{} - baseParams.Save("msgChannel.chanNamePrefix.rootCoordDml", "common.chanNamePrefix.rootCoordDml") - baseParams.Save("msgChannel.chanNamePrefix.rootCoordDelta", "common.chanNamePrefix.rootCoordDelta") + paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, "rootcoord-dml") + paramtable.Get().Save(Params.CommonCfg.RootCoordDelta.Key, "rootcoord-delta") + paramtable.Get().Save(Params.RootCoordCfg.DmlChannelNum.Key, "5") chans := map[UniqueID][]string{} - chans[UniqueID(100)] = []string{"rootcoord-dml_4", "rootcoord-dml_8"} - chans[UniqueID(102)] = []string{"rootcoord-dml_2", "rootcoord-dml_9"} + chans[UniqueID(100)] = []string{"by-dev-rootcoord-dml_4", "by-dev-rootcoord-dml_8"} + chans[UniqueID(102)] = []string{"by-dev-rootcoord-dml_2", "by-dev-rootcoord-dml_9"} ttSync := newTimeTickSync(ctx, sourceID, factory, chans) var wg sync.WaitGroup @@ -218,10 +218,10 @@ func TestTimetickSyncWithExistChannels(t *testing.T) { t.Run("assign channels", func(t *testing.T) { defer wg.Done() channels := ttSync.getDmlChannelNames(int(4)) - assert.Equal(t, channels, []string{"rootcoord-dml_0", "rootcoord-dml_1", "rootcoord-dml_3", "rootcoord-dml_5"}) + assert.Equal(t, channels, []string{"by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1", "by-dev-rootcoord-dml_3", "by-dev-rootcoord-dml_5"}) channels = ttSync.getDmlChannelNames(int(4)) - assert.Equal(t, channels, []string{"rootcoord-dml_6", "rootcoord-dml_7", "rootcoord-dml_0", "rootcoord-dml_1"}) + assert.Equal(t, channels, []string{"by-dev-rootcoord-dml_6", "by-dev-rootcoord-dml_7", "by-dev-rootcoord-dml_0", "by-dev-rootcoord-dml_1"}) }) // test get new channels @@ -238,9 +238,8 @@ func TestTimetickSyncInvalidName(t *testing.T) { // int64(1): {"rootcoord-dml_0"}, //} - var baseParams = paramtable.BaseTable{} - baseParams.Save("msgChannel.chanNamePrefix.rootCoordDml", "common.chanNamePrefix.rootCoordDml") - baseParams.Save("msgChannel.chanNamePrefix.rootCoordDelta", "common.chanNamePrefix.rootCoordDelta") + paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, "rootcoord-dml") + paramtable.Get().Save(Params.CommonCfg.RootCoordDelta.Key, "rootcoord-delta") chans := map[UniqueID][]string{} chans[UniqueID(100)] = []string{"rootcoord-dml4"} assert.Panics(t, func() {