diff --git a/internal/datacoord/mock_segment_manager.go b/internal/datacoord/mock_segment_manager.go index 65f306ddef..3f8e2392b1 100644 --- a/internal/datacoord/mock_segment_manager.go +++ b/internal/datacoord/mock_segment_manager.go @@ -145,6 +145,41 @@ func (_c *MockManager_AllocSegment_Call) RunAndReturn(run func(context.Context, return _c } +// CleanZeroSealedSegmentsOfChannel provides a mock function with given fields: ctx, channel, cpTs +func (_m *MockManager) CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs uint64) { + _m.Called(ctx, channel, cpTs) +} + +// MockManager_CleanZeroSealedSegmentsOfChannel_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CleanZeroSealedSegmentsOfChannel' +type MockManager_CleanZeroSealedSegmentsOfChannel_Call struct { + *mock.Call +} + +// CleanZeroSealedSegmentsOfChannel is a helper method to define mock.On call +// - ctx context.Context +// - channel string +// - cpTs uint64 +func (_e *MockManager_Expecter) CleanZeroSealedSegmentsOfChannel(ctx interface{}, channel interface{}, cpTs interface{}) *MockManager_CleanZeroSealedSegmentsOfChannel_Call { + return &MockManager_CleanZeroSealedSegmentsOfChannel_Call{Call: _e.mock.On("CleanZeroSealedSegmentsOfChannel", ctx, channel, cpTs)} +} + +func (_c *MockManager_CleanZeroSealedSegmentsOfChannel_Call) Run(run func(ctx context.Context, channel string, cpTs uint64)) *MockManager_CleanZeroSealedSegmentsOfChannel_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(uint64)) + }) + return _c +} + +func (_c *MockManager_CleanZeroSealedSegmentsOfChannel_Call) Return() *MockManager_CleanZeroSealedSegmentsOfChannel_Call { + _c.Call.Return() + return _c +} + +func (_c *MockManager_CleanZeroSealedSegmentsOfChannel_Call) RunAndReturn(run func(context.Context, string, uint64)) *MockManager_CleanZeroSealedSegmentsOfChannel_Call { + _c.Call.Return(run) + return _c +} + // DropSegment provides a mock function with given fields: ctx, channel, segmentID func (_m *MockManager) DropSegment(ctx context.Context, channel string, segmentID int64) { _m.Called(ctx, channel, segmentID) diff --git a/internal/datacoord/mock_trigger_manager.go b/internal/datacoord/mock_trigger_manager.go index 4a0b72c460..fdb093a8e4 100644 --- a/internal/datacoord/mock_trigger_manager.go +++ b/internal/datacoord/mock_trigger_manager.go @@ -79,6 +79,39 @@ func (_c *MockTriggerManager_ManualTrigger_Call) RunAndReturn(run func(context.C return _c } +// OnCollectionUpdate provides a mock function with given fields: collectionID +func (_m *MockTriggerManager) OnCollectionUpdate(collectionID int64) { + _m.Called(collectionID) +} + +// MockTriggerManager_OnCollectionUpdate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnCollectionUpdate' +type MockTriggerManager_OnCollectionUpdate_Call struct { + *mock.Call +} + +// OnCollectionUpdate is a helper method to define mock.On call +// - collectionID int64 +func (_e *MockTriggerManager_Expecter) OnCollectionUpdate(collectionID interface{}) *MockTriggerManager_OnCollectionUpdate_Call { + return &MockTriggerManager_OnCollectionUpdate_Call{Call: _e.mock.On("OnCollectionUpdate", collectionID)} +} + +func (_c *MockTriggerManager_OnCollectionUpdate_Call) Run(run func(collectionID int64)) *MockTriggerManager_OnCollectionUpdate_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64)) + }) + return _c +} + +func (_c *MockTriggerManager_OnCollectionUpdate_Call) Return() *MockTriggerManager_OnCollectionUpdate_Call { + _c.Call.Return() + return _c +} + +func (_c *MockTriggerManager_OnCollectionUpdate_Call) RunAndReturn(run func(int64)) *MockTriggerManager_OnCollectionUpdate_Call { + _c.Call.Return(run) + return _c +} + // Start provides a mock function with given fields: func (_m *MockTriggerManager) Start() { _m.Called() diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index f66f5a0b0d..0d3361fdf4 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -90,6 +90,8 @@ type Manager interface { ExpireAllocations(ctx context.Context, channel string, ts Timestamp) // DropSegmentsOfChannel drops all segments in a channel DropSegmentsOfChannel(ctx context.Context, channel string) + // CleanZeroSealedSegmentsOfChannel try to clean real empty sealed segments in a channel + CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp) } // Allocation records the allocation info @@ -511,9 +513,6 @@ func (s *SegmentManager) GetFlushableSegments(ctx context.Context, channel strin return nil, err } - // TODO: It's too frequent; perhaps each channel could check once per minute instead. - s.cleanupSealedSegment(ctx, t, channel) - sealed, ok := s.channel2Sealed.Get(channel) if !ok { return nil, nil @@ -565,26 +564,35 @@ func (s *SegmentManager) ExpireAllocations(ctx context.Context, channel string, }) } -func (s *SegmentManager) cleanupSealedSegment(ctx context.Context, ts Timestamp, channel string) { +func (s *SegmentManager) CleanZeroSealedSegmentsOfChannel(ctx context.Context, channel string, cpTs Timestamp) { + s.channelLock.Lock(channel) + defer s.channelLock.Unlock(channel) + sealed, ok := s.channel2Sealed.Get(channel) if !ok { + log.Info("try remove empty sealed segment after channel cp updated failed to get channel", zap.String("channel", channel)) return } sealed.Range(func(id int64) bool { segment := s.meta.GetHealthySegment(ctx, id) if segment == nil { - log.Warn("failed to get segment, remove it", zap.String("channel", channel), zap.Int64("segmentID", id)) + log.Warn("try remove empty sealed segment, failed to get segment, remove it in channel2Sealed", zap.String("channel", channel), zap.Int64("segmentID", id)) sealed.Remove(id) return true } // Check if segment is empty - if segment.GetLastExpireTime() <= ts && segment.currRows == 0 { - log.Info("remove empty sealed segment", zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id)) + if segment.GetLastExpireTime() > 0 && segment.GetLastExpireTime() < cpTs && segment.currRows == 0 && segment.GetNumOfRows() == 0 { + log.Info("try remove empty sealed segment after channel cp updated", + zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id), + zap.String("channel", channel), zap.Any("cpTs", cpTs)) if err := s.meta.SetState(ctx, id, commonpb.SegmentState_Dropped); err != nil { - log.Warn("failed to set segment state to dropped", zap.String("channel", channel), + log.Warn("try remove empty sealed segment after channel cp updated, failed to set segment state to dropped", zap.String("channel", channel), zap.Int64("segmentID", id), zap.Error(err)) } else { sealed.Remove(id) + log.Info("succeed to remove empty sealed segment", + zap.Int64("collection", segment.CollectionID), zap.Int64("segment", id), + zap.String("channel", channel), zap.Any("cpTs", cpTs), zap.Any("expireTs", segment.GetLastExpireTime())) } } return true diff --git a/internal/datacoord/segment_manager_test.go b/internal/datacoord/segment_manager_test.go index fdeefcc52a..68374ab376 100644 --- a/internal/datacoord/segment_manager_test.go +++ b/internal/datacoord/segment_manager_test.go @@ -28,11 +28,13 @@ import ( "github.com/stretchr/testify/mock" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/datacoord/allocator" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" mockkv "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" + "github.com/milvus-io/milvus/internal/metastore/mocks" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/lock" @@ -498,6 +500,15 @@ func TestGetFlushableSegments(t *testing.T) { assert.EqualValues(t, allocations[0].SegmentID, ids[0]) meta.SetCurrentRows(allocations[0].SegmentID, 0) + postions := make([]*msgpb.MsgPosition, 0) + cpTs := allocations[0].ExpireTime + 1 + postions = append(postions, &msgpb.MsgPosition{ + ChannelName: "c1", + MsgID: []byte{1, 2, 3}, + Timestamp: cpTs, + }) + meta.UpdateChannelCheckpoints(context.TODO(), postions) + segmentManager.CleanZeroSealedSegmentsOfChannel(context.TODO(), "c1", cpTs) ids, err = segmentManager.GetFlushableSegments(context.TODO(), "c1", allocations[0].ExpireTime) assert.NoError(t, err) assert.Empty(t, ids) @@ -884,3 +895,148 @@ func TestSegmentManager_DropSegmentsOfChannel(t *testing.T) { }) } } + +func TestSegmentManager_CleanZeroSealedSegmentsOfChannel(t *testing.T) { + partitionID := int64(100) + type fields struct { + meta *meta + segments []UniqueID + } + type args struct { + channel string + cpTs Timestamp + } + + mockCatalog := mocks.NewDataCoordCatalog(t) + mockCatalog.EXPECT().AlterSegments(mock.Anything, mock.Anything, mock.Anything).Return(nil) + + seg1 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + PartitionID: partitionID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Sealed, + NumOfRows: 1, + LastExpireTime: 100, + }, + currRows: 1, + } + seg2 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + PartitionID: partitionID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Sealed, + NumOfRows: 0, + LastExpireTime: 100, + }, + currRows: 0, + } + seg3 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + PartitionID: partitionID, + InsertChannel: "ch1", + State: commonpb.SegmentState_Sealed, + LastExpireTime: 90, + }, + } + seg4 := &SegmentInfo{ + SegmentInfo: &datapb.SegmentInfo{ + ID: 4, + PartitionID: partitionID, + InsertChannel: "ch2", + State: commonpb.SegmentState_Growing, + NumOfRows: 1, + LastExpireTime: 100, + }, + currRows: 1, + } + newMetaFunc := func() *meta { + return &meta{ + catalog: mockCatalog, + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: seg1, + 2: seg2, + 3: seg3, + 4: seg4, + }, + secondaryIndexes: segmentInfoIndexes{ + coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{ + 0: {1: seg1, 2: seg2, 3: seg3, 4: seg4}, + }, + channel2Segments: map[string]map[UniqueID]*SegmentInfo{ + "ch1": {1: seg1, 2: seg2, 3: seg3}, + "ch2": {4: seg4}, + }, + }, + }, + } + } + + tests := []struct { + name string + fields fields + args args + want []UniqueID + }{ + { + "test clean empty sealed segments with normal channel cp <= lastExpireTs", + fields{ + meta: newMetaFunc(), + segments: []UniqueID{1, 2, 3, 4}, + }, + args{ + "ch1", 100, + }, + []UniqueID{1, 2, 4}, + }, + { + "test clean empty sealed segments with normal channel cp > lastExpireTs", + fields{ + meta: newMetaFunc(), + segments: []UniqueID{1, 2, 3, 4}, + }, + args{ + "ch1", 101, + }, + []UniqueID{1, 4}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + s := &SegmentManager{ + meta: tt.fields.meta, + channelLock: lock.NewKeyLock[string](), + channel2Growing: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + channel2Sealed: typeutil.NewConcurrentMap[string, typeutil.UniqueSet](), + } + for _, segmentID := range tt.fields.segments { + segmentInfo := tt.fields.meta.GetSegment(context.TODO(), segmentID) + channel := tt.args.channel + if segmentInfo != nil { + channel = segmentInfo.GetInsertChannel() + } + if segmentInfo == nil || segmentInfo.GetState() == commonpb.SegmentState_Growing { + growing, _ := s.channel2Growing.GetOrInsert(channel, typeutil.NewUniqueSet()) + growing.Insert(segmentID) + } else if segmentInfo.GetState() == commonpb.SegmentState_Sealed { + sealed, _ := s.channel2Sealed.GetOrInsert(channel, typeutil.NewUniqueSet()) + sealed.Insert(segmentID) + } + } + s.CleanZeroSealedSegmentsOfChannel(context.TODO(), tt.args.channel, tt.args.cpTs) + all := make([]int64, 0) + s.channel2Sealed.Range(func(_ string, segments typeutil.UniqueSet) bool { + all = append(all, segments.Collect()...) + return true + }) + s.channel2Growing.Range(func(_ string, segments typeutil.UniqueSet) bool { + all = append(all, segments.Collect()...) + return true + }) + assert.ElementsMatch(t, tt.want, all) + }) + } +} diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 0f659ecdcc..e914479c87 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1448,6 +1448,13 @@ func (s *Server) UpdateChannelCheckpoint(ctx context.Context, req *datapb.Update return merr.Status(err), nil } + for _, pos := range checkpoints { + if pos == nil || pos.GetMsgID() == nil || pos.GetChannelName() == "" { + continue + } + s.segmentManager.CleanZeroSealedSegmentsOfChannel(ctx, pos.GetChannelName(), pos.GetTimestamp()) + } + return merr.Success(), nil }