mirror of https://github.com/milvus-io/milvus.git
fix: mistaken deletions may occur during GC channel checkpoints (#35707)
issue: #35706 Signed-off-by: jaime <yun.zhang@zilliz.com>pull/35765/head
parent
7de210ff28
commit
b7ea1defd3
|
@ -29,6 +29,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
|
||||
"github.com/milvus-io/milvus/internal/metastore/model"
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
|
@ -53,6 +54,7 @@ type GcOption struct {
|
|||
dropTolerance time.Duration // dropped segment related key tolerance time
|
||||
scanInterval time.Duration // interval for scan residue for interupted log wrttien
|
||||
|
||||
broker broker.Broker
|
||||
removeObjectPool *conc.Pool[struct{}]
|
||||
}
|
||||
|
||||
|
@ -489,7 +491,7 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
|
|||
collectionID2GcStatus := make(map[int64]bool)
|
||||
skippedCnt := 0
|
||||
|
||||
log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs)))
|
||||
log.Info("start to GC channel cp", zap.Int("vchannelCPCnt", len(channelCPs)))
|
||||
for vChannel := range channelCPs {
|
||||
collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)
|
||||
|
||||
|
@ -500,8 +502,20 @@ func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
|
||||
if _, ok := collectionID2GcStatus[collectionID]; !ok {
|
||||
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
|
||||
_, ok := collectionID2GcStatus[collectionID]
|
||||
if !ok {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
defer cancel()
|
||||
has, err := gc.option.broker.HasCollection(ctx, collectionID)
|
||||
if err == nil && !has {
|
||||
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
|
||||
} else {
|
||||
// skip checkpoints GC of this cycle if describe collection fails or the collection state is available.
|
||||
log.Debug("skip channel cp GC, the collection state is available",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Bool("dropped", has), zap.Error(err))
|
||||
collectionID2GcStatus[collectionID] = false
|
||||
}
|
||||
}
|
||||
|
||||
// Skip to GC if all segments meta of the corresponding collection are not removed
|
||||
|
|
|
@ -39,6 +39,7 @@ import (
|
|||
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
|
||||
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
|
||||
broker2 "github.com/milvus-io/milvus/internal/datacoord/broker"
|
||||
kvmocks "github.com/milvus-io/milvus/internal/kv/mocks"
|
||||
"github.com/milvus-io/milvus/internal/metastore"
|
||||
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
|
||||
|
@ -1486,22 +1487,28 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
|
|||
|
||||
m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{
|
||||
"cluster-id-rootcoord-dm_0_123v0": nil,
|
||||
"cluster-id-rootcoord-dm_1_123v0": nil,
|
||||
"cluster-id-rootcoord-dm_0_124v0": nil,
|
||||
}
|
||||
|
||||
gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{})
|
||||
broker := broker2.NewMockBroker(t)
|
||||
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(true, nil).Twice()
|
||||
|
||||
gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{broker: broker})
|
||||
|
||||
t.Run("list channel cp fail", func(t *testing.T) {
|
||||
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once()
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
|
||||
assert.Equal(t, 3, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
|
||||
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Unset()
|
||||
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{
|
||||
"cluster-id-rootcoord-dm_0_123v0": nil,
|
||||
"cluster-id-rootcoord-dm_1_123v0": nil,
|
||||
"cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil,
|
||||
"cluster-id-rootcoord-dm_0_124v0": nil,
|
||||
}, nil).Twice()
|
||||
}, nil).Times(3)
|
||||
|
||||
catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool {
|
||||
|
@ -1509,16 +1516,22 @@ func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
|
|||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
}).Maybe()
|
||||
|
||||
t.Run("drop channel cp fail", func(t *testing.T) {
|
||||
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
||||
t.Run("skip drop channel due to collection is available", func(t *testing.T) {
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
|
||||
assert.Equal(t, 3, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
|
||||
t.Run("gc ok", func(t *testing.T) {
|
||||
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
broker.EXPECT().HasCollection(mock.Anything, mock.Anything).Return(false, nil).Times(4)
|
||||
t.Run("drop channel cp fail", func(t *testing.T) {
|
||||
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Twice()
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 3, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
|
||||
t.Run("channel cp gc ok", func(t *testing.T) {
|
||||
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Twice()
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 1, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
|
|
|
@ -533,6 +533,7 @@ func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
|
|||
func (s *Server) initGarbageCollection(cli storage.ChunkManager) {
|
||||
s.garbageCollector = newGarbageCollector(s.meta, s.handler, GcOption{
|
||||
cli: cli,
|
||||
broker: s.broker,
|
||||
enabled: Params.DataCoordCfg.EnableGarbageCollection.GetAsBool(),
|
||||
checkInterval: Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second),
|
||||
scanInterval: Params.DataCoordCfg.GCScanIntervalInHour.GetAsDuration(time.Hour),
|
||||
|
|
Loading…
Reference in New Issue