From 6542c1ab0e81059a7140269451b4aa3362909644 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Mon, 5 Aug 2024 16:26:17 +0800
Subject: [PATCH] enhance: Add monitoring metrics for task execution time in datacoord (#35139)

issue: #35138

Signed-off-by: Cai Zhang
---
 internal/datacoord/task_analyze.go          |  33 ++++++
 internal/datacoord/task_index.go            |  33 ++++++
 internal/datacoord/task_scheduler.go        | 113 +++++++++++++++++++-
 internal/datacoord/task_scheduler_test.go   |  15 ++-
 internal/datacoord/types.go                 |   8 ++
 pkg/metrics/datacoord_metrics.go            |  13 +++
 pkg/util/paramtable/component_param.go      |   8 ++
 pkg/util/paramtable/component_param_test.go |   2 +
 8 files changed, 222 insertions(+), 3 deletions(-)

diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go
index c29888099c..2d9e77c93e 100644
--- a/internal/datacoord/task_analyze.go
+++ b/internal/datacoord/task_analyze.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"time"
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -41,6 +42,10 @@ type analyzeTask struct {
 	nodeID   int64
 	taskInfo *indexpb.AnalyzeResult
 
+	queueTime time.Time
+	startTime time.Time
+	endTime   time.Time
+
 	req *indexpb.AnalyzeRequest
 }
 
@@ -56,6 +61,34 @@ func (at *analyzeTask) ResetNodeID() {
 	at.nodeID = 0
 }
 
+func (at *analyzeTask) SetQueueTime(t time.Time) {
+	at.queueTime = t
+}
+
+func (at *analyzeTask) GetQueueTime() time.Time {
+	return at.queueTime
+}
+
+func (at *analyzeTask) SetStartTime(t time.Time) {
+	at.startTime = t
+}
+
+func (at *analyzeTask) GetStartTime() time.Time {
+	return at.startTime
+}
+
+func (at *analyzeTask) SetEndTime(t time.Time) {
+	at.endTime = t
+}
+
+func (at *analyzeTask) GetEndTime() time.Time {
+	return at.endTime
+}
+
+func (at *analyzeTask) GetTaskType() string {
+	return indexpb.JobType_JobTypeAnalyzeJob.String()
+}
+
 func (at *analyzeTask) CheckTaskHealthy(mt *meta) bool {
 	t := mt.analyzeMeta.GetTask(at.GetTaskID())
 	return t != nil
diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go
index ab23f656a1..ed75d885b4 100644
--- a/internal/datacoord/task_index.go
+++ b/internal/datacoord/task_index.go
@@ -19,6 +19,7 @@ package datacoord
 import (
 	"context"
 	"path"
+	"time"
 
 	"go.uber.org/zap"
 
@@ -39,6 +40,10 @@ type indexBuildTask struct {
 	nodeID   int64
 	taskInfo *indexpb.IndexTaskInfo
 
+	queueTime time.Time
+	startTime time.Time
+	endTime   time.Time
+
 	req *indexpb.CreateJobRequest
 }
 
@@ -56,6 +61,34 @@ func (it *indexBuildTask) ResetNodeID() {
 	it.nodeID = 0
 }
 
+func (it *indexBuildTask) SetQueueTime(t time.Time) {
+	it.queueTime = t
+}
+
+func (it *indexBuildTask) GetQueueTime() time.Time {
+	return it.queueTime
+}
+
+func (it *indexBuildTask) SetStartTime(t time.Time) {
+	it.startTime = t
+}
+
+func (it *indexBuildTask) GetStartTime() time.Time {
+	return it.startTime
+}
+
+func (it *indexBuildTask) SetEndTime(t time.Time) {
+	it.endTime = t
+}
+
+func (it *indexBuildTask) GetEndTime() time.Time {
+	return it.endTime
+}
+
+func (it *indexBuildTask) GetTaskType() string {
+	return indexpb.JobType_JobTypeIndexJob.String()
+}
+
 func (it *indexBuildTask) CheckTaskHealthy(mt *meta) bool {
 	_, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
 	return exist
diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go
index 1893dc15cf..8c6035f385 100644
--- a/internal/datacoord/task_scheduler.go
+++ b/internal/datacoord/task_scheduler.go
@@ -27,6 +27,8 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/metrics" + "github.com/milvus-io/milvus/pkg/util/lock" ) const ( @@ -40,11 +42,13 @@ type taskScheduler struct { cancel context.CancelFunc wg sync.WaitGroup - scheduleDuration time.Duration + scheduleDuration time.Duration + collectMetricsDuration time.Duration // TODO @xiaocai2333: use priority queue tasks map[int64]Task notifyChan chan struct{} + taskLock *lock.KeyLock[int64] meta *meta @@ -70,7 +74,9 @@ func newTaskScheduler( meta: metaTable, tasks: make(map[int64]Task), notifyChan: make(chan struct{}, 1), + taskLock: lock.NewKeyLock[int64](), scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond), + collectMetricsDuration: time.Minute, policy: defaultBuildIndexPolicy, nodeManager: nodeManager, chunkManager: chunkManager, @@ -82,8 +88,9 @@ func newTaskScheduler( } func (s *taskScheduler) Start() { - s.wg.Add(1) + s.wg.Add(2) go s.schedule() + go s.collectTaskMetrics() } func (s *taskScheduler) Stop() { @@ -107,6 +114,9 @@ func (s *taskScheduler) reloadFromKV() { State: segIndex.IndexState, FailReason: segIndex.FailReason, }, + queueTime: time.Now(), + startTime: time.Now(), + endTime: time.Now(), } } } @@ -123,6 +133,9 @@ func (s *taskScheduler) reloadFromKV() { State: t.State, FailReason: t.FailReason, }, + queueTime: time.Now(), + startTime: time.Now(), + endTime: time.Now(), } } } @@ -145,6 +158,7 @@ func (s *taskScheduler) enqueue(task Task) { if _, ok := s.tasks[taskID]; !ok { s.tasks[taskID] = task } + task.SetQueueTime(time.Now()) log.Info("taskScheduler enqueue task", zap.Int64("taskID", taskID)) } @@ -193,11 +207,14 @@ func (s *taskScheduler) run() { s.policy(taskIDs) for _, taskID := range taskIDs { + s.taskLock.Lock(taskID) ok := s.process(taskID) if !ok { + s.taskLock.Unlock(taskID) log.Ctx(s.ctx).Info("there is no idle indexing node, wait a minute...") break } + s.taskLock.Unlock(taskID) } } @@ -261,6 +278,14 @@ func (s *taskScheduler) process(taskID UniqueID) bool { task.SetState(indexpb.JobState_JobStateRetry, "update meta building state failed") return false } + task.SetStartTime(time.Now()) + queueingTime := task.GetStartTime().Sub(task.GetQueueTime()) + if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { + log.Warn("task queueing time is too long", zap.Int64("taskID", taskID), + zap.Int64("queueing time(ms)", queueingTime.Milliseconds())) + } + metrics.DataCoordTaskExecuteLatency. + WithLabelValues(task.GetTaskType(), metrics.Pending).Observe(float64(queueingTime.Milliseconds())) log.Ctx(s.ctx).Info("update task meta state to InProgress success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID)) case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed: @@ -268,6 +293,14 @@ func (s *taskScheduler) process(taskID UniqueID) bool { log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err)) return true } + task.SetEndTime(time.Now()) + runningTime := task.GetEndTime().Sub(task.GetStartTime()) + if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { + log.Warn("task running time is too long", zap.Int64("taskID", taskID), + zap.Int64("running time(ms)", runningTime.Milliseconds())) + } + metrics.DataCoordTaskExecuteLatency. 
+ WithLabelValues(task.GetTaskType(), metrics.Executing).Observe(float64(runningTime.Milliseconds())) client, exist := s.nodeManager.GetClientByID(task.GetNodeID()) if exist { if !task.DropTaskOnWorker(s.ctx, client) { @@ -296,3 +329,79 @@ func (s *taskScheduler) process(taskID UniqueID) bool { } return true } + +func (s *taskScheduler) collectTaskMetrics() { + defer s.wg.Done() + + ticker := time.NewTicker(s.collectMetricsDuration) + defer ticker.Stop() + for { + select { + case <-s.ctx.Done(): + log.Warn("task scheduler context done") + return + case <-ticker.C: + s.RLock() + taskIDs := make([]UniqueID, 0, len(s.tasks)) + for tID := range s.tasks { + taskIDs = append(taskIDs, tID) + } + s.RUnlock() + + maxTaskQueueingTime := make(map[string]int64) + maxTaskRunningTime := make(map[string]int64) + + collectMetricsFunc := func(taskID int64) { + s.taskLock.Lock(taskID) + defer s.taskLock.Unlock(taskID) + + task := s.getTask(taskID) + if task == nil { + return + } + + state := task.GetState() + switch state { + case indexpb.JobState_JobStateNone: + return + case indexpb.JobState_JobStateInit: + queueingTime := time.Since(task.GetQueueTime()) + if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { + log.Warn("task queueing time is too long", zap.Int64("taskID", taskID), + zap.Int64("queueing time(ms)", queueingTime.Milliseconds())) + } + + maxQueueingTime, ok := maxTaskQueueingTime[task.GetTaskType()] + if !ok || maxQueueingTime < queueingTime.Milliseconds() { + maxTaskQueueingTime[task.GetTaskType()] = queueingTime.Milliseconds() + } + case indexpb.JobState_JobStateInProgress: + runningTime := time.Since(task.GetStartTime()) + if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) { + log.Warn("task running time is too long", zap.Int64("taskID", taskID), + zap.Int64("running time(ms)", runningTime.Milliseconds())) + } + + maxRunningTime, ok := maxTaskRunningTime[task.GetTaskType()] + if !ok || maxRunningTime < runningTime.Milliseconds() { + maxTaskRunningTime[task.GetTaskType()] = runningTime.Milliseconds() + } + } + } + + for _, taskID := range taskIDs { + collectMetricsFunc(taskID) + } + + for taskType, queueingTime := range maxTaskQueueingTime { + metrics.DataCoordTaskExecuteLatency. + WithLabelValues(taskType, metrics.Pending).Observe(float64(queueingTime)) + } + + for taskType, runningTime := range maxTaskRunningTime { + metrics.DataCoordTaskExecuteLatency. 
+ WithLabelValues(taskType, metrics.Executing).Observe(float64(runningTime)) + } + } + } +} diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index dd6994e904..5dd986ac2d 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -732,14 +732,26 @@ func (s *taskSchedulerSuite) TearDownSuite() { func (s *taskSchedulerSuite) scheduler(handler Handler) { ctx := context.Background() + var once sync.Once + + paramtable.Get().Save(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key) catalog := catalogmocks.NewDataCoordCatalog(s.T()) - catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).Return(nil) + catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task *indexpb.AnalyzeTask) error { + once.Do(func() { + time.Sleep(time.Second * 3) + }) + return nil + }) catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil) in := mocks.NewMockIndexNodeClient(s.T()) in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil) in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, request *indexpb.QueryJobsV2Request, option ...grpc.CallOption) (*indexpb.QueryJobsV2Response, error) { + once.Do(func() { + time.Sleep(time.Second * 3) + }) switch request.GetJobType() { case indexpb.JobType_JobTypeIndexJob: results := make([]*indexpb.IndexTaskInfo, 0) @@ -815,6 +827,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) { mt.segments.DropSegment(segID + 9) scheduler.scheduleDuration = time.Millisecond * 500 + scheduler.collectMetricsDuration = time.Millisecond * 200 scheduler.Start() s.Run("enqueue", func() { diff --git a/internal/datacoord/types.go b/internal/datacoord/types.go index c1a138eb44..3d9cf46fd6 100644 --- a/internal/datacoord/types.go +++ b/internal/datacoord/types.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "time" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/types" @@ -38,4 +39,11 @@ type Task interface { QueryResult(ctx context.Context, client types.IndexNodeClient) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool SetJobInfo(meta *meta) error + SetQueueTime(time.Time) + GetQueueTime() time.Time + SetStartTime(time.Time) + GetStartTime() time.Time + SetEndTime(time.Time) + GetEndTime() time.Time + GetTaskType() string } diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go index 3bf4dfe8b7..e9b999ace3 100644 --- a/pkg/metrics/datacoord_metrics.go +++ b/pkg/metrics/datacoord_metrics.go @@ -309,6 +309,18 @@ var ( Name: "import_tasks", Help: "the import tasks grouping by type and state", }, []string{"task_type", "import_state"}) + + DataCoordTaskExecuteLatency = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: milvusNamespace, + Subsystem: typeutil.DataCoordRole, + Name: "task_execute_max_latency", + Help: "latency of task execute operation", + Buckets: longTaskBuckets, + }, []string{ + taskTypeLabel, + statusLabelName, + }) ) // RegisterDataCoord registers DataCoord metrics @@ -336,6 +348,7 @@ func RegisterDataCoord(registry *prometheus.Registry) { registry.MustRegister(ImportTasks) registry.MustRegister(GarbageCollectorFileScanDuration) registry.MustRegister(GarbageCollectorRunCount) + registry.MustRegister(DataCoordTaskExecuteLatency) } func 
CleanupDataCoordSegmentMetrics(dbName string, collectionID int64, segmentID int64) { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 3941e28adb..597966995c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -3080,6 +3080,7 @@ type dataCoordConfig struct { WithCredential ParamItem `refreshable:"false"` IndexNodeID ParamItem `refreshable:"false"` IndexTaskSchedulerInterval ParamItem `refreshable:"false"` + TaskSlowThreshold ParamItem `refreshable:"true"` MinSegmentNumRowsToEnableIndex ParamItem `refreshable:"true"` BrokerTimeout ParamItem `refreshable:"false"` @@ -3731,6 +3732,13 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.IndexTaskSchedulerInterval.Init(base.mgr) + p.TaskSlowThreshold = ParamItem{ + Key: "datacoord.scheduler.taskSlowThreshold", + Version: "2.0.0", + DefaultValue: "300", + } + p.TaskSlowThreshold.Init(base.mgr) + p.BrokerTimeout = ParamItem{ Key: "dataCoord.brokerTimeout", Version: "2.3.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index f25f167c14..e7b1765e18 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -497,6 +497,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 5, Params.MixCompactionSlotUsage.GetAsInt()) params.Save("dataCoord.slot.l0DeleteCompactionUsage", "4") assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt()) + params.Save("datacoord.scheduler.taskSlowThreshold", "1000") + assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second)) }) t.Run("test dataNodeConfig", func(t *testing.T) {
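
Illustrative addendum (not part of the patch): a minimal, self-contained Go sketch of the observation pattern the change introduces, written against prometheus/client_golang directly. The namespace and subsystem strings and the bare "pending"/"executing" label values are stand-ins for the Milvus constants (milvusNamespace, metrics.Pending, metrics.Executing), and the task struct below is hypothetical; it only mirrors the queueTime/startTime/endTime fields added to analyzeTask and indexBuildTask.

// A sketch of the queue/execute latency bookkeeping: stamp a task when it is
// enqueued, when it is dispatched, and when it finishes, then report the two
// phases into one histogram labeled by task type and phase.
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// taskExecuteLatency plays the role of DataCoordTaskExecuteLatency: one
// histogram vector with a task_type label and a status label ("pending"
// while queued, "executing" while running).
var taskExecuteLatency = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "example",   // stand-in for milvusNamespace
		Subsystem: "scheduler", // stand-in for the datacoord role
		Name:      "task_execute_max_latency",
		Help:      "latency of task execute operation",
	},
	[]string{"task_type", "status"},
)

// task mirrors the three timestamps the patch adds to analyzeTask and
// indexBuildTask; the struct itself is hypothetical.
type task struct {
	taskType  string
	queueTime time.Time
	startTime time.Time
	endTime   time.Time
}

func main() {
	prometheus.MustRegister(taskExecuteLatency)

	t := &task{taskType: "JobTypeIndexJob"}

	// Enqueue: remember when the task entered the queue.
	t.queueTime = time.Now()

	// Dispatch: the task leaves the queue, so observe the pending phase.
	t.startTime = time.Now()
	queueing := t.startTime.Sub(t.queueTime)
	taskExecuteLatency.WithLabelValues(t.taskType, "pending").
		Observe(float64(queueing.Milliseconds()))

	// Completion: observe the executing phase.
	t.endTime = time.Now()
	running := t.endTime.Sub(t.startTime)
	taskExecuteLatency.WithLabelValues(t.taskType, "executing").
		Observe(float64(running.Milliseconds()))

	fmt.Printf("pending=%dms executing=%dms\n",
		queueing.Milliseconds(), running.Milliseconds())
}

Keeping both phases in one histogram distinguished by a status label, as the patch does, lets pending and executing latencies be compared within a single metric family instead of two separate metrics.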