mirror of https://github.com/milvus-io/milvus.git
enhance: Log warn on delayed compaction tasks (#36049)
/kind enhancement

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
parent e480b103bd
commit 6b4ae0c65e
@@ -43,6 +43,12 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
+var maxCompactionTaskExecutionDuration = map[datapb.CompactionType]time.Duration{
+	datapb.CompactionType_MixCompaction:          30 * time.Minute,
+	datapb.CompactionType_Level0DeleteCompaction: 30 * time.Minute,
+	datapb.CompactionType_ClusteringCompaction:   60 * time.Minute,
+}
+
 type compactionPlanContext interface {
 	start()
 	stop()
@@ -658,6 +664,7 @@ func (c *compactionPlanHandler) checkCompaction() error {
 	var finishedTasks []CompactionTask
 	c.executingGuard.RLock()
 	for _, t := range c.executingTasks {
+		c.checkDelay(t)
 		finished := t.Process()
 		if finished {
 			finishedTasks = append(finishedTasks, t)
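Each pass of checkCompaction now runs checkDelay on every in-flight task before processing it; the check is read-only, so it can sit inside the existing executingGuard read lock. Below is a minimal sketch of that pattern, with hypothetical Task and Handler types standing in for CompactionTask and compactionPlanHandler (none of these names come from the commit):

// Sketch: poll all executing tasks under a read lock and run a
// non-mutating delay check on each one.
package main

import (
	"fmt"
	"sync"
	"time"
)

type Task struct {
	PlanID    int64
	StartTime int64 // unix seconds
}

type Handler struct {
	mu    sync.RWMutex
	tasks map[int64]*Task
}

// checkDelay only reads task state, so holding the read lock is enough.
func (h *Handler) checkDelay(t *Task, max time.Duration) {
	if d := time.Since(time.Unix(t.StartTime, 0)); d >= max {
		fmt.Printf("task %d delayed: running for %v\n", t.PlanID, d)
	}
}

func (h *Handler) check() {
	h.mu.RLock()
	defer h.mu.RUnlock()
	for _, t := range h.tasks {
		h.checkDelay(t, 30*time.Minute)
	}
}

func main() {
	h := &Handler{tasks: map[int64]*Task{
		1: {PlanID: 1, StartTime: time.Now().Add(-time.Hour).Unix()},
	}}
	h.check()
}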
@@ -734,6 +741,23 @@ func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState
 	return tasks
 }
 
+func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
+	log := log.Ctx(context.TODO()).WithRateGroup("compactionPlanHandler.checkDelay", 1.0, 60.0)
+	maxExecDuration := maxCompactionTaskExecutionDuration[t.GetType()]
+	startTime := time.Unix(t.GetStartTime(), 0)
+	execDuration := time.Since(startTime)
+	if execDuration >= maxExecDuration {
+		log.RatedWarn(60, "compaction task is delayed",
+			zap.Int64("planID", t.GetPlanID()),
+			zap.String("type", t.GetType().String()),
+			zap.String("state", t.GetState().String()),
+			zap.String("vchannel", t.GetChannel()),
+			zap.Int64("nodeID", t.GetNodeID()),
+			zap.Time("startTime", startTime),
+			zap.Duration("execDuration", execDuration))
+	}
+}
+
 var (
 	ioPool         *conc.Pool[any]
 	ioPoolInitOnce sync.Once
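checkDelay compares each task's wall-clock execution time against the per-type ceiling from maxCompactionTaskExecutionDuration and, when the ceiling is exceeded, emits a warning that the rate group throttles to roughly one per 60 seconds. A standalone sketch of the same logic, assuming a hypothetical per-plan throttle in place of Milvus's internal log.WithRateGroup/RatedWarn helpers; thresholds and type names here are illustrative:

// Sketch: warn (at most once a minute per plan) when a compaction
// task has been executing longer than its type's ceiling.
package main

import (
	"log"
	"time"
)

var thresholds = map[string]time.Duration{
	"Mix":        30 * time.Minute,
	"Level0":     30 * time.Minute,
	"Clustering": 60 * time.Minute,
}

var lastWarn = map[int64]time.Time{} // planID -> time of last warning

func warnIfDelayed(planID int64, taskType string, start time.Time) {
	max, ok := thresholds[taskType]
	if !ok {
		return
	}
	exec := time.Since(start)
	if exec < max {
		return
	}
	if last, ok := lastWarn[planID]; ok && time.Since(last) < time.Minute {
		return // throttled: already warned for this plan recently
	}
	lastWarn[planID] = time.Now()
	log.Printf("compaction task is delayed: planID=%d type=%s execDuration=%v",
		planID, taskType, exec)
}

func main() {
	warnIfDelayed(1, "Clustering", time.Now().Add(-2*time.Hour))
}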
@@ -942,3 +942,13 @@ func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string {
 func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string {
 	return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000)
 }
+
+func TestCheckDelay(t *testing.T) {
+	handler := &compactionPlanHandler{}
+	t1 := &mixCompactionTask{CompactionTask: &datapb.CompactionTask{StartTime: time.Now().Add(-100 * time.Minute).Unix()}}
+	handler.checkDelay(t1)
+	t2 := &l0CompactionTask{CompactionTask: &datapb.CompactionTask{StartTime: time.Now().Add(-100 * time.Minute).Unix()}}
+	handler.checkDelay(t2)
+	t3 := &clusteringCompactionTask{CompactionTask: &datapb.CompactionTask{StartTime: time.Now().Add(-100 * time.Minute).Unix()}}
+	handler.checkDelay(t3)
+}
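The test drives all three task types through checkDelay with a StartTime 100 minutes in the past, which exceeds both the 30-minute (mix, L0 delete) and the 60-minute (clustering) ceilings, so every call should take the warning path. Assuming the test sits in the datacoord package of a standard checkout, it could be run in isolation with something like: go test -run TestCheckDelay -v from that package directory.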
@@ -1517,7 +1517,7 @@ func (s *Server) handleDataNodeTtMsg(ctx context.Context, ttMsg *msgpb.DataNodeT
 		return nil
 	}
 
-	log.Info("start flushing segments", zap.Int64s("segmentIDs", flushableIDs))
+	log.Info("start flushing segments", zap.Int64s("segmentIDs", flushableIDs), zap.Uint64("ts", ts))
 	// update segment last update triggered time
 	// it's ok to fail flushing, since next timetick after duration will re-trigger
 	s.setLastFlushTime(flushableSegments)
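The flush log line now carries the timetick ts alongside the segment IDs, so a flush trigger can be correlated with the DataNode timetick that produced it. A sketch of the same structured call using go.uber.org/zap directly; the field names mirror the diff, while the values are placeholders, not taken from the commit:

// Sketch: emit the flush-trigger log with both segment IDs and the
// triggering timetick as structured zap fields.
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	flushableIDs := []int64{101, 102}          // placeholder segment IDs
	var ts uint64 = 449296048973611010         // placeholder timetick value

	logger.Info("start flushing segments",
		zap.Int64s("segmentIDs", flushableIDs),
		zap.Uint64("ts", ts))
}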