diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 4296e69449..a910dbb039 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/internal/datacoord/broker" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/metastore/model" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -53,6 +54,7 @@ type GcOption struct { dropTolerance time.Duration // dropped segment related key tolerance time scanInterval time.Duration // interval for scan residue for interupted log wrttien + broker broker.Broker removeObjectPool *conc.Pool[struct{}] } @@ -489,7 +491,7 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { collectionID2GcStatus := make(map[int64]bool) skippedCnt := 0 - log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs))) + log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs))) for vChannel := range channelCPs { collectionID := funcutil.GetCollectionIDFromVChannel(vChannel) @@ -500,8 +502,20 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { continue } - if _, ok := collectionID2GcStatus[collectionID]; !ok { - collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1) + _, ok := collectionID2GcStatus[collectionID] + if !ok { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + has, err := gc.option.broker.HasCollection(ctx, collectionID) + if err == nil && !has { + collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1) + } else { + // skip checkpoints GC of this cycle if describe collection fails or the collection state is available. + log.Debug("skip channel cp GC, the collection state is available", + zap.Int64("collectionID", collectionID), + zap.Bool("dropped", has), zap.Error(err)) + collectionID2GcStatus[collectionID] = false + } } // Skip to GC if all segments meta of the corresponding collection are not removed diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index e64ca2522e..0a360a4ddb 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -39,6 +39,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/msgpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + broker2 "github.com/milvus-io/milvus/internal/datacoord/broker" kvmocks "github.com/milvus-io/milvus/internal/kv/mocks" "github.com/milvus-io/milvus/internal/metastore" "github.com/milvus-io/milvus/internal/metastore/kv/datacoord" @@ -1486,22 +1487,28 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) { m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{ "cluster-id-rootcoord-dm_0_123v0": nil, + "cluster-id-rootcoord-dm_1_123v0": nil, "cluster-id-rootcoord-dm_0_124v0": nil, } - gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{}) + broker := broker2.NewMockBroker(t) + broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, nil).Twice() + + gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{broker: broker}) t.Run("list channel cp fail", func(t *testing.T) { catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once() gc.recycleChannelCPMeta(context.TODO()) - assert.Equal(t, 2, len(m.channelCPs.checkpoints)) + assert.Equal(t, 3, len(m.channelCPs.checkpoints)) }) + catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Unset() catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{ "cluster-id-rootcoord-dm_0_123v0": nil, + "cluster-id-rootcoord-dm_1_123v0": nil, "cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil, "cluster-id-rootcoord-dm_0_124v0": nil, - }, nil).Twice() + }, nil).Times(3) catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything). RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool { @@ -1509,16 +1516,22 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) { return true } return false - }) + }).Maybe() - t.Run("drop channel cp fail", func(t *testing.T) { - catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() + t.Run("skip drop channel due to collection is available", func(t *testing.T) { gc.recycleChannelCPMeta(context.TODO()) - assert.Equal(t, 2, len(m.channelCPs.checkpoints)) + assert.Equal(t, 3, len(m.channelCPs.checkpoints)) }) - t.Run("gc ok", func(t *testing.T) { - catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once() + broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil).Times(4) + t.Run("drop channel cp fail", func(t *testing.T) { + catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Twice() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 3, len(m.channelCPs.checkpoints)) + }) + + t.Run("channel cp gc ok", func(t *testing.T) { + catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Twice() gc.recycleChannelCPMeta(context.TODO()) assert.Equal(t, 1, len(m.channelCPs.checkpoints)) }) diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 9830690c73..137990b74b 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -533,6 +533,7 @@ func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) { func (s *Server) initGarbageCollection(cli storage.ChunkManager) { s.garbageCollector = newGarbageCollector(s.meta, s.handler, GcOption{ cli: cli, + broker: s.broker, enabled: Params.DataCoordCfg.EnableGarbageCollection.GetAsBool(), checkInterval: Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second), scanInterval: Params.DataCoordCfg.GCScanIntervalInHour.GetAsDuration(time.Hour),