From 74b4369c5b84c6c55085d63caf13ab76deba5311 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 16 Jan 2025 20:19:02 +0800 Subject: [PATCH] fix: Record active collections for l0Policy (#39217) By recording the active collection lists, The l0 compaction trigger of view change and idle won't influence each other. Also this pr replaces the L0View cache with real L0 segments' change. Save some memory and make L0 compaction triggers more accurate. See also: #39187 Signed-off-by: yangxuan --------- Signed-off-by: yangxuan --- .gitignore | 1 + internal/datacoord/compaction_policy_l0.go | 191 +++++++++--------- .../datacoord/compaction_policy_l0_test.go | 126 ++++++------ internal/datacoord/compaction_trigger_v2.go | 89 ++++---- .../datacoord/compaction_trigger_v2_test.go | 20 +- internal/datacoord/services.go | 2 + internal/datacoord/task_stats.go | 2 +- 7 files changed, 214 insertions(+), 217 deletions(-) diff --git a/.gitignore b/.gitignore index b6adfcbdb4..041d808936 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,4 @@ internal/proto/**/*.pb.go internal/core/src/pb/*.pb.h internal/core/src/pb/*.pb.cc **/legacypb/*.pb.go +pkg/streaming/proto/**/*.pb.go diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 27df4ca29e..c0fe9e66fd 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -1,27 +1,29 @@ package datacoord import ( + "sync" + "time" + "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type l0CompactionPolicy struct { meta *meta - view *FullViews - emptyLoopCount *atomic.Int64 + activeCollections *activeCollections } func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy { return &l0CompactionPolicy{ - meta: meta, - // donot share views with other compaction policy - view: &FullViews{collections: make(map[int64][]*SegmentView)}, - emptyLoopCount: atomic.NewInt64(0), + meta: meta, + activeCollections: newActiveCollections(), } } @@ -29,93 +31,53 @@ func (policy *l0CompactionPolicy) Enable() bool { return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() } -func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) { - // support config hot refresh - events := policy.generateEventForLevelZeroViewChange() - if len(events) != 0 { - // each time when triggers a compaction, the idleTicker would reset - policy.emptyLoopCount.Store(0) - return events, nil - } - policy.emptyLoopCount.Inc() - - if policy.emptyLoopCount.Load() >= 3 { - policy.emptyLoopCount.Store(0) - return policy.generateEventForLevelZeroViewIDLE(), nil - } - - return make(map[CompactionTriggerType][]CompactionView), nil +// Notify policy to record the active updated(when adding a new L0 segment) collections. +func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) { + policy.activeCollections.Record(collectionID) } -func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) { +func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) { + events = make(map[CompactionTriggerType][]CompactionView) latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection() - latestCollIDs := lo.Keys(latestCollSegs) - viewCollIDs := lo.Keys(policy.view.collections) - _, diffRemove := lo.Difference(latestCollIDs, viewCollIDs) - for _, collID := range diffRemove { - delete(policy.view.collections, collID) - } + // 1. Get active collections + activeColls := policy.activeCollections.GetActiveCollections() - refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs) - if len(refreshedL0Views) > 0 { - events = make(map[CompactionTriggerType][]CompactionView) - events[TriggerTypeLevelZeroViewChange] = refreshedL0Views - } + // 2. Idle collections = all collections - active collections + missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs)) + policy.activeCollections.ClearMissCached(missCached...) - return events -} - -func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView { - var allRefreshedL0Veiws []CompactionView + idleCollsSet := typeutil.NewUniqueSet(idleColls...) + activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{} for collID, segments := range latestCollSegs { + policy.activeCollections.Read(collID) + levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { return info.GetLevel() == datapb.SegmentLevel_L0 }) - latestL0Segments := GetViewsByInfo(levelZeroSegments...) - needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments) - if needRefresh { - log.Info("Refresh compaction level zero views", - zap.Int64("collectionID", collID), - zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string { - return view.String() - }))) - policy.view.collections[collID] = latestL0Segments + if len(levelZeroSegments) == 0 { + continue } - if len(collRefreshedViews) > 0 { - allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...) + labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...)) + if idleCollsSet.Contain(collID) { + idleL0Views = append(idleL0Views, labelViews...) + } else { + activeL0Views = append(activeL0Views, labelViews...) } + + } + if len(activeL0Views) > 0 { + events[TriggerTypeLevelZeroViewChange] = activeL0Views } - return allRefreshedL0Veiws -} - -func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) { - cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { - return v.Level == datapb.SegmentLevel_L0 - }) - - if len(LevelZeroViews) == 0 && len(cachedViews) != 0 { - needRefresh = true - return - } - - latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews) - for _, latestView := range latestViews { - views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool { - return v.label.Equal(latestView.GetGroupLabel()) - }) - - if !latestView.Equal(views) { - refreshed = append(refreshed, latestView) - needRefresh = true - } + if len(idleL0Views) > 0 { + events[TriggerTypeLevelZeroViewIDLE] = idleL0Views } return } -func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView { +func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView { partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key for _, view := range levelZeroSegments { key := view.label.Key() @@ -130,26 +92,71 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, } } - return partChanView + return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView { + return view + }) } -func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView { - events := make(map[CompactionTriggerType][]CompactionView, 0) - for collID := range policy.view.collections { - cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { - return v.Level == datapb.SegmentLevel_L0 - }) - if len(cachedViews) > 0 { - log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event") - grouped := policy.groupL0ViewsByPartChan(collID, cachedViews) - events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped), - func(l0View *LevelZeroSegmentsView, _ int) CompactionView { - return l0View - }) - log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID)) - break +type activeCollection struct { + ID int64 + lastRefresh time.Time + readCount *atomic.Int64 +} + +func newActiveCollection(ID int64) *activeCollection { + return &activeCollection{ + ID: ID, + lastRefresh: time.Now(), + readCount: atomic.NewInt64(0), + } +} + +type activeCollections struct { + collections map[int64]*activeCollection + collGuard sync.RWMutex +} + +func newActiveCollections() *activeCollections { + return &activeCollections{ + collections: make(map[int64]*activeCollection), + } +} + +func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) { + ac.collGuard.Lock() + defer ac.collGuard.Unlock() + lo.ForEach(collectionIDs, func(collID int64, _ int) { + delete(ac.collections, collID) + }) +} + +func (ac *activeCollections) Record(collectionID int64) { + ac.collGuard.Lock() + defer ac.collGuard.Unlock() + if _, ok := ac.collections[collectionID]; !ok { + ac.collections[collectionID] = newActiveCollection(collectionID) + } else { + ac.collections[collectionID].lastRefresh = time.Now() + ac.collections[collectionID].readCount.Store(0) + } +} + +func (ac *activeCollections) Read(collectionID int64) { + ac.collGuard.Lock() + defer ac.collGuard.Unlock() + if _, ok := ac.collections[collectionID]; ok { + ac.collections[collectionID].readCount.Inc() + if ac.collections[collectionID].readCount.Load() >= 3 && + time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) { + log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID)) + delete(ac.collections, collectionID) } } - - return events +} + +func (ac *activeCollections) GetActiveCollections() []int64 { + ac.collGuard.RLock() + defer ac.collGuard.RUnlock() + + return lo.Keys(ac.collections) } diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 760ee5771a..a10eebcc76 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -17,6 +17,7 @@ package datacoord import ( "testing" + "time" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestL0CompactionPolicySuite(t *testing.T) { @@ -44,14 +46,63 @@ type L0CompactionPolicySuite struct { l0_policy *l0CompactionPolicy } +func (s *L0CompactionPolicySuite) SetupTest() { + s.testLabel = &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + segments := genSegmentsForMeta(s.testLabel) + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + + s.l0_policy = newL0CompactionPolicy(meta) +} + const MB = 1024 * 1024 -func (s *L0CompactionPolicySuite) TestTrigger() { - s.Require().Empty(s.l0_policy.view.collections) +func (s *L0CompactionPolicySuite) TestActiveToIdle() { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) + + s.l0_policy.OnCollectionUpdate(1) + s.Require().EqualValues(1, s.l0_policy.activeCollections.GetActiveCollections()[0]) + + <-time.After(3 * time.Second) + + for range 3 { + gotViews, err := s.l0_policy.Trigger() + s.NoError(err) + s.NotNil(gotViews) + s.NotEmpty(gotViews) + _, ok := gotViews[TriggerTypeLevelZeroViewChange] + s.True(ok) + } + + s.Empty(s.l0_policy.activeCollections.GetActiveCollections()) + gotViews, err := s.l0_policy.Trigger() + s.NoError(err) + s.NotNil(gotViews) + s.NotEmpty(gotViews) + _, ok := gotViews[TriggerTypeLevelZeroViewIDLE] + s.True(ok) +} + +func (s *L0CompactionPolicySuite) TestTriggerIdle() { + s.Require().Empty(s.l0_policy.activeCollections.GetActiveCollections()) events, err := s.l0_policy.Trigger() s.NoError(err) + s.NotEmpty(events) + gotViews, ok := events[TriggerTypeLevelZeroViewChange] + s.False(ok) + s.Empty(gotViews) + + gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] s.True(ok) s.NotNil(gotViews) s.Equal(1, len(gotViews)) @@ -63,31 +114,9 @@ func (s *L0CompactionPolicySuite) TestTrigger() { s.Equal(datapb.SegmentLevel_L0, view.Level) } log.Info("cView", zap.String("string", cView.String())) +} - // Test for idle trigger - for i := 0; i < 2; i++ { - events, err = s.l0_policy.Trigger() - s.NoError(err) - s.Equal(0, len(events)) - } - s.EqualValues(2, s.l0_policy.emptyLoopCount.Load()) - - events, err = s.l0_policy.Trigger() - s.NoError(err) - s.EqualValues(0, s.l0_policy.emptyLoopCount.Load()) - s.Equal(1, len(events)) - gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] - s.True(ok) - s.NotNil(gotViews) - s.Equal(1, len(gotViews)) - cView = gotViews[0] - s.Equal(s.testLabel, cView.GetGroupLabel()) - s.Equal(4, len(cView.GetSegmentsView())) - for _, view := range cView.GetSegmentsView() { - s.Equal(datapb.SegmentLevel_L0, view.Level) - } - log.Info("cView", zap.String("string", cView.String())) - +func (s *L0CompactionPolicySuite) TestTriggerViewChange() { segArgs := []struct { ID UniqueID PosT Timestamp @@ -114,34 +143,17 @@ func (s *L0CompactionPolicySuite) TestTrigger() { } s.l0_policy.meta = meta - events, err = s.l0_policy.Trigger() + s.l0_policy.OnCollectionUpdate(s.testLabel.CollectionID) + events, err := s.l0_policy.Trigger() s.NoError(err) - gotViews, ok = events[TriggerTypeLevelZeroViewChange] - s.True(ok) - s.Equal(1, len(gotViews)) -} - -func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() { - s.Require().Empty(s.l0_policy.view.collections) - - events := s.l0_policy.generateEventForLevelZeroViewChange() - s.NotEmpty(events) - s.NotEmpty(s.l0_policy.view.collections) - + s.Equal(1, len(events)) gotViews, ok := events[TriggerTypeLevelZeroViewChange] s.True(ok) - s.NotNil(gotViews) s.Equal(1, len(gotViews)) - storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID] - s.True(ok) - s.NotNil(storedViews) - s.Equal(4, len(storedViews)) - - for _, view := range storedViews { - s.Equal(s.testLabel, view.label) - s.Equal(datapb.SegmentLevel_L0, view.Level) - } + gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] + s.False(ok) + s.Empty(gotViews) } func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { @@ -185,22 +197,6 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { return segments } -func (s *L0CompactionPolicySuite) SetupTest() { - s.testLabel = &CompactionGroupLabel{ - CollectionID: 1, - PartitionID: 10, - Channel: "ch-1", - } - - segments := genSegmentsForMeta(s.testLabel) - meta := &meta{segments: NewSegmentsInfo()} - for id, segment := range segments { - meta.segments.SetSegment(id, segment) - } - - s.l0_policy = newL0CompactionPolicy(meta) -} - func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo { return &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index bea3b137e2..7bed1f5c9d 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" - "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" ) @@ -41,9 +40,27 @@ const ( TriggerTypeSingle ) +func (t CompactionTriggerType) String() string { + switch t { + case TriggerTypeLevelZeroViewChange: + return "LevelZeroViewChange" + case TriggerTypeLevelZeroViewIDLE: + return "LevelZeroViewIDLE" + case TriggerTypeSegmentSizeViewChange: + return "SegmentSizeViewChange" + case TriggerTypeClustering: + return "Clustering" + case TriggerTypeSingle: + return "Single" + default: + return "" + } +} + type TriggerManager interface { Start() Stop() + OnCollectionUpdate(collectionID int64) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) } @@ -64,10 +81,6 @@ type CompactionTriggerManager struct { handler Handler allocator allocator.Allocator - view *FullViews - // todo handle this lock - viewGuard lock.RWMutex - meta *meta l0Policy *l0CompactionPolicy clusteringPolicy *clusteringCompactionPolicy @@ -82,11 +95,8 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com allocator: alloc, handler: handler, compactionHandler: compactionHandler, - view: &FullViews{ - collections: make(map[int64][]*SegmentView), - }, - meta: meta, - closeSig: make(chan struct{}), + meta: meta, + closeSig: make(chan struct{}), } m.l0Policy = newL0CompactionPolicy(meta) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler) @@ -94,6 +104,12 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com return m } +// OnCollectionUpdate notifies L0Policy about latest collection's L0 segment changes +// This tells the l0 triggers about which collections are active +func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) { + m.l0Policy.OnCollectionUpdate(collectionID) +} + func (m *CompactionTriggerManager) Start() { m.closeWg.Add(1) go m.startLoop() @@ -200,47 +216,26 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) { log := log.Ctx(ctx) + log.Debug("Start to trigger compactions", zap.String("eventType", eventType.String())) for _, view := range views { - switch eventType { - case TriggerTypeLevelZeroViewChange: - log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange") - outView, reason := view.Trigger() - if outView != nil { - log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) - m.SubmitL0ViewToScheduler(ctx, outView) - } - case TriggerTypeLevelZeroViewIDLE: - log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") - outView, reason := view.Trigger() - if outView == nil { - log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") - outView, reason = view.ForceTrigger() - } + outView, reason := view.Trigger() + if outView == nil && eventType == TriggerTypeLevelZeroViewIDLE { + log.Info("Start to force trigger a level zero compaction") + outView, reason = view.ForceTrigger() + } - if outView != nil { - log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) + if outView != nil { + log.Info("Success to trigger a compaction, try to submit", + zap.String("eventType", eventType.String()), + zap.String("reason", reason), + zap.String("output view", outView.String())) + + switch eventType { + case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE: m.SubmitL0ViewToScheduler(ctx, outView) - } - case TriggerTypeClustering: - log.Debug("Start to trigger a clustering compaction by TriggerTypeClustering") - outView, reason := view.Trigger() - if outView != nil { - log.Info("Success to trigger a ClusteringCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) + case TriggerTypeClustering: m.SubmitClusteringViewToScheduler(ctx, outView) - } - case TriggerTypeSingle: - log.Debug("Start to trigger a single compaction by TriggerTypeSingle") - outView, reason := view.Trigger() - if outView != nil { - log.Info("Success to trigger a MixCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) + case TriggerTypeSingle: m.SubmitSingleViewToScheduler(ctx, outView) } } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 8a7d99a424..3bb670aa09 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -76,10 +76,9 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { expectedSegID := seg1.ID s.Require().Equal(1, len(latestL0Segments)) - needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) - s.True(needRefresh) - s.Require().Equal(1, len(levelZeroView)) - cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) + levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) + s.Require().Equal(1, len(levelZeroViews)) + cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -102,7 +101,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { return nil }).Return(nil).Once() s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroView) + s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroViews) } func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { @@ -120,10 +119,9 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { latestL0Segments := GetViewsByInfo(levelZeroSegments...) s.Require().NotEmpty(latestL0Segments) - needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) - s.Require().True(needRefresh) - s.Require().Equal(1, len(levelZeroView)) - cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) + levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) + s.Require().Equal(1, len(levelZeroViews)) + cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -132,8 +130,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID()) - // s.True(signal.isGlobal) - // s.False(signal.isForce) s.EqualValues(30000, task.GetPos().GetTimestamp()) s.Equal(s.testLabel.CollectionID, task.GetCollectionID()) s.Equal(s.testLabel.PartitionID, task.GetPartitionID()) @@ -145,7 +141,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { return nil }).Return(nil).Once() s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView) + s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroViews) } func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 000be0f6c0..0f659ecdcc 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -571,6 +571,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if req.GetSegLevel() == datapb.SegmentLevel_L0 { metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths())) + + s.compactionTriggerManager.OnCollectionUpdate(req.GetCollectionID()) return merr.Success(), nil } diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index 8633d214a7..1b316ffcee 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -164,7 +164,7 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo } collInfo, err := dependency.handler.GetCollection(ctx, segment.GetCollectionID()) - if err != nil { + if err != nil || collInfo == nil { log.Warn("stats task get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err)) st.SetState(indexpb.JobState_JobStateInit, err.Error())