diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go
index d89f8fbf84..320ef1dfbd 100644
--- a/internal/datacoord/compaction.go
+++ b/internal/datacoord/compaction.go
@@ -43,6 +43,14 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
+// maxCompactionTaskExecutionDuration maps a compaction type to the execution
+// time after which checkDelay reports the task as delayed.
+var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{
+	datapb.CompactionType_MixCompaction:          30 * time.Minute,
+	datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute,
+	datapb.CompactionType_ClusteringCompaction:   60 * time.Minute,
+}
+
 type compactionPlanContext interface {
 	start()
 	stop()
@@ -658,6 +666,7 @@ func (c *compactionPlanHandler) checkCompaction() error {
 	var finishedTasks []CompactionTask
 	c.executingGuard.RLock()
 	for _, t := range c.executingTasks {
+		c.checkDelay(t)
 		finished := t.Process()
 		if finished {
 			finishedTasks = append(finishedTasks, t)
@@ -734,6 +743,31 @@ func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState
 	return tasks
 }
 
+// checkDelay logs a rate-limited warning when t has been executing for at
+// least the maximum duration configured for its compaction type. Types with
+// no configured maximum are skipped: a missing map entry would otherwise read
+// as a zero duration and flag every such task as delayed.
+func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
+	maxExecDuration, ok := maxCompactionTaskExecutionDuration[t.GetType()]
+	if !ok {
+		return
+	}
+	log := log.Ctx(context.TODO()).WithRateGroup("compactionPlanHandler.checkDelay", 1.0, 60.0)
+	// The start time is interpreted as Unix seconds.
+	startTime := time.Unix(t.GetStartTime(), 0)
+	execDuration := time.Since(startTime)
+	if execDuration >= maxExecDuration {
+		log.RatedWarn(60, "compaction task is delay",
+			zap.Int64("planID", t.GetPlanID()),
+			zap.String("type", t.GetType().String()),
+			zap.String("state", t.GetState().String()),
+			zap.String("vchannel", t.GetChannel()),
+			zap.Int64("nodeID", t.GetNodeID()),
+			zap.Time("startTime", startTime),
+			zap.Duration("execDuration", execDuration))
+	}
+}
+
 var (
 	ioPool         *conc.Pool[any]
 	ioPoolInitOnce sync.Once
diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go
index 84c0b61c9d..70003d6e24 100644
--- a/internal/datacoord/compaction_test.go
+++ b/internal/datacoord/compaction_test.go
@@ -942,3 +942,16 @@ func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string {
 func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string {
 	return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000)
 }
+
+// TestCheckDelay runs checkDelay on one overdue task of each compaction type
+// that has a configured maximum duration; it only checks that nothing panics.
+func TestCheckDelay(t *testing.T) {
+	handler := &compactionPlanHandler{}
+	startTime := time.Now().Add(-100 * time.Minute).Unix()
+	t1 := &mixCompactionTask{CompactionTask: &datapb.CompactionTask{Type: datapb.CompactionType_MixCompaction, StartTime: startTime}}
+	handler.checkDelay(t1)
+	t2 := &l0CompactionTask{CompactionTask: &datapb.CompactionTask{Type: datapb.CompactionType_Level0DeleteCompaction, StartTime: startTime}}
+	handler.checkDelay(t2)
+	t3 := &clusteringCompactionTask{CompactionTask: &datapb.CompactionTask{Type: datapb.CompactionType_ClusteringCompaction, StartTime: startTime}}
+	handler.checkDelay(t3)
+}
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index e3435c2900..18c24281e7 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -1517,7 +1517,7 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
 		return nil
 	}
 
-	log.Info("start flushing segments", zap.Int64s("segmentIDs", flushableIDs))
+	log.Info("start flushing segments", zap.Int64s("segmentIDs", flushableIDs), zap.Uint64("ts", ts))
 	// update segment last update triggered time
 	// it's ok to fail flushing, since next timetick after duration will re-trigger
 	s.setLastFlushTime(flushableSegments)