mirror of https://github.com/milvus-io/milvus.git
fix: encountering orphan channel-cp meta after DataCoord GC (#34612)
issue: #34545 Signed-off-by: jaime <yun.zhang@zilliz.com>pull/34624/head
parent
d7966f46ad
commit
a08a0c831f
|
@ -37,6 +37,7 @@ import (
|
|||
"github.com/milvus-io/milvus/pkg/log"
|
||||
"github.com/milvus-io/milvus/pkg/metrics"
|
||||
"github.com/milvus-io/milvus/pkg/util/conc"
|
||||
"github.com/milvus-io/milvus/pkg/util/funcutil"
|
||||
"github.com/milvus-io/milvus/pkg/util/merr"
|
||||
"github.com/milvus-io/milvus/pkg/util/metautil"
|
||||
"github.com/milvus-io/milvus/pkg/util/paramtable"
|
||||
|
@ -156,6 +157,7 @@ func (gc *garbageCollector) work(ctx context.Context) {
|
|||
defer gc.wg.Done()
|
||||
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
|
||||
gc.recycleDroppedSegments(ctx)
|
||||
gc.recycleChannelCPMeta(ctx)
|
||||
gc.recycleUnusedIndexes(ctx)
|
||||
gc.recycleUnusedSegIndexes(ctx)
|
||||
gc.recycleUnusedAnalyzeFiles()
|
||||
|
@ -474,16 +476,47 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
|
|||
continue
|
||||
}
|
||||
log.Info("GC segment meta drop segment done")
|
||||
}
|
||||
}
|
||||
|
||||
if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
|
||||
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
|
||||
log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel))
|
||||
// TODO: remove channel checkpoint may be lost, need to be handled before segment GC?
|
||||
if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil {
|
||||
log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err))
|
||||
}
|
||||
func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
|
||||
channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
|
||||
if err != nil {
|
||||
log.Warn("list channel cp fail during GC", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
collectionID2GcStatus := make(map[int64]bool)
|
||||
skippedCnt := 0
|
||||
|
||||
log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs)))
|
||||
for vChannel := range channelCPs {
|
||||
collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)
|
||||
|
||||
// !!! Skip to GC if vChannel format is illegal, it will lead meta leak in this case
|
||||
if collectionID == -1 {
|
||||
skippedCnt++
|
||||
log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel))
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := collectionID2GcStatus[collectionID]; !ok {
|
||||
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
|
||||
}
|
||||
|
||||
// Skip to GC if all segments meta of the corresponding collection are not removed
|
||||
if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed {
|
||||
skippedCnt++
|
||||
continue
|
||||
}
|
||||
|
||||
if err := gc.meta.DropChannelCheckpoint(vChannel); err != nil {
|
||||
// Try to GC in the next gc cycle if drop channel cp meta fail.
|
||||
log.Warn("failed to drop channel check point during gc", zap.String("vchannel", vChannel), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt))
|
||||
}
|
||||
|
||||
func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
|
||||
|
|
|
@ -1476,6 +1476,54 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
|
|||
assert.Nil(t, segB)
|
||||
}
|
||||
|
||||
func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
|
||||
catalog := catalogmocks.NewDataCoordCatalog(t)
|
||||
|
||||
m := &meta{
|
||||
catalog: catalog,
|
||||
channelCPs: newChannelCps(),
|
||||
}
|
||||
|
||||
m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{
|
||||
"cluster-id-rootcoord-dm_0_123v0": nil,
|
||||
"cluster-id-rootcoord-dm_0_124v0": nil,
|
||||
}
|
||||
|
||||
gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{})
|
||||
|
||||
t.Run("list channel cp fail", func(t *testing.T) {
|
||||
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once()
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
|
||||
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{
|
||||
"cluster-id-rootcoord-dm_0_123v0": nil,
|
||||
"cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil,
|
||||
"cluster-id-rootcoord-dm_0_124v0": nil,
|
||||
}, nil).Twice()
|
||||
|
||||
catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything).
|
||||
RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool {
|
||||
if collectionID == 123 {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
t.Run("drop channel cp fail", func(t *testing.T) {
|
||||
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
|
||||
t.Run("gc ok", func(t *testing.T) {
|
||||
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once()
|
||||
gc.recycleChannelCPMeta(context.TODO())
|
||||
assert.Equal(t, 1, len(m.channelCPs.checkpoints))
|
||||
})
|
||||
}
|
||||
|
||||
func TestGarbageCollector_removeObjectPool(t *testing.T) {
|
||||
paramtable.Init()
|
||||
cm := mocks.NewChunkManager(t)
|
||||
|
|
|
@ -282,11 +282,11 @@ func CheckCheckPointsHealth(meta *meta) error {
|
|||
for channel, cp := range meta.GetChannelCheckpoints() {
|
||||
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
|
||||
if collectionID == -1 {
|
||||
log.Warn("can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel))
|
||||
log.RatedWarn(60, "can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel))
|
||||
continue
|
||||
}
|
||||
if meta.GetCollection(collectionID) == nil {
|
||||
log.Warn("corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel))
|
||||
log.RatedWarn(60, "corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel))
|
||||
continue
|
||||
}
|
||||
ts, _ := tsoutil.ParseTS(cp.Timestamp)
|
||||
|
|
Loading…
Reference in New Issue