diff --git a/.gitignore b/.gitignore index b6adfcbdb4..041d808936 100644 --- a/.gitignore +++ b/.gitignore @@ -104,3 +104,4 @@ internal/proto/**/*.pb.go internal/core/src/pb/*.pb.h internal/core/src/pb/*.pb.cc **/legacypb/*.pb.go +pkg/streaming/proto/**/*.pb.go diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 27df4ca29e..c0fe9e66fd 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -1,27 +1,29 @@ package datacoord import ( + "sync" + "time" + "github.com/samber/lo" "go.uber.org/atomic" "go.uber.org/zap" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type l0CompactionPolicy struct { meta *meta - view *FullViews - emptyLoopCount *atomic.Int64 + activeCollections *activeCollections } func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy { return &l0CompactionPolicy{ - meta: meta, - // donot share views with other compaction policy - view: &FullViews{collections: make(map[int64][]*SegmentView)}, - emptyLoopCount: atomic.NewInt64(0), + meta: meta, + activeCollections: newActiveCollections(), } } @@ -29,93 +31,53 @@ func (policy *l0CompactionPolicy) Enable() bool { return Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() } -func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]CompactionView, error) { - // support config hot refresh - events := policy.generateEventForLevelZeroViewChange() - if len(events) != 0 { - // each time when triggers a compaction, the idleTicker would reset - policy.emptyLoopCount.Store(0) - return events, nil - } - policy.emptyLoopCount.Inc() - - if policy.emptyLoopCount.Load() >= 3 { - policy.emptyLoopCount.Store(0) - return policy.generateEventForLevelZeroViewIDLE(), nil - } - - return make(map[CompactionTriggerType][]CompactionView), nil +// Notify policy to record the active updated(when adding a new L0 segment) collections. +func (policy *l0CompactionPolicy) OnCollectionUpdate(collectionID int64) { + policy.activeCollections.Record(collectionID) } -func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) { +func (policy *l0CompactionPolicy) Trigger() (events map[CompactionTriggerType][]CompactionView, err error) { + events = make(map[CompactionTriggerType][]CompactionView) latestCollSegs := policy.meta.GetCompactableSegmentGroupByCollection() - latestCollIDs := lo.Keys(latestCollSegs) - viewCollIDs := lo.Keys(policy.view.collections) - _, diffRemove := lo.Difference(latestCollIDs, viewCollIDs) - for _, collID := range diffRemove { - delete(policy.view.collections, collID) - } + // 1. Get active collections + activeColls := policy.activeCollections.GetActiveCollections() - refreshedL0Views := policy.RefreshLevelZeroViews(latestCollSegs) - if len(refreshedL0Views) > 0 { - events = make(map[CompactionTriggerType][]CompactionView) - events[TriggerTypeLevelZeroViewChange] = refreshedL0Views - } + // 2. Idle collections = all collections - active collections + missCached, idleColls := lo.Difference(activeColls, lo.Keys(latestCollSegs)) + policy.activeCollections.ClearMissCached(missCached...) - return events -} - -func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64][]*SegmentInfo) []CompactionView { - var allRefreshedL0Veiws []CompactionView + idleCollsSet := typeutil.NewUniqueSet(idleColls...) + activeL0Views, idleL0Views := []CompactionView{}, []CompactionView{} for collID, segments := range latestCollSegs { + policy.activeCollections.Read(collID) + levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { return info.GetLevel() == datapb.SegmentLevel_L0 }) - latestL0Segments := GetViewsByInfo(levelZeroSegments...) - needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments) - if needRefresh { - log.Info("Refresh compaction level zero views", - zap.Int64("collectionID", collID), - zap.Strings("views", lo.Map(collRefreshedViews, func(view CompactionView, _ int) string { - return view.String() - }))) - policy.view.collections[collID] = latestL0Segments + if len(levelZeroSegments) == 0 { + continue } - if len(collRefreshedViews) > 0 { - allRefreshedL0Veiws = append(allRefreshedL0Veiws, collRefreshedViews...) + labelViews := policy.groupL0ViewsByPartChan(collID, GetViewsByInfo(levelZeroSegments...)) + if idleCollsSet.Contain(collID) { + idleL0Views = append(idleL0Views, labelViews...) + } else { + activeL0Views = append(activeL0Views, labelViews...) } + + } + if len(activeL0Views) > 0 { + events[TriggerTypeLevelZeroViewChange] = activeL0Views } - return allRefreshedL0Veiws -} - -func (policy *l0CompactionPolicy) getChangedLevelZeroViews(collID UniqueID, LevelZeroViews []*SegmentView) (needRefresh bool, refreshed []CompactionView) { - cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { - return v.Level == datapb.SegmentLevel_L0 - }) - - if len(LevelZeroViews) == 0 && len(cachedViews) != 0 { - needRefresh = true - return - } - - latestViews := policy.groupL0ViewsByPartChan(collID, LevelZeroViews) - for _, latestView := range latestViews { - views := lo.Filter(cachedViews, func(v *SegmentView, _ int) bool { - return v.label.Equal(latestView.GetGroupLabel()) - }) - - if !latestView.Equal(views) { - refreshed = append(refreshed, latestView) - needRefresh = true - } + if len(idleL0Views) > 0 { + events[TriggerTypeLevelZeroViewIDLE] = idleL0Views } return } -func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) map[string]*LevelZeroSegmentsView { +func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, levelZeroSegments []*SegmentView) []CompactionView { partChanView := make(map[string]*LevelZeroSegmentsView) // "part-chan" as key for _, view := range levelZeroSegments { key := view.label.Key() @@ -130,26 +92,71 @@ func (policy *l0CompactionPolicy) groupL0ViewsByPartChan(collectionID UniqueID, } } - return partChanView + return lo.Map(lo.Values(partChanView), func(view *LevelZeroSegmentsView, _ int) CompactionView { + return view + }) } -func (policy *l0CompactionPolicy) generateEventForLevelZeroViewIDLE() map[CompactionTriggerType][]CompactionView { - events := make(map[CompactionTriggerType][]CompactionView, 0) - for collID := range policy.view.collections { - cachedViews := policy.view.GetSegmentViewBy(collID, func(v *SegmentView) bool { - return v.Level == datapb.SegmentLevel_L0 - }) - if len(cachedViews) > 0 { - log.Info("Views idle for a long time, try to trigger a TriggerTypeLevelZeroViewIDLE compaction event") - grouped := policy.groupL0ViewsByPartChan(collID, cachedViews) - events[TriggerTypeLevelZeroViewIDLE] = lo.Map(lo.Values(grouped), - func(l0View *LevelZeroSegmentsView, _ int) CompactionView { - return l0View - }) - log.Info("Generate TriggerTypeLevelZeroViewIDLE compaction event", zap.Int64("collectionID", collID)) - break +type activeCollection struct { + ID int64 + lastRefresh time.Time + readCount *atomic.Int64 +} + +func newActiveCollection(ID int64) *activeCollection { + return &activeCollection{ + ID: ID, + lastRefresh: time.Now(), + readCount: atomic.NewInt64(0), + } +} + +type activeCollections struct { + collections map[int64]*activeCollection + collGuard sync.RWMutex +} + +func newActiveCollections() *activeCollections { + return &activeCollections{ + collections: make(map[int64]*activeCollection), + } +} + +func (ac *activeCollections) ClearMissCached(collectionIDs ...int64) { + ac.collGuard.Lock() + defer ac.collGuard.Unlock() + lo.ForEach(collectionIDs, func(collID int64, _ int) { + delete(ac.collections, collID) + }) +} + +func (ac *activeCollections) Record(collectionID int64) { + ac.collGuard.Lock() + defer ac.collGuard.Unlock() + if _, ok := ac.collections[collectionID]; !ok { + ac.collections[collectionID] = newActiveCollection(collectionID) + } else { + ac.collections[collectionID].lastRefresh = time.Now() + ac.collections[collectionID].readCount.Store(0) + } +} + +func (ac *activeCollections) Read(collectionID int64) { + ac.collGuard.Lock() + defer ac.collGuard.Unlock() + if _, ok := ac.collections[collectionID]; ok { + ac.collections[collectionID].readCount.Inc() + if ac.collections[collectionID].readCount.Load() >= 3 && + time.Since(ac.collections[collectionID].lastRefresh) > 3*paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.GetAsDuration(time.Second) { + log.Info("Active(of deletions) collections become idle", zap.Int64("collectionID", collectionID)) + delete(ac.collections, collectionID) } } - - return events +} + +func (ac *activeCollections) GetActiveCollections() []int64 { + ac.collGuard.RLock() + defer ac.collGuard.RUnlock() + + return lo.Keys(ac.collections) } diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 760ee5771a..a10eebcc76 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -17,6 +17,7 @@ package datacoord import ( "testing" + "time" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -26,6 +27,7 @@ import ( "github.com/milvus-io/milvus/internal/allocator" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestL0CompactionPolicySuite(t *testing.T) { @@ -44,14 +46,63 @@ type L0CompactionPolicySuite struct { l0_policy *l0CompactionPolicy } +func (s *L0CompactionPolicySuite) SetupTest() { + s.testLabel = &CompactionGroupLabel{ + CollectionID: 1, + PartitionID: 10, + Channel: "ch-1", + } + + segments := genSegmentsForMeta(s.testLabel) + meta := &meta{segments: NewSegmentsInfo()} + for id, segment := range segments { + meta.segments.SetSegment(id, segment) + } + + s.l0_policy = newL0CompactionPolicy(meta) +} + const MB = 1024 * 1024 -func (s *L0CompactionPolicySuite) TestTrigger() { - s.Require().Empty(s.l0_policy.view.collections) +func (s *L0CompactionPolicySuite) TestActiveToIdle() { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) + + s.l0_policy.OnCollectionUpdate(1) + s.Require().EqualValues(1, s.l0_policy.activeCollections.GetActiveCollections()[0]) + + <-time.After(3 * time.Second) + + for range 3 { + gotViews, err := s.l0_policy.Trigger() + s.NoError(err) + s.NotNil(gotViews) + s.NotEmpty(gotViews) + _, ok := gotViews[TriggerTypeLevelZeroViewChange] + s.True(ok) + } + + s.Empty(s.l0_policy.activeCollections.GetActiveCollections()) + gotViews, err := s.l0_policy.Trigger() + s.NoError(err) + s.NotNil(gotViews) + s.NotEmpty(gotViews) + _, ok := gotViews[TriggerTypeLevelZeroViewIDLE] + s.True(ok) +} + +func (s *L0CompactionPolicySuite) TestTriggerIdle() { + s.Require().Empty(s.l0_policy.activeCollections.GetActiveCollections()) events, err := s.l0_policy.Trigger() s.NoError(err) + s.NotEmpty(events) + gotViews, ok := events[TriggerTypeLevelZeroViewChange] + s.False(ok) + s.Empty(gotViews) + + gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] s.True(ok) s.NotNil(gotViews) s.Equal(1, len(gotViews)) @@ -63,31 +114,9 @@ func (s *L0CompactionPolicySuite) TestTrigger() { s.Equal(datapb.SegmentLevel_L0, view.Level) } log.Info("cView", zap.String("string", cView.String())) +} - // Test for idle trigger - for i := 0; i < 2; i++ { - events, err = s.l0_policy.Trigger() - s.NoError(err) - s.Equal(0, len(events)) - } - s.EqualValues(2, s.l0_policy.emptyLoopCount.Load()) - - events, err = s.l0_policy.Trigger() - s.NoError(err) - s.EqualValues(0, s.l0_policy.emptyLoopCount.Load()) - s.Equal(1, len(events)) - gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] - s.True(ok) - s.NotNil(gotViews) - s.Equal(1, len(gotViews)) - cView = gotViews[0] - s.Equal(s.testLabel, cView.GetGroupLabel()) - s.Equal(4, len(cView.GetSegmentsView())) - for _, view := range cView.GetSegmentsView() { - s.Equal(datapb.SegmentLevel_L0, view.Level) - } - log.Info("cView", zap.String("string", cView.String())) - +func (s *L0CompactionPolicySuite) TestTriggerViewChange() { segArgs := []struct { ID UniqueID PosT Timestamp @@ -114,34 +143,17 @@ func (s *L0CompactionPolicySuite) TestTrigger() { } s.l0_policy.meta = meta - events, err = s.l0_policy.Trigger() + s.l0_policy.OnCollectionUpdate(s.testLabel.CollectionID) + events, err := s.l0_policy.Trigger() s.NoError(err) - gotViews, ok = events[TriggerTypeLevelZeroViewChange] - s.True(ok) - s.Equal(1, len(gotViews)) -} - -func (s *L0CompactionPolicySuite) TestGenerateEventForLevelZeroViewChange() { - s.Require().Empty(s.l0_policy.view.collections) - - events := s.l0_policy.generateEventForLevelZeroViewChange() - s.NotEmpty(events) - s.NotEmpty(s.l0_policy.view.collections) - + s.Equal(1, len(events)) gotViews, ok := events[TriggerTypeLevelZeroViewChange] s.True(ok) - s.NotNil(gotViews) s.Equal(1, len(gotViews)) - storedViews, ok := s.l0_policy.view.collections[s.testLabel.CollectionID] - s.True(ok) - s.NotNil(storedViews) - s.Equal(4, len(storedViews)) - - for _, view := range storedViews { - s.Equal(s.testLabel, view.label) - s.Equal(datapb.SegmentLevel_L0, view.Level) - } + gotViews, ok = events[TriggerTypeLevelZeroViewIDLE] + s.False(ok) + s.Empty(gotViews) } func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { @@ -185,22 +197,6 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { return segments } -func (s *L0CompactionPolicySuite) SetupTest() { - s.testLabel = &CompactionGroupLabel{ - CollectionID: 1, - PartitionID: 10, - Channel: "ch-1", - } - - segments := genSegmentsForMeta(s.testLabel) - meta := &meta{segments: NewSegmentsInfo()} - for id, segment := range segments { - meta.segments.SetSegment(id, segment) - } - - s.l0_policy = newL0CompactionPolicy(meta) -} - func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo { return &SegmentInfo{ SegmentInfo: &datapb.SegmentInfo{ diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index bea3b137e2..7bed1f5c9d 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -27,7 +27,6 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/proto/datapb" - "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/logutil" ) @@ -41,9 +40,27 @@ const ( TriggerTypeSingle ) +func (t CompactionTriggerType) String() string { + switch t { + case TriggerTypeLevelZeroViewChange: + return "LevelZeroViewChange" + case TriggerTypeLevelZeroViewIDLE: + return "LevelZeroViewIDLE" + case TriggerTypeSegmentSizeViewChange: + return "SegmentSizeViewChange" + case TriggerTypeClustering: + return "Clustering" + case TriggerTypeSingle: + return "Single" + default: + return "" + } +} + type TriggerManager interface { Start() Stop() + OnCollectionUpdate(collectionID int64) ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error) } @@ -64,10 +81,6 @@ type CompactionTriggerManager struct { handler Handler allocator allocator.Allocator - view *FullViews - // todo handle this lock - viewGuard lock.RWMutex - meta *meta l0Policy *l0CompactionPolicy clusteringPolicy *clusteringCompactionPolicy @@ -82,11 +95,8 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com allocator: alloc, handler: handler, compactionHandler: compactionHandler, - view: &FullViews{ - collections: make(map[int64][]*SegmentView), - }, - meta: meta, - closeSig: make(chan struct{}), + meta: meta, + closeSig: make(chan struct{}), } m.l0Policy = newL0CompactionPolicy(meta) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.allocator, m.handler) @@ -94,6 +104,12 @@ func NewCompactionTriggerManager(alloc allocator.Allocator, handler Handler, com return m } +// OnCollectionUpdate notifies L0Policy about latest collection's L0 segment changes +// This tells the l0 triggers about which collections are active +func (m *CompactionTriggerManager) OnCollectionUpdate(collectionID int64) { + m.l0Policy.OnCollectionUpdate(collectionID) +} + func (m *CompactionTriggerManager) Start() { m.closeWg.Add(1) go m.startLoop() @@ -200,47 +216,26 @@ func (m *CompactionTriggerManager) ManualTrigger(ctx context.Context, collection func (m *CompactionTriggerManager) notify(ctx context.Context, eventType CompactionTriggerType, views []CompactionView) { log := log.Ctx(ctx) + log.Debug("Start to trigger compactions", zap.String("eventType", eventType.String())) for _, view := range views { - switch eventType { - case TriggerTypeLevelZeroViewChange: - log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewChange") - outView, reason := view.Trigger() - if outView != nil { - log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) - m.SubmitL0ViewToScheduler(ctx, outView) - } - case TriggerTypeLevelZeroViewIDLE: - log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") - outView, reason := view.Trigger() - if outView == nil { - log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") - outView, reason = view.ForceTrigger() - } + outView, reason := view.Trigger() + if outView == nil && eventType == TriggerTypeLevelZeroViewIDLE { + log.Info("Start to force trigger a level zero compaction") + outView, reason = view.ForceTrigger() + } - if outView != nil { - log.Info("Success to trigger a LevelZeroCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) + if outView != nil { + log.Info("Success to trigger a compaction, try to submit", + zap.String("eventType", eventType.String()), + zap.String("reason", reason), + zap.String("output view", outView.String())) + + switch eventType { + case TriggerTypeLevelZeroViewChange, TriggerTypeLevelZeroViewIDLE: m.SubmitL0ViewToScheduler(ctx, outView) - } - case TriggerTypeClustering: - log.Debug("Start to trigger a clustering compaction by TriggerTypeClustering") - outView, reason := view.Trigger() - if outView != nil { - log.Info("Success to trigger a ClusteringCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) + case TriggerTypeClustering: m.SubmitClusteringViewToScheduler(ctx, outView) - } - case TriggerTypeSingle: - log.Debug("Start to trigger a single compaction by TriggerTypeSingle") - outView, reason := view.Trigger() - if outView != nil { - log.Info("Success to trigger a MixCompaction output view, try to submit", - zap.String("reason", reason), - zap.String("output view", outView.String())) + case TriggerTypeSingle: m.SubmitSingleViewToScheduler(ctx, outView) } } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 8a7d99a424..3bb670aa09 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -76,10 +76,9 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { expectedSegID := seg1.ID s.Require().Equal(1, len(latestL0Segments)) - needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) - s.True(needRefresh) - s.Require().Equal(1, len(levelZeroView)) - cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) + levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) + s.Require().Equal(1, len(levelZeroViews)) + cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -102,7 +101,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { return nil }).Return(nil).Once() s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroView) + s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewIDLE, levelZeroViews) } func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { @@ -120,10 +119,9 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { latestL0Segments := GetViewsByInfo(levelZeroSegments...) s.Require().NotEmpty(latestL0Segments) - needRefresh, levelZeroView := s.triggerManager.l0Policy.getChangedLevelZeroViews(1, latestL0Segments) - s.Require().True(needRefresh) - s.Require().Equal(1, len(levelZeroView)) - cView, ok := levelZeroView[0].(*LevelZeroSegmentsView) + levelZeroViews := s.triggerManager.l0Policy.groupL0ViewsByPartChan(1, latestL0Segments) + s.Require().Equal(1, len(levelZeroViews)) + cView, ok := levelZeroViews[0].(*LevelZeroSegmentsView) s.True(ok) s.NotNil(cView) log.Info("view", zap.Any("cView", cView)) @@ -132,8 +130,6 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.mockPlanContext.EXPECT().enqueueCompaction(mock.Anything). RunAndReturn(func(task *datapb.CompactionTask) error { s.EqualValues(19530, task.GetTriggerID()) - // s.True(signal.isGlobal) - // s.False(signal.isForce) s.EqualValues(30000, task.GetPos().GetTimestamp()) s.Equal(s.testLabel.CollectionID, task.GetCollectionID()) s.Equal(s.testLabel.PartitionID, task.GetPartitionID()) @@ -145,7 +141,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { return nil }).Return(nil).Once() s.mockAlloc.EXPECT().AllocID(mock.Anything).Return(19530, nil).Maybe() - s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroView) + s.triggerManager.notify(context.Background(), TriggerTypeLevelZeroViewChange, levelZeroViews) } func (s *CompactionTriggerManagerSuite) TestGetExpectedSegmentSize() { diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 000be0f6c0..0f659ecdcc 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -571,6 +571,8 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath if req.GetSegLevel() == datapb.SegmentLevel_L0 { metrics.DataCoordSizeStoredL0Segment.WithLabelValues(fmt.Sprint(req.GetCollectionID())).Observe(calculateL0SegmentSize(req.GetField2StatslogPaths())) + + s.compactionTriggerManager.OnCollectionUpdate(req.GetCollectionID()) return merr.Success(), nil } diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index 8633d214a7..1b316ffcee 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -164,7 +164,7 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo } collInfo, err := dependency.handler.GetCollection(ctx, segment.GetCollectionID()) - if err != nil { + if err != nil || collInfo == nil { log.Warn("stats task get collection info failed", zap.Int64("collectionID", segment.GetCollectionID()), zap.Error(err)) st.SetState(indexpb.JobState_JobStateInit, err.Error())