enhance: Add monitoring metrics for task execution time in datacoord (#35139)

issue: #35138

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2024-08-05 16:26:17 +08:00 committed by GitHub
parent d905713724
commit 6542c1ab0e
8 changed files with 222 additions and 3 deletions


@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"math"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
@@ -41,6 +42,10 @@ type analyzeTask struct {
nodeID int64
taskInfo *indexpb.AnalyzeResult
queueTime time.Time
startTime time.Time
endTime time.Time
req *indexpb.AnalyzeRequest
}
@@ -56,6 +61,34 @@ func (at *analyzeTask) ResetNodeID() {
at.nodeID = 0
}
func (at *analyzeTask) SetQueueTime(t time.Time) {
at.queueTime = t
}
func (at *analyzeTask) GetQueueTime() time.Time {
return at.queueTime
}
func (at *analyzeTask) SetStartTime(t time.Time) {
at.startTime = t
}
func (at *analyzeTask) GetStartTime() time.Time {
return at.startTime
}
func (at *analyzeTask) SetEndTime(t time.Time) {
at.endTime = t
}
func (at *analyzeTask) GetEndTime() time.Time {
return at.endTime
}
func (at *analyzeTask) GetTaskType() string {
return indexpb.JobType_JobTypeIndexJob.String()
}
func (at *analyzeTask) CheckTaskHealthy(mt *meta) bool {
t := mt.analyzeMeta.GetTask(at.GetTaskID())
return t != nil

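The three new fields give each task a full lifecycle clock. Below is a minimal sketch (illustrative only, not part of this change; the helper name is hypothetical) of how the scheduler turns the timestamps into the two reported latencies, using the Task accessors declared in task.go further down:

// Illustrative helper, not in the diff: walks a task through its lifecycle and
// derives the two durations that feed DataCoordTaskExecuteLatency.
func illustrateTaskLatencies(task Task) (queueing, running time.Duration) {
	task.SetQueueTime(time.Now()) // stamped by enqueue()
	// ... the task waits for an idle index node ...
	task.SetStartTime(time.Now()) // stamped when it is dispatched to a worker
	// ... the worker executes the job ...
	task.SetEndTime(time.Now()) // stamped when a terminal state is reached

	queueing = task.GetStartTime().Sub(task.GetQueueTime()) // observed under metrics.Pending
	running = task.GetEndTime().Sub(task.GetStartTime())    // observed under metrics.Executing
	return queueing, running
}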

@@ -19,6 +19,7 @@ package datacoord
import (
"context"
"path"
"time"
"go.uber.org/zap"
@@ -39,6 +40,10 @@ type indexBuildTask struct {
nodeID int64
taskInfo *indexpb.IndexTaskInfo
queueTime time.Time
startTime time.Time
endTime time.Time
req *indexpb.CreateJobRequest
}
@@ -56,6 +61,34 @@ func (it *indexBuildTask) ResetNodeID() {
it.nodeID = 0
}
func (it *indexBuildTask) SetQueueTime(t time.Time) {
it.queueTime = t
}
func (it *indexBuildTask) GetQueueTime() time.Time {
return it.queueTime
}
func (it *indexBuildTask) SetStartTime(t time.Time) {
it.startTime = t
}
func (it *indexBuildTask) GetStartTime() time.Time {
return it.startTime
}
func (it *indexBuildTask) SetEndTime(t time.Time) {
it.endTime = t
}
func (it *indexBuildTask) GetEndTime() time.Time {
return it.endTime
}
func (it *indexBuildTask) GetTaskType() string {
return indexpb.JobType_JobTypeIndexJob.String()
}
func (it *indexBuildTask) CheckTaskHealthy(mt *meta) bool {
_, exist := mt.indexMeta.GetIndexJob(it.GetTaskID())
return exist


@@ -27,6 +27,8 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/lock"
)
const (
@@ -40,11 +42,13 @@ type taskScheduler struct {
cancel context.CancelFunc
wg sync.WaitGroup
scheduleDuration time.Duration
collectMetricsDuration time.Duration
// TODO @xiaocai2333: use priority queue
tasks map[int64]Task
notifyChan chan struct{}
taskLock *lock.KeyLock[int64]
meta *meta
@@ -70,7 +74,9 @@ func newTaskScheduler(
meta: metaTable,
tasks: make(map[int64]Task),
notifyChan: make(chan struct{}, 1),
taskLock: lock.NewKeyLock[int64](),
scheduleDuration: Params.DataCoordCfg.IndexTaskSchedulerInterval.GetAsDuration(time.Millisecond),
collectMetricsDuration: time.Minute,
policy: defaultBuildIndexPolicy,
nodeManager: nodeManager,
chunkManager: chunkManager,
@@ -82,8 +88,9 @@
}
func (s *taskScheduler) Start() {
s.wg.Add(2)
go s.schedule()
go s.collectTaskMetrics()
}
func (s *taskScheduler) Stop() {
@@ -107,6 +114,9 @@ func (s *taskScheduler) reloadFromKV() {
State: segIndex.IndexState,
FailReason: segIndex.FailReason,
},
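// The lifecycle timestamps are not recovered from storage; tasks reloaded
// from meta restart their queue/start/end clocks at reload time.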
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
}
}
}
@@ -123,6 +133,9 @@ func (s *taskScheduler) reloadFromKV() {
State: t.State,
FailReason: t.FailReason,
},
queueTime: time.Now(),
startTime: time.Now(),
endTime: time.Now(),
}
}
}
@@ -145,6 +158,7 @@ func (s *taskScheduler) enqueue(task Task) {
if _, ok := s.tasks[taskID]; !ok {
s.tasks[taskID] = task
}
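// Stamp the enqueue time; queueing latency and the slow-task warning are
// measured from this point.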
task.SetQueueTime(time.Now())
log.Info("taskScheduler enqueue task", zap.Int64("taskID", taskID))
}
@@ -193,11 +207,14 @@ func (s *taskScheduler) run() {
s.policy(taskIDs)
for _, taskID := range taskIDs {
s.taskLock.Lock(taskID)
ok := s.process(taskID)
if !ok {
s.taskLock.Unlock(taskID)
log.Ctx(s.ctx).Info("there is no idle indexing node, wait a minute...")
break
}
s.taskLock.Unlock(taskID)
}
}
@@ -261,6 +278,14 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
task.SetState(indexpb.JobState_JobStateRetry, "update meta building state failed")
return false
}
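// The task has been accepted by a worker and its meta moved to InProgress:
// record the start time and report how long it sat in the queue.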
task.SetStartTime(time.Now())
queueingTime := task.GetStartTime().Sub(task.GetQueueTime())
if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task queueing time is too long", zap.Int64("taskID", taskID),
zap.Int64("queueing time(ms)", queueingTime.Milliseconds()))
}
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(task.GetTaskType(), metrics.Pending).Observe(float64(queueingTime.Milliseconds()))
log.Ctx(s.ctx).Info("update task meta state to InProgress success", zap.Int64("taskID", taskID),
zap.Int64("nodeID", nodeID))
case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
@@ -268,6 +293,14 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err))
return true
}
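// The task reached a terminal state (finished or failed): record the end time
// and report how long it ran on the worker.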
task.SetEndTime(time.Now())
runningTime := task.GetEndTime().Sub(task.GetStartTime())
if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task running time is too long", zap.Int64("taskID", taskID),
zap.Int64("running time(ms)", runningTime.Milliseconds()))
}
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(task.GetTaskType(), metrics.Executing).Observe(float64(runningTime.Milliseconds()))
client, exist := s.nodeManager.GetClientByID(task.GetNodeID())
if exist {
if !task.DropTaskOnWorker(s.ctx, client) {
@@ -296,3 +329,79 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
}
return true
}
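// collectTaskMetrics runs on its own ticker and, for tasks that are still
// queued or still running, emits slow-task warnings and records the per-type
// maximum queueing and running latencies, so long-lived tasks surface before
// they reach a terminal state.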
func (s *taskScheduler) collectTaskMetrics() {
defer s.wg.Done()
ticker := time.NewTicker(s.collectMetricsDuration)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
log.Warn("task scheduler context done")
return
case <-ticker.C:
s.RLock()
taskIDs := make([]UniqueID, 0, len(s.tasks))
for tID := range s.tasks {
taskIDs = append(taskIDs, tID)
}
s.RUnlock()
maxTaskQueueingTime := make(map[string]int64)
maxTaskRunningTime := make(map[string]int64)
collectMetricsFunc := func(taskID int64) {
s.taskLock.Lock(taskID)
defer s.taskLock.Unlock(taskID)
task := s.getTask(taskID)
if task == nil {
return
}
state := task.GetState()
switch state {
case indexpb.JobState_JobStateNone:
return
case indexpb.JobState_JobStateInit:
queueingTime := time.Since(task.GetQueueTime())
if queueingTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task queueing time is too long", zap.Int64("taskID", taskID),
zap.Int64("queueing time(ms)", queueingTime.Milliseconds()))
}
maxQueueingTime, ok := maxTaskQueueingTime[task.GetTaskType()]
if !ok || maxQueueingTime < queueingTime.Milliseconds() {
maxTaskQueueingTime[task.GetTaskType()] = queueingTime.Milliseconds()
}
case indexpb.JobState_JobStateInProgress:
runningTime := time.Since(task.GetStartTime())
if runningTime > Params.DataCoordCfg.TaskSlowThreshold.GetAsDuration(time.Second) {
log.Warn("task running time is too long", zap.Int64("taskID", taskID),
zap.Int64("running time(ms)", runningTime.Milliseconds()))
}
maxRunningTime, ok := maxTaskRunningTime[task.GetTaskType()]
if !ok || maxRunningTime < runningTime.Milliseconds() {
maxTaskRunningTime[task.GetTaskType()] = runningTime.Milliseconds()
}
}
}
for _, taskID := range taskIDs {
collectMetricsFunc(taskID)
}
for taskType, queueingTime := range maxTaskQueueingTime {
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(taskType, metrics.Pending).Observe(float64(queueingTime))
}
for taskType, runningTime := range maxTaskRunningTime {
metrics.DataCoordTaskExecuteLatency.
WithLabelValues(taskType, metrics.Executing).Observe(float64(runningTime))
}
}
}
}


@@ -732,14 +732,26 @@ func (s *taskSchedulerSuite) TearDownSuite() {
func (s *taskSchedulerSuite) scheduler(handler Handler) {
ctx := context.Background()
var once sync.Once
paramtable.Get().Save(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key, "1")
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.TaskSlowThreshold.Key)
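// The threshold is lowered to 1s and the mocked catalog/index-node calls below
// sleep for 3s once, so both the queueing and running paths cross the slow-task
// threshold during this test.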
catalog := catalogmocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SaveAnalyzeTask(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task *indexpb.AnalyzeTask) error {
once.Do(func() {
time.Sleep(time.Second * 3)
})
return nil
})
catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
in := mocks.NewMockIndexNodeClient(s.T())
in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).Return(merr.Success(), nil)
in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn(
func(ctx context.Context, request *indexpb.QueryJobsV2Request, option ...grpc.CallOption) (*indexpb.QueryJobsV2Response, error) {
once.Do(func() {
time.Sleep(time.Second * 3)
})
switch request.GetJobType() {
case indexpb.JobType_JobTypeIndexJob:
results := make([]*indexpb.IndexTaskInfo, 0)
@@ -815,6 +827,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
mt.segments.DropSegment(segID + 9)
scheduler.scheduleDuration = time.Millisecond * 500
scheduler.collectMetricsDuration = time.Millisecond * 200
scheduler.Start()
s.Run("enqueue", func() {


@@ -18,6 +18,7 @@ package datacoord
import (
"context"
"time"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/types"
@@ -38,4 +39,11 @@ type Task interface {
QueryResult(ctx context.Context, client types.IndexNodeClient)
DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool
SetJobInfo(meta *meta) error
SetQueueTime(time.Time)
GetQueueTime() time.Time
SetStartTime(time.Time)
GetStartTime() time.Time
SetEndTime(time.Time)
GetEndTime() time.Time
GetTaskType() string
}
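Both concrete task types must satisfy the widened interface. A short sketch of compile-time assertions that could live in the datacoord package (they are not part of this diff):

var (
	_ Task = (*analyzeTask)(nil)
	_ Task = (*indexBuildTask)(nil)
)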


@@ -309,6 +309,18 @@ var (
Name: "import_tasks",
Help: "the import tasks grouping by type and state",
}, []string{"task_type", "import_state"})
DataCoordTaskExecuteLatency = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: milvusNamespace,
Subsystem: typeutil.DataCoordRole,
Name: "task_execute_max_latency",
Help: "latency of task execute operation",
Buckets: longTaskBuckets,
}, []string{
taskTypeLabel,
statusLabelName,
})
)
// RegisterDataCoord registers DataCoord metrics
@@ -336,6 +348,7 @@ func RegisterDataCoord(registry *prometheus.Registry) {
registry.MustRegister(ImportTasks)
registry.MustRegister(GarbageCollectorFileScanDuration)
registry.MustRegister(GarbageCollectorRunCount)
registry.MustRegister(DataCoordTaskExecuteLatency)
}
func CleanupDataCoordSegmentMetrics(dbName string, collectionID int64, segmentID int64) {

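For completeness, a hedged sketch of a standalone check that the new histogram collects samples. It assumes the fully qualified metric name resolves to milvus_datacoord_task_execute_max_latency (i.e. milvusNamespace is "milvus" and typeutil.DataCoordRole is "datacoord"); adjust the name if those constants differ.

package metrics_test

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/milvus-io/milvus/pkg/metrics"
)

func TestDataCoordTaskExecuteLatencyCollects(t *testing.T) {
	// Record one sample for the (task type, status) pair the scheduler uses.
	metrics.DataCoordTaskExecuteLatency.
		WithLabelValues("JobTypeIndexJob", metrics.Pending).Observe(1500)

	// Assumed fully qualified name, see the note above.
	if n := testutil.CollectAndCount(metrics.DataCoordTaskExecuteLatency,
		"milvus_datacoord_task_execute_max_latency"); n == 0 {
		t.Fatalf("expected at least one series for the new histogram, got %d", n)
	}
}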

@@ -3080,6 +3080,7 @@ type dataCoordConfig struct {
WithCredential ParamItem `refreshable:"false"`
IndexNodeID ParamItem `refreshable:"false"`
IndexTaskSchedulerInterval ParamItem `refreshable:"false"`
TaskSlowThreshold ParamItem `refreshable:"true"`
MinSegmentNumRowsToEnableIndex ParamItem `refreshable:"true"`
BrokerTimeout ParamItem `refreshable:"false"`
@@ -3731,6 +3732,13 @@ During compaction, the size of segment # of rows is able to exceed segment max #
}
p.IndexTaskSchedulerInterval.Init(base.mgr)
p.TaskSlowThreshold = ParamItem{
Key: "datacoord.scheduler.taskSlowThreshold",
Version: "2.0.0",
DefaultValue: "300",
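// The raw value is a number of seconds; callers read it with
// GetAsDuration(time.Second), so the default slow-task threshold is 300s.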
}
p.TaskSlowThreshold.Init(base.mgr)
p.BrokerTimeout = ParamItem{
Key: "dataCoord.brokerTimeout",
Version: "2.3.0",


@@ -497,6 +497,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 5, Params.MixCompactionSlotUsage.GetAsInt())
params.Save("dataCoord.slot.l0DeleteCompactionUsage", "4")
assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt())
params.Save("datacoord.scheduler.taskSlowThreshold", "1000")
assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second))
})
t.Run("test dataNodeConfig", func(t *testing.T) {