diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index fce3e807a2..bbf97d22ef 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -722,7 +722,7 @@ func (c *compactionPlanHandler) checkCompaction() error { if id == NullNodeID { break } - metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Dec() metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetTaskProto().GetNodeID()), t.GetTaskProto().GetType().String(), metrics.Executing).Inc() } } diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index b44415304f..c3ccbca89f 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -705,12 +705,18 @@ func (t *clusteringCompactionTask) doCompact() error { } err = t.sessions.Compaction(context.Background(), t.GetTaskProto().GetNodeID(), t.GetPlan()) if err != nil { - if errors.Is(err, merr.ErrDataNodeSlotExhausted) { - log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted") - return t.updateAndSaveTaskMeta(setNodeID(NullNodeID)) + originNodeID := t.GetTaskProto().GetNodeID() + log.Warn("Failed to notify compaction tasks to DataNode", + zap.Int64("planID", t.GetTaskProto().GetPlanID()), + zap.Int64("nodeID", originNodeID), + zap.Error(err)) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("updateAndSaveTaskMeta fail", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + return err } - log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) - return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() } return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing)) } diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index c63eccc904..834ac80ab6 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -33,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -111,8 +112,18 @@ func (t *l0CompactionTask) processPipelining() bool { err = t.sessions.Compaction(context.TODO(), t.GetTaskProto().GetNodeID(), t.GetPlan()) if err != nil { - log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) - t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + originNodeID := t.GetTaskProto().GetNodeID() + log.Warn("l0CompactionTask failed to notify compaction tasks to DataNode", + zap.Int64("planID", t.GetTaskProto().GetPlanID()), + zap.Int64("nodeID", originNodeID), + zap.Error(err)) + err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) + if err != nil { + log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + return false + } + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() return false } diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 71958f69b8..e80d3e6d41 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/session" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -78,11 +79,17 @@ func (t *mixCompactionTask) processPipelining() bool { // Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset // to enable a retry in compaction.checkCompaction(). // This is tricky, we should remove the reassignment here. - log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetTaskProto().GetPlanID()), zap.Error(err)) + originNodeID := t.GetTaskProto().GetNodeID() + log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", + zap.Int64("planID", t.GetTaskProto().GetPlanID()), + zap.Int64("nodeID", originNodeID), + zap.Error(err)) err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) if err != nil { log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err)) } + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", originNodeID), t.GetTaskProto().GetType().String(), metrics.Executing).Dec() + metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetTaskProto().GetType().String(), metrics.Pending).Inc() return false } log.Info("mixCompactionTask notify compaction tasks to DataNode")