From d7a3697fb58734ed99c4adafcc2e093776253b8a Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Thu, 11 Jul 2024 17:45:37 +0800 Subject: [PATCH] enhance: Add back compactionTaskNum metrics (#34583) Fix L0 compaction task recover unable to set segment not isCompacting See also: #34460 Signed-off-by: yangxuan --- internal/datacoord/compaction.go | 111 ++++++++++-------- internal/datacoord/compaction_policy_l0.go | 18 ++- .../datacoord/compaction_policy_l0_test.go | 6 +- .../datacoord/compaction_task_clustering.go | 6 +- internal/datacoord/compaction_task_l0.go | 20 ++-- internal/datacoord/compaction_task_mix.go | 32 +++-- internal/datacoord/compaction_trigger_v2.go | 20 ++-- internal/datacoord/meta.go | 4 +- internal/metastore/kv/datacoord/kv_catalog.go | 5 +- 9 files changed, 114 insertions(+), 108 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 3991b8b2f2..c4ae60b117 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" @@ -70,10 +71,10 @@ type compactionInfo struct { } type compactionPlanHandler struct { - mu lock.RWMutex + queueGuard lock.RWMutex queueTasks map[int64]CompactionTask // planID -> task - executingMu lock.RWMutex + executingGuard lock.RWMutex executingTasks map[int64]CompactionTask // planID -> task meta CompactionMeta @@ -157,21 +158,21 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo { func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int { cnt := 0 - c.mu.RLock() + c.queueGuard.RLock() for _, t := range c.queueTasks { if t.GetTriggerID() == triggerID { cnt += 1 } // if t.GetPlanID() } - c.mu.RUnlock() - c.executingMu.RLock() + c.queueGuard.RUnlock() + c.executingGuard.RLock() for _, t := range c.executingTasks { if t.GetTriggerID() == triggerID { cnt += 1 } } - c.executingMu.RUnlock() + c.executingGuard.RUnlock() return cnt } @@ -193,12 +194,12 @@ func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm Chann } func (c *compactionPlanHandler) schedule() []CompactionTask { - c.mu.RLock() + c.queueGuard.RLock() if len(c.queueTasks) == 0 { - c.mu.RUnlock() + c.queueGuard.RUnlock() return nil } - c.mu.RUnlock() + c.queueGuard.RUnlock() l0ChannelExcludes := typeutil.NewSet[string]() mixChannelExcludes := typeutil.NewSet[string]() @@ -206,7 +207,7 @@ func (c *compactionPlanHandler) schedule() []CompactionTask { mixLabelExcludes := typeutil.NewSet[string]() clusterLabelExcludes := typeutil.NewSet[string]() - c.executingMu.RLock() + c.executingGuard.RLock() for _, t := range c.executingTasks { switch t.GetType() { case datapb.CompactionType_Level0DeleteCompaction: @@ -219,11 +220,11 @@ func (c *compactionPlanHandler) schedule() []CompactionTask { clusterLabelExcludes.Insert(t.GetLabel()) } } - c.executingMu.RUnlock() + c.executingGuard.RUnlock() var picked []CompactionTask - c.mu.RLock() - defer c.mu.RUnlock() + c.queueGuard.RLock() + defer c.queueGuard.RUnlock() keys := lo.Keys(c.queueTasks) sort.SliceStable(keys, func(i, j int) bool { return keys[i] < keys[j] @@ -268,8 +269,8 @@ func (c *compactionPlanHandler) start() { } func (c *compactionPlanHandler) loadMeta() { - // todo: make it compatible to all types of compaction with persist meta - triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks() + // TODO: make it compatible to all types of compaction with persist meta + triggers := c.meta.GetCompactionTasks() for _, tasks := range triggers { for _, task := range tasks { state := task.GetState() @@ -278,14 +279,19 @@ func (c *compactionPlanHandler) loadMeta() { state == datapb.CompactionTaskState_unknown { log.Info("compactionPlanHandler loadMeta abandon compactionTask", zap.Int64("planID", task.GetPlanID()), - zap.String("State", task.GetState().String())) + zap.String("type", task.GetType().String()), + zap.String("state", task.GetState().String())) continue } else { + // TODO: how to deal with the create failed tasks, leave it in meta forever? t, err := c.createCompactTask(task) if err != nil { log.Warn("compactionPlanHandler loadMeta create compactionTask failed", zap.Int64("planID", task.GetPlanID()), - zap.String("State", task.GetState().String())) + zap.String("type", task.GetType().String()), + zap.String("state", task.GetState().String()), + zap.Error(err), + ) continue } if t.NeedReAssignNodeID() { @@ -294,6 +300,7 @@ func (c *compactionPlanHandler) loadMeta() { zap.Int64("planID", t.GetPlanID()), zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("collectionID", t.GetCollectionID()), + zap.String("type", task.GetType().String()), zap.String("state", t.GetState().String())) } else { c.restoreTask(t) @@ -301,6 +308,7 @@ func (c *compactionPlanHandler) loadMeta() { zap.Int64("planID", t.GetPlanID()), zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("collectionID", t.GetCollectionID()), + zap.String("type", task.GetType().String()), zap.String("state", t.GetState().String())) } } @@ -311,17 +319,20 @@ func (c *compactionPlanHandler) loadMeta() { func (c *compactionPlanHandler) doSchedule() { picked := c.schedule() if len(picked) > 0 { - c.executingMu.Lock() + c.executingGuard.Lock() for _, t := range picked { c.executingTasks[t.GetPlanID()] = t } - c.executingMu.Unlock() + c.executingGuard.Unlock() - c.mu.Lock() + c.queueGuard.Lock() for _, t := range picked { delete(c.queueTasks, t.GetPlanID()) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc() } - c.mu.Unlock() + c.queueGuard.Unlock() + } } @@ -394,7 +405,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() { // try best to delete meta err := c.meta.DropCompactionTask(task) if err != nil { - log.Warn("fail to drop task", zap.Int64("taskPlanID", task.PlanID), zap.Error(err)) + log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err)) } } } @@ -460,7 +471,7 @@ func (c *compactionPlanHandler) stop() { } func (c *compactionPlanHandler) removeTasksByChannel(channel string) { - c.mu.Lock() + c.queueGuard.Lock() for id, task := range c.queueTasks { log.Info("Compaction handler removing tasks by channel", zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel())) @@ -472,13 +483,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { ) delete(c.queueTasks, id) c.taskNumber.Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Pending).Dec() } } - c.mu.Unlock() - c.executingMu.Lock() + c.queueGuard.Unlock() + c.executingGuard.Lock() for id, task := range c.executingTasks { log.Info("Compaction handler removing tasks by channel", - zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel())) + zap.String("channel", channel), zap.Int64("planID", id), zap.Any("task_channel", task.GetChannel())) if task.GetChannel() == channel { log.Info("Compaction handler removing tasks by channel", zap.String("channel", channel), @@ -487,46 +499,49 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) { ) delete(c.executingTasks, id) c.taskNumber.Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Executing).Dec() } } - c.executingMu.Unlock() + c.executingGuard.Unlock() } func (c *compactionPlanHandler) submitTask(t CompactionTask) { _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType())) t.SetSpan(span) - c.mu.Lock() + c.queueGuard.Lock() c.queueTasks[t.GetPlanID()] = t - c.mu.Unlock() + c.queueGuard.Unlock() c.taskNumber.Add(1) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc() } // restoreTask used to restore Task from etcd func (c *compactionPlanHandler) restoreTask(t CompactionTask) { _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType())) t.SetSpan(span) - c.executingMu.Lock() + c.executingGuard.Lock() c.executingTasks[t.GetPlanID()] = t - c.executingMu.Unlock() + c.executingGuard.Unlock() c.taskNumber.Add(1) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc() } // getCompactionTask return compaction func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask { - c.mu.RLock() + c.queueGuard.RLock() t, ok := c.queueTasks[planID] if ok { - c.mu.RUnlock() + c.queueGuard.RUnlock() return t } - c.mu.RUnlock() - c.executingMu.RLock() + c.queueGuard.RUnlock() + c.executingGuard.RLock() t, ok = c.executingTasks[planID] if ok { - c.executingMu.RUnlock() + c.executingGuard.RUnlock() return t } - c.executingMu.RUnlock() + c.executingGuard.RUnlock() return t } @@ -604,6 +619,8 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { } else { log.Info("compactionHandler assignNodeID success", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID)) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc() } } } @@ -613,34 +630,36 @@ func (c *compactionPlanHandler) checkCompaction() error { // for DC might add new task while GetCompactionState. var needAssignIDTasks []CompactionTask - c.executingMu.RLock() + c.executingGuard.RLock() for _, t := range c.executingTasks { if t.NeedReAssignNodeID() { needAssignIDTasks = append(needAssignIDTasks, t) } } - c.executingMu.RUnlock() + c.executingGuard.RUnlock() if len(needAssignIDTasks) > 0 { c.assignNodeIDs(needAssignIDTasks) } var finishedTasks []CompactionTask - c.executingMu.RLock() + c.executingGuard.RLock() for _, t := range c.executingTasks { finished := t.Process() if finished { finishedTasks = append(finishedTasks, t) } } - c.executingMu.RUnlock() + c.executingGuard.RUnlock() // delete all finished - c.executingMu.Lock() + c.executingGuard.Lock() for _, t := range finishedTasks { delete(c.executingTasks, t.GetPlanID()) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc() } - c.executingMu.Unlock() - c.taskNumber.Add(-int32(len(finishedTasks))) + c.executingGuard.Unlock() + c.taskNumber.Sub(int32(len(finishedTasks))) return nil } @@ -681,8 +700,8 @@ func (c *compactionPlanHandler) getTaskCount() int { } func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState) []CompactionTask { - c.mu.RLock() - defer c.mu.RUnlock() + c.queueGuard.RLock() + defer c.queueGuard.RUnlock() tasks := make([]CompactionTask, 0, len(c.queueTasks)) for _, t := range c.queueTasks { if t.GetState() == state { diff --git a/internal/datacoord/compaction_policy_l0.go b/internal/datacoord/compaction_policy_l0.go index 1f15a40386..353e520da5 100644 --- a/internal/datacoord/compaction_policy_l0.go +++ b/internal/datacoord/compaction_policy_l0.go @@ -16,10 +16,11 @@ type l0CompactionPolicy struct { emptyLoopCount *atomic.Int64 } -func newL0CompactionPolicy(meta *meta, view *FullViews) *l0CompactionPolicy { +func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy { return &l0CompactionPolicy{ - meta: meta, - view: view, + meta: meta, + // donot share views with other compaction policy + view: &FullViews{collections: make(map[int64][]*SegmentView)}, emptyLoopCount: atomic.NewInt64(0), } } @@ -39,13 +40,11 @@ func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]Compact policy.emptyLoopCount.Inc() if policy.emptyLoopCount.Load() >= 3 { - idleEvents := policy.generateEventForLevelZeroViewIDLE() - if len(idleEvents) > 0 { - policy.emptyLoopCount.Store(0) - } - return idleEvents, nil + policy.emptyLoopCount.Store(0) + return policy.generateEventForLevelZeroViewIDLE(), nil } - return make(map[CompactionTriggerType][]CompactionView, 0), nil + + return make(map[CompactionTriggerType][]CompactionView), nil } func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) { @@ -73,7 +72,6 @@ func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64 levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool { return info.GetLevel() == datapb.SegmentLevel_L0 }) - latestL0Segments := GetViewsByInfo(levelZeroSegments...) needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments) if needRefresh { diff --git a/internal/datacoord/compaction_policy_l0_test.go b/internal/datacoord/compaction_policy_l0_test.go index 29348added..2a3315183a 100644 --- a/internal/datacoord/compaction_policy_l0_test.go +++ b/internal/datacoord/compaction_policy_l0_test.go @@ -193,11 +193,7 @@ func (s *L0CompactionPolicySuite) SetupTest() { meta.segments.SetSegment(id, segment) } - views := &FullViews{ - collections: make(map[int64][]*SegmentView), - } - - s.l0_policy = newL0CompactionPolicy(meta, views) + s.l0_policy = newL0CompactionPolicy(meta) } func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo { diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index e5e6505bc4..25966c03bc 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -333,11 +333,7 @@ func (t *clusteringCompactionTask) processAnalyzing() error { } func (t *clusteringCompactionTask) resetSegmentCompacting() { - var segmentIDs []UniqueID - for _, binLogs := range t.GetPlan().GetSegmentBinlogs() { - segmentIDs = append(segmentIDs, binLogs.GetSegmentID()) - } - t.meta.SetSegmentsCompacting(segmentIDs, false) + t.meta.SetSegmentsCompacting(t.GetInputSegments(), false) } func (t *clusteringCompactionTask) processFailedOrTimeout() error { diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 87b185db0e..bab3618b2b 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -287,31 +287,28 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err func (t *l0CompactionTask) processMetaSaved() bool { err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) - if err == nil { - return t.processCompleted() + if err != nil { + log.Warn("l0CompactionTask unable to processMetaSaved", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) + return false } - return false + return t.processCompleted() } func (t *l0CompactionTask) processCompleted() bool { if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { - return false + log.Warn("l0CompactionTask unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) } t.resetSegmentCompacting() UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("handleCompactionResult: success to handle l0 compaction result") + log.Info("l0CompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) return true } func (t *l0CompactionTask) resetSegmentCompacting() { - var segmentIDs []UniqueID - for _, binLogs := range t.GetPlan().GetSegmentBinlogs() { - segmentIDs = append(segmentIDs, binLogs.GetSegmentID()) - } - t.meta.SetSegmentsCompacting(segmentIDs, false) + t.meta.SetSegmentsCompacting(t.GetInputSegments(), false) } func (t *l0CompactionTask) processTimeout() bool { @@ -323,10 +320,11 @@ func (t *l0CompactionTask) processFailed() bool { if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), }); err != nil { - return false + log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) } t.resetSegmentCompacting() + log.Info("l0CompactionTask processFailed done", zap.Int64("planID", t.GetPlanID())) return true } diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index ef796cb1fa..d28c119442 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -173,24 +173,21 @@ func (t *mixCompactionTask) NeedReAssignNodeID() bool { } func (t *mixCompactionTask) processCompleted() bool { - err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), - }) - if err == nil { - t.resetSegmentCompacting() - UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) - log.Info("handleCompactionResult: success to handle merge compaction result") + }); err != nil { + log.Warn("mixCompactionTask processCompleted unable to drop compaction plan", zap.Int64("planID", t.GetPlanID())) } - return err == nil + t.resetSegmentCompacting() + UpdateCompactionSegmentSizeMetrics(t.result.GetSegments()) + log.Info("mixCompactionTask processCompleted done", zap.Int64("planID", t.GetPlanID())) + + return true } func (t *mixCompactionTask) resetSegmentCompacting() { - var segmentIDs []UniqueID - for _, binLogs := range t.GetPlan().GetSegmentBinlogs() { - segmentIDs = append(segmentIDs, binLogs.GetSegmentID()) - } - t.meta.SetSegmentsCompacting(segmentIDs, false) + t.meta.SetSegmentsCompacting(t.GetInputSegments(), false) } func (t *mixCompactionTask) processTimeout() bool { @@ -227,14 +224,15 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa } func (t *mixCompactionTask) processFailed() bool { - err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ + if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{ PlanID: t.GetPlanID(), - }) - if err == nil { - t.resetSegmentCompacting() + }); err != nil { + log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err)) } - return err == nil + log.Info("mixCompactionTask processFailed done", zap.Int64("planID", t.GetPlanID())) + t.resetSegmentCompacting() + return true } func (t *mixCompactionTask) checkTimeout() bool { diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 915beb8e8a..50147a7636 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -85,7 +85,7 @@ func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHan meta: meta, closeSig: make(chan struct{}), } - m.l0Policy = newL0CompactionPolicy(meta, m.view) + m.l0Policy = newL0CompactionPolicy(meta) m.clusteringPolicy = newClusteringCompactionPolicy(meta, m.view, m.allocator, m.compactionHandler, m.handler) return m } @@ -112,7 +112,7 @@ func (m *CompactionTriggerManager) startLoop() { for { select { case <-m.closeSig: - log.Info("Compaction View checkLoop quit") + log.Info("Compaction trigger manager checkLoop quit") return case <-l0Ticker.C: if !m.l0Policy.Enable() { @@ -138,7 +138,7 @@ func (m *CompactionTriggerManager) startLoop() { continue } if m.compactionHandler.isFull() { - log.RatedInfo(10, "Skip trigger l0 compaction since compactionHandler is full") + log.RatedInfo(10, "Skip trigger clustering compaction since compactionHandler is full") continue } events, err := m.clusteringPolicy.Trigger() @@ -214,7 +214,7 @@ func (m *CompactionTriggerManager) notify(ctx context.Context, eventType Compact func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, view CompactionView) { taskID, err := m.allocator.allocID(ctx) if err != nil { - log.Warn("fail to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String())) + log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String())) return } @@ -224,7 +224,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID) if err != nil { - log.Warn("fail to submit compaction view to scheduler because get collection fail", zap.String("view", view.String())) + log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String())) return } @@ -245,7 +245,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, err = m.compactionHandler.enqueueCompaction(task) if err != nil { - log.Warn("failed to execute compaction task", + log.Warn("Failed to execute compaction task", zap.Int64("collection", task.CollectionID), zap.Int64("planID", task.GetPlanID()), zap.Int64s("segmentIDs", task.GetInputSegments()), @@ -253,6 +253,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, } log.Info("Finish to submit a LevelZeroCompaction plan", zap.Int64("taskID", taskID), + zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), ) } @@ -260,13 +261,13 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(ctx context.Context, func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.Context, view CompactionView) { taskID, _, err := m.allocator.allocN(2) if err != nil { - log.Warn("fail to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String())) + log.Warn("Failed to submit compaction view to scheduler because allocate id fail", zap.String("view", view.String())) return } view.GetSegmentsView() collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID) if err != nil { - log.Warn("fail to submit compaction view to scheduler because get collection fail", zap.String("view", view.String())) + log.Warn("Failed to submit compaction view to scheduler because get collection fail", zap.String("view", view.String())) return } _, totalRows, maxSegmentRows, preferSegmentRows := calculateClusteringCompactionConfig(view) @@ -292,7 +293,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C } err = m.compactionHandler.enqueueCompaction(task) if err != nil { - log.Warn("failed to execute compaction task", + log.Warn("Failed to execute compaction task", zap.Int64("collection", task.CollectionID), zap.Int64("planID", task.GetPlanID()), zap.Int64s("segmentIDs", task.GetInputSegments()), @@ -300,6 +301,7 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C } log.Info("Finish to submit a clustering compaction task", zap.Int64("taskID", taskID), + zap.Int64("planID", task.GetPlanID()), zap.String("type", task.GetType().String()), ) } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 7eda80208b..e52108598f 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1742,13 +1742,13 @@ func (m *meta) GetCompactableSegmentGroupByCollection() map[int64][]*SegmentInfo func (m *meta) GetEarliestStartPositionOfGrowingSegments(label *CompactionGroupLabel) *msgpb.MsgPosition { segments := m.SelectSegments(WithCollection(label.CollectionID), SegmentFilterFunc(func(segment *SegmentInfo) bool { return segment.GetState() == commonpb.SegmentState_Growing && - segment.GetPartitionID() == label.PartitionID && + (label.PartitionID == common.AllPartitionsID || segment.GetPartitionID() == label.PartitionID) && segment.GetInsertChannel() == label.Channel })) earliest := &msgpb.MsgPosition{Timestamp: math.MaxUint64} for _, seg := range segments { - if earliest == nil || earliest.GetTimestamp() > seg.GetStartPosition().GetTimestamp() { + if earliest.GetTimestamp() == math.MaxUint64 || earliest.GetTimestamp() > seg.GetStartPosition().GetTimestamp() { earliest = seg.GetStartPosition() } } diff --git a/internal/metastore/kv/datacoord/kv_catalog.go b/internal/metastore/kv/datacoord/kv_catalog.go index ef4b6ff246..8904424fd8 100644 --- a/internal/metastore/kv/datacoord/kv_catalog.go +++ b/internal/metastore/kv/datacoord/kv_catalog.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/segmentutil" + "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -775,13 +776,11 @@ func (kc *Catalog) DropImportTask(taskID int64) error { return kc.MetaKv.Remove(key) } -const allPartitionID = -1 - // GcConfirm returns true if related collection/partition is not found. // DataCoord will remove all the meta eventually after GC is finished. func (kc *Catalog) GcConfirm(ctx context.Context, collectionID, partitionID typeutil.UniqueID) bool { prefix := buildCollectionPrefix(collectionID) - if partitionID != allPartitionID { + if partitionID != common.AllPartitionsID { prefix = buildPartitionPrefix(collectionID, partitionID) } keys, values, err := kc.MetaKv.LoadWithPrefix(prefix)