From d23f87a393f3534b907726966511eb95f07ce1ad Mon Sep 17 00:00:00 2001 From: SimFG Date: Wed, 3 Jan 2024 13:16:57 +0800 Subject: [PATCH] enhance: Add concurrency for datacoord segment GC (#29561) issue: https://github.com/milvus-io/milvus/issues/29553 /kind improvement Signed-off-by: SimFG --- internal/datacoord/garbage_collector.go | 51 ++++-- internal/datacoord/garbage_collector_test.go | 171 ++++++++++++++----- pkg/util/paramtable/component_param.go | 10 ++ 3 files changed, 180 insertions(+), 52 deletions(-) diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index ccf3e96d6e..64a4bffe71 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -49,6 +50,8 @@ type GcOption struct { checkInterval time.Duration // each interval missingTolerance time.Duration // key missing in meta tolerance time dropTolerance time.Duration // dropped segment related key tolerance time + + removeLogPool *conc.Pool[struct{}] } // garbageCollector handles garbage files in object storage @@ -75,6 +78,7 @@ type gcCmd struct { func newGarbageCollector(meta *meta, handler Handler, opt GcOption) *garbageCollector { log.Info("GC with option", zap.Bool("enabled", opt.enabled), zap.Duration("interval", opt.checkInterval), zap.Duration("missingTolerance", opt.missingTolerance), zap.Duration("dropTolerance", opt.dropTolerance)) + opt.removeLogPool = conc.NewPool[struct{}](Params.DataCoordCfg.GCRemoveConcurrent.GetAsInt(), conc.WithExpiryDuration(time.Minute)) return &garbageCollector{ meta: meta, handler: handler, @@ -353,6 +357,7 @@ func (gc *garbageCollector) clearEtcd() { return dropIDs[i] < dropIDs[j] }) + log.Info("start to GC segments", zap.Int("drop_num", len(dropIDs))) for _, segmentID := range dropIDs { segment, ok := drops[segmentID] if !ok { @@ -366,7 +371,10 @@ func (gc *garbageCollector) clearEtcd() { } logs := getLogs(segment) - log.Info("GC segment", zap.Int64("segmentID", segment.GetID())) + log.Info("GC segment", zap.Int64("segmentID", segment.GetID()), + zap.Int("insert_logs", len(segment.GetBinlogs())), + zap.Int("delta_logs", len(segment.GetDeltalogs())), + zap.Int("stats_logs", len(segment.GetStatslogs()))) if gc.removeLogs(logs) { err := gc.meta.DropSegment(segment.GetID()) if err != nil { @@ -409,22 +417,39 @@ func getLogs(sinfo *SegmentInfo) []*datapb.Binlog { func (gc *garbageCollector) removeLogs(logs []*datapb.Binlog) bool { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - delFlag := true + var w sync.WaitGroup + w.Add(len(logs)) for _, l := range logs { - err := gc.option.cli.Remove(ctx, l.GetLogPath()) - if err != nil { - switch err.(type) { - case minio.ErrorResponse: - errResp := minio.ToErrorResponse(err) - if errResp.Code != "" && errResp.Code != "NoSuchKey" { - delFlag = false - } + tmpLog := l + gc.option.removeLogPool.Submit(func() (struct{}, error) { + defer w.Done() + select { + case <-ctx.Done(): + return struct{}{}, nil default: - delFlag = false + err := gc.option.cli.Remove(ctx, tmpLog.GetLogPath()) + if err != nil { + switch err.(type) { + case minio.ErrorResponse: + errResp := minio.ToErrorResponse(err) + if errResp.Code != "" && errResp.Code != "NoSuchKey" { + cancel() + } + default: + cancel() + } + } + return struct{}{}, nil } - } + }) + } + w.Wait() + select { + case <-ctx.Done(): + return false + default: + return true } - return delFlag } func (gc *garbageCollector) recycleUnusedIndexes() { diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 5442ea5538..75d25a80c0 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -404,9 +404,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) { mock.Anything, mock.Anything, ).Return(nil) - gc := &garbageCollector{ - meta: createMetaForRecycleUnusedIndexes(catalog), - } + gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{}) gc.recycleUnusedIndexes() }) @@ -417,9 +415,7 @@ func TestGarbageCollector_recycleUnusedIndexes(t *testing.T) { mock.Anything, mock.Anything, ).Return(errors.New("fail")) - gc := &garbageCollector{ - meta: createMetaForRecycleUnusedIndexes(catalog), - } + gc := newGarbageCollector(createMetaForRecycleUnusedIndexes(catalog), nil, GcOption{}) gc.recycleUnusedIndexes() }) } @@ -545,9 +541,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) { mock.Anything, mock.Anything, ).Return(nil) - gc := &garbageCollector{ - meta: createMetaForRecycleUnusedSegIndexes(catalog), - } + gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{}) gc.recycleUnusedSegIndexes() }) @@ -560,9 +554,7 @@ func TestGarbageCollector_recycleUnusedSegIndexes(t *testing.T) { mock.Anything, mock.Anything, ).Return(errors.New("fail")) - gc := &garbageCollector{ - meta: createMetaForRecycleUnusedSegIndexes(catalog), - } + gc := newGarbageCollector(createMetaForRecycleUnusedSegIndexes(catalog), nil, GcOption{}) gc.recycleUnusedSegIndexes() }) } @@ -707,12 +699,13 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil) cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(nil) cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil) - gc := &garbageCollector{ - meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), - option: GcOption{ + gc := newGarbageCollector( + createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), + nil, + GcOption{ cli: cm, - }, - } + }) + gc.recycleUnusedIndexFiles() }) @@ -720,12 +713,12 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { cm := &mocks.ChunkManager{} cm.EXPECT().RootPath().Return("root") cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, errors.New("error")) - gc := &garbageCollector{ - meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), - option: GcOption{ + gc := newGarbageCollector( + createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), + nil, + GcOption{ cli: cm, - }, - } + }) gc.recycleUnusedIndexFiles() }) @@ -735,12 +728,12 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("error")) cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil) cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(nil) - gc := &garbageCollector{ - meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), - option: GcOption{ + gc := newGarbageCollector( + createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), + nil, + GcOption{ cli: cm, - }, - } + }) gc.recycleUnusedIndexFiles() }) @@ -750,12 +743,12 @@ func TestGarbageCollector_recycleUnusedIndexFiles(t *testing.T) { cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("error")) cm.EXPECT().ListWithPrefix(mock.Anything, mock.Anything, mock.Anything).Return([]string{"a/b/c/", "a/b/600/", "a/b/601/", "a/b/602/"}, nil, nil) cm.EXPECT().RemoveWithPrefix(mock.Anything, mock.Anything).Return(errors.New("error")) - gc := &garbageCollector{ - meta: createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), - option: GcOption{ + gc := newGarbageCollector( + createMetaTableForRecycleUnusedIndexFiles(&datacoord.Catalog{MetaKv: kvmocks.NewMetaKv(t)}), + nil, + GcOption{ cli: cm, - }, - } + }) gc.recycleUnusedIndexFiles() }) } @@ -804,6 +797,57 @@ func TestGarbageCollector_clearETCD(t *testing.T) { DmlPosition: &msgpb.MsgPosition{ Timestamp: 900, }, + Binlogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "log1", + LogSize: 1024, + }, + }, + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ + { + LogPath: "log2", + LogSize: 1024, + }, + }, + }, + }, + Deltalogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "del_log1", + LogSize: 1024, + }, + }, + }, + { + FieldID: 2, + Binlogs: []*datapb.Binlog{ + { + LogPath: "del_log2", + LogSize: 1024, + }, + }, + }, + }, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 1, + Binlogs: []*datapb.Binlog{ + { + LogPath: "stats_log1", + LogSize: 1024, + }, + }, + }, + }, }, segmentIndexes: map[UniqueID]*model.SegmentIndex{ indexID: { @@ -1045,14 +1089,13 @@ func TestGarbageCollector_clearETCD(t *testing.T) { } cm := &mocks.ChunkManager{} cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil) - gc := &garbageCollector{ - option: GcOption{ - cli: &mocks.ChunkManager{}, + gc := newGarbageCollector( + m, + newMockHandlerWithMeta(m), + GcOption{ + cli: cm, dropTolerance: 1, - }, - meta: m, - handler: newMockHandlerWithMeta(m), - } + }) gc.clearEtcd() /* @@ -1136,6 +1179,56 @@ func TestGarbageCollector_clearETCD(t *testing.T) { assert.Nil(t, segB) } +func TestGarbageCollector_removelogs(t *testing.T) { + paramtable.Init() + cm := &mocks.ChunkManager{} + gc := newGarbageCollector( + nil, + nil, + GcOption{ + cli: cm, + dropTolerance: 1, + }) + var logs []*datapb.Binlog + for i := 0; i < 50; i++ { + logs = append(logs, &datapb.Binlog{ + LogPath: "log" + strconv.Itoa(i), + }) + } + + t.Run("success", func(t *testing.T) { + call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(nil) + defer call.Unset() + b := gc.removeLogs(logs) + assert.True(t, b) + }) + + t.Run("minio not found error", func(t *testing.T) { + call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(minio.ErrorResponse{ + Code: "NoSuchKey", + }) + defer call.Unset() + b := gc.removeLogs(logs) + assert.True(t, b) + }) + + t.Run("minio server error", func(t *testing.T) { + call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(minio.ErrorResponse{ + Code: "Server Error", + }) + defer call.Unset() + b := gc.removeLogs(logs) + assert.False(t, b) + }) + + t.Run("other type error", func(t *testing.T) { + call := cm.EXPECT().Remove(mock.Anything, mock.Anything).Return(errors.New("other error")) + defer call.Unset() + b := gc.removeLogs(logs) + assert.False(t, b) + }) +} + type GarbageCollectorSuite struct { suite.Suite diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 079f350d41..da0f4694d8 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2238,6 +2238,7 @@ type dataCoordConfig struct { GCInterval ParamItem `refreshable:"false"` GCMissingTolerance ParamItem `refreshable:"false"` GCDropTolerance ParamItem `refreshable:"false"` + GCRemoveConcurrent ParamItem `refreshable:"false"` EnableActiveStandby ParamItem `refreshable:"false"` BindIndexNodeMode ParamItem `refreshable:"false"` @@ -2587,6 +2588,15 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.GCDropTolerance.Init(base.mgr) + p.GCRemoveConcurrent = ParamItem{ + Key: "dataCoord.gc.removeConcurrent", + Version: "2.3.4", + DefaultValue: "32", + Doc: "number of concurrent goroutines to remove dropped s3 objects", + Export: true, + } + p.GCRemoveConcurrent.Init(base.mgr) + p.EnableActiveStandby = ParamItem{ Key: "dataCoord.enableActiveStandby", Version: "2.0.0",