From 75e6b65c608c727ab3f3c6665f22d76226d119c5 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Mon, 8 Jan 2024 17:46:47 +0800 Subject: [PATCH] enhance: Use ChannelManger interface in Server (#29629) See also: #29447 --------- Signed-off-by: yangxuan --- internal/datacoord/compaction_test.go | 34 +- internal/datacoord/server.go | 2 +- internal/datacoord/server_test.go | 943 +------------------------- internal/datacoord/services.go | 6 +- internal/datacoord/services_test.go | 637 +++++++++++++++++ 5 files changed, 673 insertions(+), 949 deletions(-) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 7dae970d8d..009bf73e82 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -43,7 +43,7 @@ type CompactionPlanHandlerSuite struct { mockAlloc *NMockAllocator mockSch *MockScheduler mockCm *MockChannelManager - mockSession *MockSessionManager + mockSessMgr *MockSessionManager } func (s *CompactionPlanHandlerSuite) SetupTest() { @@ -51,7 +51,7 @@ func (s *CompactionPlanHandlerSuite) SetupTest() { s.mockAlloc = NewNMockAllocator(s.T()) s.mockSch = NewMockScheduler(s.T()) s.mockCm = NewMockChannelManager(s.T()) - s.mockSession = NewMockSessionManager(s.T()) + s.mockSessMgr = NewMockSessionManager(s.T()) } func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { @@ -76,7 +76,7 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { } func (s *CompactionPlanHandlerSuite) TestCheckResult() { - s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ + s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ 1: {PlanID: 1, State: commonpb.CompactionState_Executing}, 2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}, 3: {PlanID: 3, State: commonpb.CompactionState_Executing}, @@ -84,13 +84,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() { }) { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() - handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) handler.checkResult() } { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once() - handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) handler.checkResult() } } @@ -277,7 +277,7 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { {"channel with no error", "ch-2", false}, } - handler := newCompactionPlanHandler(nil, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.scheduler = s.mockSch for idx, test := range tests { @@ -310,7 +310,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { s.Run("illegal nil result", func() { s.SetupTest() - handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) err := handler.handleMergeCompactionResult(nil, nil) s.Error(err) }) @@ -324,9 +324,9 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { } return nil }).Once() - s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() + s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() - handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} compactionResult := &datapb.CompactionPlanResult{ @@ -345,7 +345,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return( nil, nil, nil, errors.New("mock error")).Once() - handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} compactionResult := &datapb.CompactionPlanResult{ PlanID: plan.PlanID, @@ -368,7 +368,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything). Return(errors.New("mock error")).Once() - handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} compactionResult := &datapb.CompactionPlanResult{ PlanID: plan.PlanID, @@ -390,9 +390,9 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { &segMetricMutation{}, nil).Once() s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything). Return(nil).Once() - s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() + s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() - handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} compactionResult := &datapb.CompactionPlanResult{ PlanID: plan.PlanID, @@ -422,7 +422,7 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() { }) s.Run("test complete merge compaction task", func() { - s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() + s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() // mock for handleMergeCompactionResult s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return( @@ -490,7 +490,7 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() { }, } - c := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + c := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) c.scheduler = s.mockSch c.plans = plans @@ -536,7 +536,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { } func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { - s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ + s.mockSessMgr.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{ 1: {PlanID: 1, State: commonpb.CompactionState_Executing}, 2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}}, 3: {PlanID: 3, State: commonpb.CompactionState_Executing}, @@ -565,7 +565,7 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { }, } - handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc) + handler := newCompactionPlanHandler(s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans = inPlans err := handler.updateCompaction(0) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 2c57f33c99..1f30d59633 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -117,7 +117,7 @@ type Server struct { allocator allocator cluster Cluster sessionManager SessionManager - channelManager *ChannelManagerImpl + channelManager ChannelManager rootCoordClient types.RootCoordClient garbageCollector *garbageCollector gcOpt GcOption diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 00cfdedabe..907dab0067 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -37,8 +37,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - grpcStatus "google.golang.org/grpc/status" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -95,135 +93,6 @@ func TestMain(m *testing.M) { os.Exit(code) } -func TestGetSegmentInfoChannel(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - t.Run("get segment info channel", func(t *testing.T) { - resp, err := svr.GetSegmentInfoChannel(context.TODO(), nil) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.EqualValues(t, Params.CommonCfg.DataCoordSegmentInfo.GetValue(), resp.Value) - }) -} - -func TestAssignSegmentID(t *testing.T) { - const collID = 100 - const collIDInvalid = 101 - const partID = 0 - const channel0 = "channel0" - - t.Run("assign segment normally", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: collID, - Schema: schema, - Partitions: []int64{}, - }) - req := &datapb.SegmentIDRequest{ - Count: 1000, - ChannelName: channel0, - CollectionID: collID, - PartitionID: partID, - } - - resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - NodeID: 0, - PeerRole: "", - SegmentIDRequests: []*datapb.SegmentIDRequest{req}, - }) - assert.NoError(t, err) - assert.EqualValues(t, 1, len(resp.SegIDAssignments)) - assign := resp.SegIDAssignments[0] - assert.EqualValues(t, commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode()) - assert.EqualValues(t, collID, assign.CollectionID) - assert.EqualValues(t, partID, assign.PartitionID) - assert.EqualValues(t, channel0, assign.ChannelName) - assert.EqualValues(t, 1000, assign.Count) - }) - - t.Run("assign segment for bulkload", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: collID, - Schema: schema, - Partitions: []int64{}, - }) - req := &datapb.SegmentIDRequest{ - Count: 1000, - ChannelName: channel0, - CollectionID: collID, - PartitionID: partID, - IsImport: true, - } - - resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - NodeID: 0, - PeerRole: "", - SegmentIDRequests: []*datapb.SegmentIDRequest{req}, - }) - assert.NoError(t, err) - assert.EqualValues(t, 1, len(resp.SegIDAssignments)) - assign := resp.SegIDAssignments[0] - assert.EqualValues(t, commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode()) - assert.EqualValues(t, collID, assign.CollectionID) - assert.EqualValues(t, partID, assign.PartitionID) - assert.EqualValues(t, channel0, assign.ChannelName) - assert.EqualValues(t, 1000, assign.Count) - }) - - t.Run("with closed server", func(t *testing.T) { - req := &datapb.SegmentIDRequest{ - Count: 100, - ChannelName: channel0, - CollectionID: collID, - PartitionID: partID, - } - svr := newTestServer(t, nil) - closeTestServer(t, svr) - resp, err := svr.AssignSegmentID(context.Background(), &datapb.AssignSegmentIDRequest{ - NodeID: 0, - PeerRole: "", - SegmentIDRequests: []*datapb.SegmentIDRequest{req}, - }) - assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - }) - - t.Run("assign segment with invalid collection", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - svr.rootCoordClient = &mockRootCoord{ - RootCoordClient: svr.rootCoordClient, - collID: collID, - } - - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ - ID: collID, - Schema: schema, - Partitions: []int64{}, - }) - req := &datapb.SegmentIDRequest{ - Count: 1000, - ChannelName: channel0, - CollectionID: collIDInvalid, - PartitionID: partID, - } - - resp, err := svr.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ - NodeID: 0, - PeerRole: "", - SegmentIDRequests: []*datapb.SegmentIDRequest{req}, - }) - assert.NoError(t, err) - assert.EqualValues(t, 0, len(resp.SegIDAssignments)) - }) -} - type mockRootCoord struct { types.RootCoordClient collID UniqueID @@ -248,126 +117,6 @@ func (r *mockRootCoord) ReportImport(ctx context.Context, req *rootcoordpb.Impor }, nil } -func TestFlush(t *testing.T) { - req := &datapb.FlushRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Flush, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - DbID: 0, - CollectionID: 0, - } - t.Run("normal case", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}}) - allocations, err := svr.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1) - assert.NoError(t, err) - assert.EqualValues(t, 1, len(allocations)) - expireTs := allocations[0].ExpireTime - segID := allocations[0].SegmentID - - resp, err := svr.Flush(context.TODO(), req) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - - svr.meta.SetCurrentRows(segID, 1) - ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) - assert.NoError(t, err) - assert.EqualValues(t, 1, len(ids)) - assert.EqualValues(t, segID, ids[0]) - }) - - t.Run("bulkload segment", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - schema := newTestSchema() - svr.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}}) - - allocations, err := svr.segmentManager.allocSegmentForImport(context.TODO(), 0, 1, "channel-1", 1, 100) - assert.NoError(t, err) - expireTs := allocations.ExpireTime - segID := allocations.SegmentID - - resp, err := svr.Flush(context.TODO(), req) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.EqualValues(t, 0, len(resp.SegmentIDs)) - // should not flush anything since this is a normal flush - svr.meta.SetCurrentRows(segID, 1) - ids, err := svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) - assert.NoError(t, err) - assert.EqualValues(t, 0, len(ids)) - - req := &datapb.FlushRequest{ - Base: &commonpb.MsgBase{ - MsgType: commonpb.MsgType_Flush, - MsgID: 0, - Timestamp: 0, - SourceID: 0, - }, - DbID: 0, - CollectionID: 0, - IsImport: true, - } - - resp, err = svr.Flush(context.TODO(), req) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.EqualValues(t, 1, len(resp.SegmentIDs)) - - ids, err = svr.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) - assert.NoError(t, err) - assert.EqualValues(t, 1, len(ids)) - assert.EqualValues(t, segID, ids[0]) - }) - - t.Run("closed server", func(t *testing.T) { - svr := newTestServer(t, nil) - closeTestServer(t, svr) - resp, err := svr.Flush(context.Background(), req) - assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - }) - - t.Run("test rolling upgrade", func(t *testing.T) { - svr := newTestServer(t, nil) - closeTestServer(t, svr) - svr.stateCode.Store(commonpb.StateCode_Healthy) - sm := NewSessionManagerImpl() - - datanodeClient := mocks.NewMockDataNodeClient(t) - datanodeClient.EXPECT().FlushChannels(mock.Anything, mock.Anything).Return(nil, - merr.WrapErrServiceUnimplemented(grpcStatus.Error(codes.Unimplemented, "mock grpc unimplemented error"))) - - sm.sessions = struct { - sync.RWMutex - data map[int64]*Session - }{data: map[int64]*Session{1: { - client: datanodeClient, - clientCreator: func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) { - return datanodeClient, nil - }, - }}} - - svr.sessionManager = sm - svr.cluster = NewClusterImpl(sm, svr.channelManager) - - err := svr.channelManager.AddNode(1) - assert.NoError(t, err) - err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0}) - assert.NoError(t, err) - - resp, err := svr.Flush(context.TODO(), req) - assert.NoError(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.Equal(t, Timestamp(0), resp.GetFlushTs()) - }) -} - // func TestGetComponentStates(t *testing.T) { // svr := newTestServer(t) // defer closeTestServer(t, svr) @@ -1280,435 +1029,6 @@ func (s *spySegmentManager) DropSegmentsOfChannel(ctx context.Context, channel s s.spyCh <- struct{}{} } -func TestSaveBinlogPaths(t *testing.T) { - t.Run("Normal SaveRequest", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - // vecFieldID := int64(201) - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - }) - - segments := []struct { - id UniqueID - collectionID UniqueID - }{ - {0, 0}, - {1, 0}, - } - for _, segment := range segments { - s := &datapb.SegmentInfo{ - ID: segment.id, - CollectionID: segment.collectionID, - InsertChannel: "ch1", - State: commonpb.SegmentState_Growing, - } - err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) - assert.NoError(t, err) - } - - ctx := context.Background() - - err := svr.channelManager.AddNode(0) - assert.NoError(t, err) - err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0}) - assert.NoError(t, err) - - resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ - Base: &commonpb.MsgBase{ - Timestamp: uint64(time.Now().Unix()), - }, - SegmentID: 1, - CollectionID: 0, - Channel: "ch1", - Field2BinlogPaths: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "/by-dev/test/0/1/1/1/Allo1", - EntriesNum: 5, - }, - { - LogPath: "/by-dev/test/0/1/1/1/Allo2", - EntriesNum: 5, - }, - }, - }, - }, - Field2StatslogPaths: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "/by-dev/test_stats/0/1/1/1/Allo1", - EntriesNum: 5, - }, - { - LogPath: "/by-dev/test_stats/0/1/1/1/Allo2", - EntriesNum: 5, - }, - }, - }, - }, - CheckPoints: []*datapb.CheckPoint{ - { - SegmentID: 1, - Position: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 0, - }, - NumOfRows: 12, - }, - }, - Flushed: false, - }) - assert.NoError(t, err) - assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success) - - segment := svr.meta.GetHealthySegment(1) - assert.NotNil(t, segment) - binlogs := segment.GetBinlogs() - assert.EqualValues(t, 1, len(binlogs)) - fieldBinlogs := binlogs[0] - assert.NotNil(t, fieldBinlogs) - assert.EqualValues(t, 2, len(fieldBinlogs.GetBinlogs())) - assert.EqualValues(t, 1, fieldBinlogs.GetFieldID()) - assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath()) - assert.EqualValues(t, "/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath()) - - assert.EqualValues(t, segment.DmlPosition.ChannelName, "ch1") - assert.EqualValues(t, segment.DmlPosition.MsgID, []byte{1, 2, 3}) - assert.EqualValues(t, segment.NumOfRows, 10) - }) - - t.Run("Normal L0 SaveRequest", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - // vecFieldID := int64(201) - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - }) - - ctx := context.Background() - - err := svr.channelManager.AddNode(0) - assert.NoError(t, err) - err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0}) - assert.NoError(t, err) - - resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ - Base: &commonpb.MsgBase{ - Timestamp: uint64(time.Now().Unix()), - }, - SegmentID: 1, - PartitionID: 1, - CollectionID: 0, - SegLevel: datapb.SegmentLevel_L0, - Deltalogs: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "/by-dev/test/0/1/1/1/Allo1", - EntriesNum: 5, - }, - { - LogPath: "/by-dev/test/0/1/1/1/Allo2", - EntriesNum: 5, - }, - }, - }, - }, - CheckPoints: []*datapb.CheckPoint{ - { - SegmentID: 1, - Position: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 0, - }, - NumOfRows: 12, - }, - }, - Flushed: true, - }) - assert.NoError(t, err) - assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success) - - segment := svr.meta.GetHealthySegment(1) - assert.NotNil(t, segment) - }) - - t.Run("SaveDroppedSegment", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - // vecFieldID := int64(201) - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - }) - - segments := []struct { - id UniqueID - collectionID UniqueID - }{ - {0, 0}, - {1, 0}, - } - for _, segment := range segments { - s := &datapb.SegmentInfo{ - ID: segment.id, - CollectionID: segment.collectionID, - InsertChannel: "ch1", - State: commonpb.SegmentState_Dropped, - } - err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) - assert.NoError(t, err) - } - - ctx := context.Background() - err := svr.channelManager.AddNode(0) - assert.NoError(t, err) - err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0}) - assert.NoError(t, err) - - resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ - Base: &commonpb.MsgBase{ - Timestamp: uint64(time.Now().Unix()), - }, - SegmentID: 1, - CollectionID: 0, - Field2BinlogPaths: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "/by-dev/test/0/1/1/1/Allo1", - EntriesNum: 5, - }, - { - LogPath: "/by-dev/test/0/1/1/1/Allo2", - EntriesNum: 5, - }, - }, - }, - }, - CheckPoints: []*datapb.CheckPoint{ - { - SegmentID: 1, - Position: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 0, - }, - NumOfRows: 12, - }, - }, - Flushed: false, - }) - assert.NoError(t, err) - assert.EqualValues(t, resp.ErrorCode, commonpb.ErrorCode_Success) - - segment := svr.meta.GetSegment(1) - assert.NotNil(t, segment) - binlogs := segment.GetBinlogs() - assert.EqualValues(t, 0, len(binlogs)) - assert.EqualValues(t, segment.NumOfRows, 0) - }) - - t.Run("SaveUnhealthySegment", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - // vecFieldID := int64(201) - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - }) - - segments := []struct { - id UniqueID - collectionID UniqueID - }{ - {0, 0}, - {1, 0}, - } - for _, segment := range segments { - s := &datapb.SegmentInfo{ - ID: segment.id, - CollectionID: segment.collectionID, - InsertChannel: "ch1", - State: commonpb.SegmentState_NotExist, - } - err := svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) - assert.NoError(t, err) - } - - ctx := context.Background() - err := svr.channelManager.AddNode(0) - assert.NoError(t, err) - err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0}) - assert.NoError(t, err) - - resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ - Base: &commonpb.MsgBase{ - Timestamp: uint64(time.Now().Unix()), - }, - SegmentID: 1, - CollectionID: 0, - Field2BinlogPaths: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "/by-dev/test/0/1/1/1/Allo1", - EntriesNum: 5, - }, - { - LogPath: "/by-dev/test/0/1/1/1/Allo2", - EntriesNum: 5, - }, - }, - }, - }, - CheckPoints: []*datapb.CheckPoint{ - { - SegmentID: 1, - Position: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 0, - }, - NumOfRows: 12, - }, - }, - Flushed: false, - }) - assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound) - }) - - t.Run("SaveNotExistSegment", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - - // vecFieldID := int64(201) - svr.meta.AddCollection(&collectionInfo{ - ID: 0, - }) - - ctx := context.Background() - err := svr.channelManager.AddNode(0) - assert.NoError(t, err) - err = svr.channelManager.Watch(ctx, &channelMeta{Name: "ch1", CollectionID: 0}) - assert.NoError(t, err) - - resp, err := svr.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ - Base: &commonpb.MsgBase{ - Timestamp: uint64(time.Now().Unix()), - }, - SegmentID: 1, - CollectionID: 0, - Field2BinlogPaths: []*datapb.FieldBinlog{ - { - FieldID: 1, - Binlogs: []*datapb.Binlog{ - { - LogPath: "/by-dev/test/0/1/1/1/Allo1", - EntriesNum: 5, - }, - { - LogPath: "/by-dev/test/0/1/1/1/Allo2", - EntriesNum: 5, - }, - }, - }, - }, - CheckPoints: []*datapb.CheckPoint{ - { - SegmentID: 1, - Position: &msgpb.MsgPosition{ - ChannelName: "ch1", - MsgID: []byte{1, 2, 3}, - MsgGroup: "", - Timestamp: 0, - }, - NumOfRows: 12, - }, - }, - Flushed: false, - }) - assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp), merr.ErrSegmentNotFound) - }) - - t.Run("with channel not matched", func(t *testing.T) { - svr := newTestServer(t, nil) - defer closeTestServer(t, svr) - err := svr.channelManager.AddNode(0) - require.Nil(t, err) - err = svr.channelManager.Watch(context.TODO(), &channelMeta{Name: "ch1", CollectionID: 0}) - require.Nil(t, err) - s := &datapb.SegmentInfo{ - ID: 1, - InsertChannel: "ch2", - State: commonpb.SegmentState_Growing, - } - svr.meta.AddSegment(context.TODO(), NewSegmentInfo(s)) - - resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{ - SegmentID: 1, - Channel: "test", - }) - assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp), merr.ErrChannelNotFound) - }) - - t.Run("with closed server", func(t *testing.T) { - svr := newTestServer(t, nil) - closeTestServer(t, svr) - resp, err := svr.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{}) - assert.NoError(t, err) - assert.ErrorIs(t, merr.Error(resp), merr.ErrServiceNotReady) - }) - /* - t.Run("test save dropped segment and remove channel", func(t *testing.T) { - spyCh := make(chan struct{}, 1) - svr := newTestServer(t, nil, WithSegmentManager(&spySegmentManager{spyCh: spyCh})) - defer closeTestServer(t, svr) - - svr.meta.AddCollection(&collectionInfo{ID: 1}) - err := svr.meta.AddSegment(&SegmentInfo{ - Segment: &datapb.SegmentInfo{ - ID: 1, - CollectionID: 1, - InsertChannel: "ch1", - State: commonpb.SegmentState_Growing, - }, - }) - assert.NoError(t, err) - - err = svr.channelManager.AddNode(0) - assert.NoError(t, err) - err = svr.channelManager.Watch(&channel{Name: "ch1", CollectionID: 1}) - assert.NoError(t, err) - - _, err = svr.SaveBinlogPaths(context.TODO(), &datapb.SaveBinlogPathsRequest{ - SegmentID: 1, - Dropped: true, - }) - assert.NoError(t, err) - <-spyCh - })*/ -} - func TestDropVirtualChannel(t *testing.T) { t.Run("normal DropVirtualChannel", func(t *testing.T) { spyCh := make(chan struct{}, 1) @@ -3427,243 +2747,6 @@ func TestPostFlush(t *testing.T) { }) } -func TestGetFlushState(t *testing.T) { - t.Run("get flush state with all flushed segments", func(t *testing.T) { - meta, err := newMemoryMeta() - assert.NoError(t, err) - svr := newTestServerWithMeta(t, nil, meta) - defer closeTestServer(t, svr) - - err = meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - State: commonpb.SegmentState_Flushed, - }, - }) - assert.NoError(t, err) - err = meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - State: commonpb.SegmentState_Flushed, - }, - }) - assert.NoError(t, err) - - var ( - vchannel = "ch1" - collection = int64(0) - ) - - svr.channelManager = &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, - }, - }, - } - - err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{ - MsgID: []byte{1}, - Timestamp: 12, - }) - assert.NoError(t, err) - - resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}}) - assert.NoError(t, err) - assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: merr.Success(), - Flushed: true, - }, resp) - }) - - t.Run("get flush state with unflushed segments", func(t *testing.T) { - meta, err := newMemoryMeta() - assert.NoError(t, err) - svr := newTestServerWithMeta(t, nil, meta) - defer closeTestServer(t, svr) - - err = meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - State: commonpb.SegmentState_Flushed, - }, - }) - assert.NoError(t, err) - err = meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - State: commonpb.SegmentState_Sealed, - }, - }) - assert.NoError(t, err) - - var ( - vchannel = "ch1" - collection = int64(0) - ) - - svr.channelManager = &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, - }, - }, - } - - err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{ - MsgID: []byte{1}, - Timestamp: 12, - }) - assert.NoError(t, err) - - resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}}) - assert.NoError(t, err) - assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: merr.Success(), - Flushed: false, - }, resp) - }) - - t.Run("get flush state with compacted segments", func(t *testing.T) { - meta, err := newMemoryMeta() - assert.NoError(t, err) - svr := newTestServerWithMeta(t, nil, meta) - defer closeTestServer(t, svr) - - err = meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 1, - State: commonpb.SegmentState_Flushed, - }, - }) - assert.NoError(t, err) - err = meta.AddSegment(context.TODO(), &SegmentInfo{ - SegmentInfo: &datapb.SegmentInfo{ - ID: 2, - State: commonpb.SegmentState_Dropped, - }, - }) - assert.NoError(t, err) - - var ( - vchannel = "ch1" - collection = int64(0) - ) - - svr.channelManager = &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, - }, - }, - } - - err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{ - MsgID: []byte{1}, - Timestamp: 12, - }) - assert.NoError(t, err) - - resp, err := svr.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{1, 2}}) - assert.NoError(t, err) - assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: merr.Success(), - Flushed: true, - }, resp) - }) - - t.Run("channel flushed", func(t *testing.T) { - meta, err := newMemoryMeta() - assert.NoError(t, err) - svr := newTestServerWithMeta(t, nil, meta) - defer closeTestServer(t, svr) - - var ( - vchannel = "ch1" - collection = int64(0) - ) - - svr.channelManager = &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, - }, - }, - } - - err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{ - MsgID: []byte{1}, - Timestamp: 12, - }) - assert.NoError(t, err) - - resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{ - FlushTs: 11, - CollectionID: collection, - }) - assert.NoError(t, err) - assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: merr.Success(), - Flushed: true, - }, resp) - }) - - t.Run("channel unflushed", func(t *testing.T) { - meta, err := newMemoryMeta() - assert.NoError(t, err) - svr := newTestServerWithMeta(t, nil, meta) - defer closeTestServer(t, svr) - - var ( - vchannel = "ch1" - collection = int64(0) - ) - - svr.channelManager = &ChannelManagerImpl{ - store: &ChannelStore{ - channelsInfo: map[int64]*NodeChannelInfo{ - 1: {NodeID: 1, Channels: []RWChannel{&channelMeta{Name: vchannel, CollectionID: collection}}}, - }, - }, - } - - err = svr.meta.UpdateChannelCheckpoint(vchannel, &msgpb.MsgPosition{ - MsgID: []byte{1}, - Timestamp: 10, - }) - assert.NoError(t, err) - - resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{ - FlushTs: 11, - CollectionID: collection, - }) - assert.NoError(t, err) - assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: merr.Success(), - Flushed: false, - }, resp) - }) - - t.Run("no channels", func(t *testing.T) { - meta, err := newMemoryMeta() - assert.NoError(t, err) - svr := newTestServerWithMeta(t, nil, meta) - defer closeTestServer(t, svr) - - collection := int64(0) - - resp, err := svr.GetFlushState(context.Background(), &datapb.GetFlushStateRequest{ - FlushTs: 11, - CollectionID: collection, - }) - assert.NoError(t, err) - assert.EqualValues(t, &milvuspb.GetFlushStateResponse{ - Status: merr.Success(), - Flushed: true, - }, resp) - }) -} - func TestGetFlushAllState(t *testing.T) { tests := []struct { testName string @@ -4273,11 +3356,6 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server { assert.NoError(t, err) assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode)) - // Stop channal watch state watcher in tests - if svr.channelManager != nil && svr.channelManager.stopChecker != nil { - svr.channelManager.stopChecker() - } - return svr } @@ -4323,8 +3401,11 @@ func newTestServerWithMeta(t *testing.T, receiveCh chan any, meta *meta, opts .. assert.NoError(t, err) // Stop channal watch state watcher in tests - if svr.channelManager != nil && svr.channelManager.stopChecker != nil { - svr.channelManager.stopChecker() + if svr.channelManager != nil { + impl, ok := svr.channelManager.(*ChannelManagerImpl) + if ok && impl.stopChecker != nil { + impl.stopChecker() + } } return svr @@ -4376,8 +3457,11 @@ func newTestServer2(t *testing.T, receiveCh chan any, opts ...Option) *Server { assert.NoError(t, err) // Stop channal watch state watcher in tests - if svr.channelManager != nil && svr.channelManager.stopChecker != nil { - svr.channelManager.stopChecker() + if svr.channelManager != nil { + impl, ok := svr.channelManager.(*ChannelManagerImpl) + if ok && impl.stopChecker != nil { + impl.stopChecker() + } } return svr @@ -4578,8 +3662,11 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server { assert.Equal(t, commonpb.StateCode_Healthy, resp.GetState().GetStateCode()) // stop channal watch state watcher in tests - if svr.channelManager != nil && svr.channelManager.stopChecker != nil { - svr.channelManager.stopChecker() + if svr.channelManager != nil { + impl, ok := svr.channelManager.(*ChannelManagerImpl) + if ok && impl.stopChecker != nil { + impl.stopChecker() + } } return svr diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8ee6a3f6b6..f3151e5ad7 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1261,7 +1261,7 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq } } - channels := s.channelManager.GetChannelNamesByCollectionID(req.GetCollectionID()) + channels := s.channelManager.GetChannelsByCollectionID(req.GetCollectionID()) if len(channels) == 0 { // For compatibility with old client resp.Flushed = true @@ -1270,11 +1270,11 @@ func (s *Server) GetFlushState(ctx context.Context, req *datapb.GetFlushStateReq } for _, channel := range channels { - cp := s.meta.GetChannelCheckpoint(channel) + cp := s.meta.GetChannelCheckpoint(channel.GetName()) if cp == nil || cp.GetTimestamp() < req.GetFlushTs() { resp.Flushed = false - log.RatedInfo(10, "GetFlushState failed, channel unflushed", zap.String("channel", channel), + log.RatedInfo(10, "GetFlushState failed, channel unflushed", zap.String("channel", channel.GetName()), zap.Time("CP", tsoutil.PhysicalTime(cp.GetTimestamp())), zap.Duration("lag", tsoutil.PhysicalTime(req.GetFlushTs()).Sub(tsoutil.PhysicalTime(cp.GetTimestamp())))) return resp, nil diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 119c8c8f60..45a14f73c5 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -3,23 +3,660 @@ package datacoord import ( "context" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "go.uber.org/zap" + "google.golang.org/grpc/codes" + grpcStatus "google.golang.org/grpc/status" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" ) +type ServerSuite struct { + suite.Suite + + testServer *Server + mockChMgr *MockChannelManager +} + +func (s *ServerSuite) SetupTest() { + s.testServer = newTestServer(s.T(), nil) + if s.testServer.channelManager != nil { + s.testServer.channelManager.Close() + } + + s.mockChMgr = NewMockChannelManager(s.T()) + s.testServer.channelManager = s.mockChMgr + if s.mockChMgr != nil { + s.mockChMgr.EXPECT().Close().Maybe() + } +} + +func (s *ServerSuite) TearDownTest() { + if s.testServer != nil { + log.Info("ServerSuite tears down test", zap.String("name", s.T().Name())) + closeTestServer(s.T(), s.testServer) + } +} + +func TestServerSuite(t *testing.T) { + suite.Run(t, new(ServerSuite)) +} + +func (s *ServerSuite) TestGetFlushState_ByFlushTs() { + s.mockChMgr.EXPECT().GetChannelsByCollectionID(int64(0)). + Return([]RWChannel{&channelMeta{Name: "ch1", CollectionID: 0}}).Times(3) + + s.mockChMgr.EXPECT().GetChannelsByCollectionID(int64(1)).Return(nil).Times(1) + tests := []struct { + description string + inTs Timestamp + + expected bool + }{ + {"channel cp > flush ts", 11, true}, + {"channel cp = flush ts", 12, true}, + {"channel cp < flush ts", 13, false}, + } + + err := s.testServer.meta.UpdateChannelCheckpoint("ch1", &msgpb.MsgPosition{ + MsgID: []byte{1}, + Timestamp: 12, + }) + s.Require().NoError(err) + for _, test := range tests { + s.Run(test.description, func() { + resp, err := s.testServer.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{FlushTs: test.inTs}) + s.NoError(err) + s.EqualValues(&milvuspb.GetFlushStateResponse{ + Status: merr.Success(), + Flushed: test.expected, + }, resp) + }) + } + + resp, err := s.testServer.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{CollectionID: 1, FlushTs: 13}) + s.NoError(err) + s.EqualValues(&milvuspb.GetFlushStateResponse{ + Status: merr.Success(), + Flushed: true, + }, resp) +} + +func (s *ServerSuite) TestGetFlushState_BySegment() { + s.mockChMgr.EXPECT().GetChannelsByCollectionID(mock.Anything). + Return([]RWChannel{&channelMeta{Name: "ch1", CollectionID: 0}}).Times(3) + + tests := []struct { + description string + segID int64 + state commonpb.SegmentState + + expected bool + }{ + {"flushed seg1", 1, commonpb.SegmentState_Flushed, true}, + {"flushed seg2", 2, commonpb.SegmentState_Flushed, true}, + {"sealed seg3", 3, commonpb.SegmentState_Sealed, false}, + {"compacted/dropped seg4", 4, commonpb.SegmentState_Dropped, true}, + } + + for _, test := range tests { + s.Run(test.description, func() { + err := s.testServer.meta.AddSegment(context.TODO(), &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: test.segID, + State: test.state, + }, + }) + + s.Require().NoError(err) + err = s.testServer.meta.UpdateChannelCheckpoint("ch1", &msgpb.MsgPosition{ + MsgID: []byte{1}, + Timestamp: 12, + }) + s.Require().NoError(err) + + resp, err := s.testServer.GetFlushState(context.TODO(), &datapb.GetFlushStateRequest{SegmentIDs: []int64{test.segID}}) + s.NoError(err) + s.EqualValues(&milvuspb.GetFlushStateResponse{ + Status: merr.Success(), + Flushed: test.expected, + }, resp) + }) + } +} + +func (s *ServerSuite) TestSaveBinlogPath_ClosedServer() { + s.TearDownTest() + resp, err := s.testServer.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{ + SegmentID: 1, + Channel: "test", + }) + s.NoError(err) + s.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady) +} + +func (s *ServerSuite) TestSaveBinlogPath_ChannelNotMatch() { + s.mockChMgr.EXPECT().Match(mock.Anything, mock.Anything).Return(false) + resp, err := s.testServer.SaveBinlogPaths(context.Background(), &datapb.SaveBinlogPathsRequest{ + SegmentID: 1, + Channel: "test", + }) + s.NoError(err) + s.ErrorIs(merr.Error(resp), merr.ErrChannelNotFound) +} + +func (s *ServerSuite) TestSaveBinlogPath_SaveUnhealthySegment() { + s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true) + s.testServer.meta.AddCollection(&collectionInfo{ID: 0}) + + segments := map[int64]commonpb.SegmentState{ + 0: commonpb.SegmentState_NotExist, + } + for segID, state := range segments { + info := &datapb.SegmentInfo{ + ID: segID, + InsertChannel: "ch1", + State: state, + } + err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info)) + s.Require().NoError(err) + } + + ctx := context.Background() + resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, + SegmentID: 1, + Channel: "ch1", + }) + s.NoError(err) + s.ErrorIs(merr.Error(resp), merr.ErrSegmentNotFound) + + resp, err = s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, + SegmentID: 2, + Channel: "ch1", + }) + s.NoError(err) + s.ErrorIs(merr.Error(resp), merr.ErrSegmentNotFound) +} + +func (s *ServerSuite) TestSaveBinlogPath_SaveDroppedSegment() { + s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true) + s.testServer.meta.AddCollection(&collectionInfo{ID: 0}) + + segments := map[int64]int64{ + 0: 0, + 1: 0, + } + for segID, collID := range segments { + info := &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + } + err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info)) + s.Require().NoError(err) + } + + ctx := context.Background() + resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, + SegmentID: 1, + CollectionID: 0, + Channel: "ch1", + Flushed: false, + }) + s.NoError(err) + s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success) + + segment := s.testServer.meta.GetSegment(1) + s.NotNil(segment) + s.EqualValues(0, len(segment.GetBinlogs())) + s.EqualValues(segment.NumOfRows, 0) +} + +func (s *ServerSuite) TestSaveBinlogPath_L0Segment() { + s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true) + s.testServer.meta.AddCollection(&collectionInfo{ID: 0}) + + segment := s.testServer.meta.GetHealthySegment(1) + s.Require().Nil(segment) + ctx := context.Background() + resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, + SegmentID: 1, + PartitionID: 1, + CollectionID: 0, + SegLevel: datapb.SegmentLevel_L0, + Channel: "ch1", + Deltalogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "/by-dev/test/0/1/1/1/Allo1", + EntriesNum: 5, + }, + { + LogPath: "/by-dev/test/0/1/1/1/Allo2", + EntriesNum: 5, + }, + }, + }, + }, + CheckPoints: []*datapb.CheckPoint{ + { + SegmentID: 1, + Position: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 0, + }, + NumOfRows: 12, + }, + }, + Flushed: true, + }) + s.NoError(err) + s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success) + + segment = s.testServer.meta.GetHealthySegment(1) + s.NotNil(segment) + s.EqualValues(datapb.SegmentLevel_L0, segment.GetLevel()) +} + +func (s *ServerSuite) TestSaveBinlogPath_NormalCase() { + s.mockChMgr.EXPECT().Match(int64(0), "ch1").Return(true) + s.testServer.meta.AddCollection(&collectionInfo{ID: 0}) + + segments := map[int64]int64{ + 0: 0, + 1: 0, + } + for segID, collID := range segments { + info := &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + } + err := s.testServer.meta.AddSegment(context.TODO(), NewSegmentInfo(info)) + s.Require().NoError(err) + } + + ctx := context.Background() + + resp, err := s.testServer.SaveBinlogPaths(ctx, &datapb.SaveBinlogPathsRequest{ + Base: &commonpb.MsgBase{ + Timestamp: uint64(time.Now().Unix()), + }, + SegmentID: 1, + CollectionID: 0, + Channel: "ch1", + Field2BinlogPaths: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "/by-dev/test/0/1/1/1/Allo1", + EntriesNum: 5, + }, + { + LogPath: "/by-dev/test/0/1/1/1/Allo2", + EntriesNum: 5, + }, + }, + }, + }, + Field2StatslogPaths: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "/by-dev/test_stats/0/1/1/1/Allo1", + EntriesNum: 5, + }, + { + LogPath: "/by-dev/test_stats/0/1/1/1/Allo2", + EntriesNum: 5, + }, + }, + }, + }, + CheckPoints: []*datapb.CheckPoint{ + { + SegmentID: 1, + Position: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 0, + }, + NumOfRows: 12, + }, + }, + Flushed: false, + }) + s.NoError(err) + s.EqualValues(resp.ErrorCode, commonpb.ErrorCode_Success) + + segment := s.testServer.meta.GetHealthySegment(1) + s.NotNil(segment) + binlogs := segment.GetBinlogs() + s.EqualValues(1, len(binlogs)) + fieldBinlogs := binlogs[0] + s.NotNil(fieldBinlogs) + s.EqualValues(2, len(fieldBinlogs.GetBinlogs())) + s.EqualValues(1, fieldBinlogs.GetFieldID()) + s.EqualValues("/by-dev/test/0/1/1/1/Allo1", fieldBinlogs.GetBinlogs()[0].GetLogPath()) + s.EqualValues("/by-dev/test/0/1/1/1/Allo2", fieldBinlogs.GetBinlogs()[1].GetLogPath()) + + s.EqualValues(segment.DmlPosition.ChannelName, "ch1") + s.EqualValues(segment.DmlPosition.MsgID, []byte{1, 2, 3}) + s.EqualValues(segment.NumOfRows, 10) +} + +func (s *ServerSuite) TestFlush_NormalCase() { + req := &datapb.FlushRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + DbID: 0, + CollectionID: 0, + } + + s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{ + 1: {"channel-1"}, + }) + + mockCluster := NewMockCluster(s.T()) + mockCluster.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil) + mockCluster.EXPECT().Close().Maybe() + s.testServer.cluster = mockCluster + + schema := newTestSchema() + s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}}) + allocations, err := s.testServer.segmentManager.AllocSegment(context.TODO(), 0, 1, "channel-1", 1) + s.NoError(err) + s.EqualValues(1, len(allocations)) + expireTs := allocations[0].ExpireTime + segID := allocations[0].SegmentID + + resp, err := s.testServer.Flush(context.TODO(), req) + s.NoError(err) + s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + + s.testServer.meta.SetCurrentRows(segID, 1) + ids, err := s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) + s.NoError(err) + s.EqualValues(1, len(ids)) + s.EqualValues(segID, ids[0]) +} + +func (s *ServerSuite) TestFlush_BulkLoadSegment() { + req := &datapb.FlushRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + DbID: 0, + CollectionID: 0, + } + s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{ + 1: {"channel-1"}, + }).Twice() + + mockCluster := NewMockCluster(s.T()) + mockCluster.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(nil).Twice() + mockCluster.EXPECT().Close().Maybe() + s.testServer.cluster = mockCluster + + schema := newTestSchema() + s.testServer.meta.AddCollection(&collectionInfo{ID: 0, Schema: schema, Partitions: []int64{}}) + + allocations, err := s.testServer.segmentManager.allocSegmentForImport(context.TODO(), 0, 1, "channel-1", 1, 100) + s.NoError(err) + expireTs := allocations.ExpireTime + segID := allocations.SegmentID + + resp, err := s.testServer.Flush(context.TODO(), req) + s.NoError(err) + s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.EqualValues(0, len(resp.SegmentIDs)) + // should not flush anything since this is a normal flush + s.testServer.meta.SetCurrentRows(segID, 1) + ids, err := s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) + s.NoError(err) + s.EqualValues(0, len(ids)) + + req = &datapb.FlushRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + DbID: 0, + CollectionID: 0, + IsImport: true, + } + + resp, err = s.testServer.Flush(context.TODO(), req) + s.NoError(err) + s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.EqualValues(1, len(resp.SegmentIDs)) + + ids, err = s.testServer.segmentManager.GetFlushableSegments(context.TODO(), "channel-1", expireTs) + s.NoError(err) + s.EqualValues(1, len(ids)) + s.EqualValues(segID, ids[0]) +} + +func (s *ServerSuite) TestFlush_ClosedServer() { + s.TearDownTest() + req := &datapb.FlushRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + DbID: 0, + CollectionID: 0, + } + resp, err := s.testServer.Flush(context.Background(), req) + s.NoError(err) + s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) +} + +func (s *ServerSuite) TestFlush_RollingUpgrade() { + req := &datapb.FlushRequest{ + Base: &commonpb.MsgBase{ + MsgType: commonpb.MsgType_Flush, + MsgID: 0, + Timestamp: 0, + SourceID: 0, + }, + DbID: 0, + CollectionID: 0, + } + mockCluster := NewMockCluster(s.T()) + mockCluster.EXPECT().FlushChannels(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(merr.WrapErrServiceUnimplemented(grpcStatus.Error(codes.Unimplemented, "mock grpc unimplemented error"))) + mockCluster.EXPECT().Close().Maybe() + s.testServer.cluster = mockCluster + s.mockChMgr.EXPECT().GetNodeChannelsByCollectionID(mock.Anything).Return(map[int64][]string{ + 1: {"channel-1"}, + }).Once() + + resp, err := s.testServer.Flush(context.TODO(), req) + s.NoError(err) + s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.EqualValues(0, resp.GetFlushTs()) +} + +func (s *ServerSuite) TestGetSegmentInfoChannel() { + resp, err := s.testServer.GetSegmentInfoChannel(context.TODO(), nil) + s.NoError(err) + s.EqualValues(commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) + s.EqualValues(Params.CommonCfg.DataCoordSegmentInfo.GetValue(), resp.Value) +} + +func (s *ServerSuite) TestAssignSegmentID() { + s.TearDownTest() + const collID = 100 + const collIDInvalid = 101 + const partID = 0 + const channel0 = "channel0" + + s.Run("assign segment normally", func() { + s.SetupTest() + defer s.TearDownTest() + + schema := newTestSchema() + s.testServer.meta.AddCollection(&collectionInfo{ + ID: collID, + Schema: schema, + Partitions: []int64{}, + }) + req := &datapb.SegmentIDRequest{ + Count: 1000, + ChannelName: channel0, + CollectionID: collID, + PartitionID: partID, + } + + resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: "", + SegmentIDRequests: []*datapb.SegmentIDRequest{req}, + }) + s.NoError(err) + s.EqualValues(1, len(resp.SegIDAssignments)) + assign := resp.SegIDAssignments[0] + s.EqualValues(commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode()) + s.EqualValues(collID, assign.CollectionID) + s.EqualValues(partID, assign.PartitionID) + s.EqualValues(channel0, assign.ChannelName) + s.EqualValues(1000, assign.Count) + }) + + s.Run("assign segment for bulkload", func() { + s.SetupTest() + defer s.TearDownTest() + + schema := newTestSchema() + s.testServer.meta.AddCollection(&collectionInfo{ + ID: collID, + Schema: schema, + Partitions: []int64{}, + }) + req := &datapb.SegmentIDRequest{ + Count: 1000, + ChannelName: channel0, + CollectionID: collID, + PartitionID: partID, + IsImport: true, + } + + resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: "", + SegmentIDRequests: []*datapb.SegmentIDRequest{req}, + }) + s.NoError(err) + s.EqualValues(1, len(resp.SegIDAssignments)) + assign := resp.SegIDAssignments[0] + s.EqualValues(commonpb.ErrorCode_Success, assign.GetStatus().GetErrorCode()) + s.EqualValues(collID, assign.CollectionID) + s.EqualValues(partID, assign.PartitionID) + s.EqualValues(channel0, assign.ChannelName) + s.EqualValues(1000, assign.Count) + }) + + s.Run("with closed server", func() { + s.SetupTest() + s.TearDownTest() + + req := &datapb.SegmentIDRequest{ + Count: 100, + ChannelName: channel0, + CollectionID: collID, + PartitionID: partID, + } + resp, err := s.testServer.AssignSegmentID(context.Background(), &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: "", + SegmentIDRequests: []*datapb.SegmentIDRequest{req}, + }) + s.NoError(err) + s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) + }) + + s.Run("assign segment with invalid collection", func() { + s.SetupTest() + defer s.TearDownTest() + + s.testServer.rootCoordClient = &mockRootCoord{ + RootCoordClient: s.testServer.rootCoordClient, + collID: collID, + } + + schema := newTestSchema() + s.testServer.meta.AddCollection(&collectionInfo{ + ID: collID, + Schema: schema, + Partitions: []int64{}, + }) + req := &datapb.SegmentIDRequest{ + Count: 1000, + ChannelName: channel0, + CollectionID: collIDInvalid, + PartitionID: partID, + } + + resp, err := s.testServer.AssignSegmentID(context.TODO(), &datapb.AssignSegmentIDRequest{ + NodeID: 0, + PeerRole: "", + SegmentIDRequests: []*datapb.SegmentIDRequest{req}, + }) + s.NoError(err) + s.EqualValues(0, len(resp.SegIDAssignments)) + }) +} + func TestBroadcastAlteredCollection(t *testing.T) { t.Run("test server is closed", func(t *testing.T) { s := &Server{}