diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go
index 6d004992bc..6762bee360 100644
--- a/internal/datacoord/index_meta.go
+++ b/internal/datacoord/index_meta.go
@@ -209,33 +209,31 @@ func (m *indexMeta) updateIndexTasksMetrics() {
     m.RLock()
     defer m.RUnlock()
-    taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
+    taskMetrics := make(map[indexpb.JobState]int)
     for _, segIdx := range m.segmentBuildInfo.List() {
         if segIdx.IsDeleted || !m.isIndexExist(segIdx.CollectionID, segIdx.IndexID) {
             continue
         }
-        if _, ok := taskMetrics[segIdx.CollectionID]; !ok {
-            taskMetrics[segIdx.CollectionID] = make(map[commonpb.IndexState]int)
-            taskMetrics[segIdx.CollectionID][commonpb.IndexState_Unissued] = 0
-            taskMetrics[segIdx.CollectionID][commonpb.IndexState_InProgress] = 0
-            taskMetrics[segIdx.CollectionID][commonpb.IndexState_Finished] = 0
-            taskMetrics[segIdx.CollectionID][commonpb.IndexState_Failed] = 0
+
+        switch segIdx.IndexState {
+        case commonpb.IndexState_IndexStateNone:
+            taskMetrics[indexpb.JobState_JobStateNone]++
+        case commonpb.IndexState_Unissued:
+            taskMetrics[indexpb.JobState_JobStateInit]++
+        case commonpb.IndexState_InProgress:
+            taskMetrics[indexpb.JobState_JobStateInProgress]++
+        case commonpb.IndexState_Finished:
+            taskMetrics[indexpb.JobState_JobStateFinished]++
+        case commonpb.IndexState_Failed:
+            taskMetrics[indexpb.JobState_JobStateFailed]++
+        case commonpb.IndexState_Retry:
+            taskMetrics[indexpb.JobState_JobStateRetry]++
         }
-        taskMetrics[segIdx.CollectionID][segIdx.IndexState]++
     }
-    for collID, m := range taskMetrics {
-        for k, v := range m {
-            switch k {
-            case commonpb.IndexState_Unissued:
-                metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.UnissuedIndexTaskLabel).Set(float64(v))
-            case commonpb.IndexState_InProgress:
-                metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.InProgressIndexTaskLabel).Set(float64(v))
-            case commonpb.IndexState_Finished:
-                metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.FinishedIndexTaskLabel).Set(float64(v))
-            case commonpb.IndexState_Failed:
-                metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.FailedIndexTaskLabel).Set(float64(v))
-            }
-        }
+
+    jobType := indexpb.JobType_JobTypeIndexJob.String()
+    for k, v := range taskMetrics {
+        metrics.TaskNum.WithLabelValues(jobType, k.String()).Set(float64(v))
     }
     log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics)))
 }
@@ -904,10 +902,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle
     return total
 }
 
-func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segID, indexID, buildID UniqueID) error {
-    m.Lock()
-    defer m.Unlock()
-
+func (m *indexMeta) removeSegmentIndex(ctx context.Context, collID, partID, segID, indexID, buildID UniqueID) error {
     err := m.catalog.DropSegmentIndex(ctx, collID, partID, segID, buildID)
     if err != nil {
         return err
     }
@@ -925,6 +920,25 @@ func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segI
     return nil
 }
 
+func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segID, indexID, buildID UniqueID) error {
+    m.Lock()
+    defer m.Unlock()
+
+    return m.removeSegmentIndex(ctx, collID, partID, segID, indexID, buildID)
+}
+
+func (m *indexMeta) RemoveSegmentIndexByID(ctx context.Context, buildID UniqueID) error {
+    m.Lock()
+    defer m.Unlock()
+
+    segIdx, ok := m.segmentBuildInfo.Get(buildID)
+    if !ok {
+        return nil
+    }
+
+    return m.removeSegmentIndex(ctx, segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, buildID)
+}
+
 func (m *indexMeta) GetDeletedIndexes() []*model.Index {
     m.RLock()
     defer m.RUnlock()
diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go
index 6f75b1c2a7..d8ffca36b8 100644
--- a/internal/datacoord/index_meta_test.go
+++ b/internal/datacoord/index_meta_test.go
@@ -1505,6 +1505,57 @@ func TestRemoveSegmentIndex(t *testing.T) {
     })
 }
 
+func TestRemoveSegmentIndexByID(t *testing.T) {
+    t.Run("drop segment index fail", func(t *testing.T) {
+        expectedErr := errors.New("error")
+        catalog := catalogmocks.NewDataCoordCatalog(t)
+        catalog.EXPECT().
+            DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+            Return(expectedErr)
+
+        catalog.EXPECT().CreateSegmentIndex(mock.Anything, mock.Anything).Return(nil)
+
+        m := newSegmentIndexMeta(catalog)
+        err := m.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
+            SegmentID: 3,
+            CollectionID: 1,
+            PartitionID: 2,
+            NumRows: 1024,
+            IndexID: 1,
+            BuildID: 4,
+        })
+        assert.NoError(t, err)
+        err = m.RemoveSegmentIndexByID(context.TODO(), 4)
+        assert.Error(t, err)
+        assert.EqualError(t, err, "error")
+    })
+
+    t.Run("remove segment index ok", func(t *testing.T) {
+        catalog := catalogmocks.NewDataCoordCatalog(t)
+        catalog.EXPECT().
+            DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+            Return(nil)
+
+        catalog.EXPECT().CreateSegmentIndex(mock.Anything, mock.Anything).Return(nil)
+
+        m := newSegmentIndexMeta(catalog)
+        err := m.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
+            SegmentID: 3,
+            CollectionID: 1,
+            PartitionID: 2,
+            NumRows: 1024,
+            IndexID: 1,
+            BuildID: 4,
+        })
+        assert.NoError(t, err)
+
+        err = m.RemoveSegmentIndexByID(context.TODO(), 4)
+        assert.NoError(t, err)
+        assert.Equal(t, len(m.segmentIndexes), 0)
+        assert.Equal(t, len(m.segmentBuildInfo.List()), 0)
+    })
+}
+
 func TestIndexMeta_GetUnindexedSegments(t *testing.T) {
     catalog := &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}
     m := createMeta(catalog, withIndexMeta(createIndexMeta(catalog)))
diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go
index 349a6ffc07..2b2f3c26fe 100644
--- a/internal/datacoord/stats_task_meta.go
+++ b/internal/datacoord/stats_task_meta.go
@@ -19,7 +19,6 @@ package datacoord
 import (
     "context"
     "fmt"
-    "strconv"
     "sync"
 
     "go.uber.org/zap"
@@ -76,25 +75,14 @@ func (stm *statsTaskMeta) updateMetrics() {
     stm.RLock()
     defer stm.RUnlock()
-    taskMetrics := make(map[UniqueID]map[indexpb.JobState]int)
+    taskMetrics := make(map[indexpb.JobState]int)
     for _, t := range stm.tasks {
-        if _, ok := taskMetrics[t.GetCollectionID()]; !ok {
-            taskMetrics[t.GetCollectionID()] = make(map[indexpb.JobState]int)
-            taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateNone] = 0
-            taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateInit] = 0
-            taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateInProgress] = 0
-            taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateFinished] = 0
-            taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateFailed] = 0
-            taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateRetry] = 0
-        }
-        taskMetrics[t.GetCollectionID()][t.GetState()]++
+        taskMetrics[t.GetState()]++
     }
 
     jobType := indexpb.JobType_JobTypeStatsJob.String()
-    for collID, m := range taskMetrics {
-        for k, v := range m {
-            metrics.TaskNum.WithLabelValues(strconv.FormatInt(collID, 10), jobType, k.String()).Set(float64(v))
-        }
+    for k, v := range taskMetrics {
+        metrics.TaskNum.WithLabelValues(jobType, k.String()).Set(float64(v))
     }
 }
diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go
index 8d588b47fa..0da89728ae 100644
--- a/internal/datacoord/task_analyze.go
+++ b/internal/datacoord/task_analyze.go
@@ -277,14 +277,17 @@ func (at *analyzeTask) QueryResult(ctx context.Context, client types.IndexNodeCl
     // infos length is always one.
     for _, result := range resp.GetAnalyzeJobResults().GetResults() {
         if result.GetTaskID() == at.GetTaskID() {
-            log.Ctx(ctx).Info("query analysis task info successfully",
-                zap.Int64("taskID", at.GetTaskID()), zap.String("result state", result.GetState().String()),
-                zap.String("failReason", result.GetFailReason()))
             if result.GetState() == indexpb.JobState_JobStateFinished || result.GetState() == indexpb.JobState_JobStateFailed || result.GetState() == indexpb.JobState_JobStateRetry {
+                log.Ctx(ctx).Info("query analysis task info successfully",
+                    zap.Int64("taskID", at.GetTaskID()), zap.String("result state", result.GetState().String()),
+                    zap.String("failReason", result.GetFailReason()))
                 // state is retry or finished or failed
                 at.setResult(result)
             } else if result.GetState() == indexpb.JobState_JobStateNone {
+                log.Ctx(ctx).Info("query analysis task info successfully",
+                    zap.Int64("taskID", at.GetTaskID()), zap.String("result state", result.GetState().String()),
+                    zap.String("failReason", result.GetFailReason()))
                 at.SetState(indexpb.JobState_JobStateRetry, "analyze task state is none in info response")
             }
             // inProgress or unissued/init, keep InProgress state
@@ -320,3 +323,7 @@ func (at *analyzeTask) DropTaskOnWorker(ctx context.Context, client types.IndexN
 func (at *analyzeTask) SetJobInfo(meta *meta) error {
     return meta.analyzeMeta.FinishTask(at.GetTaskID(), at.taskInfo)
 }
+
+func (at *analyzeTask) DropTaskMeta(ctx context.Context, meta *meta) error {
+    return meta.analyzeMeta.DropAnalyzeTask(ctx, at.GetTaskID())
+}
diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go
index 9458356367..fb10a0273f 100644
--- a/internal/datacoord/task_index.go
+++ b/internal/datacoord/task_index.go
@@ -317,14 +317,17 @@ func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeC
     // indexInfos length is always one.
     for _, info := range resp.GetIndexJobResults().GetResults() {
         if info.GetBuildID() == it.GetTaskID() {
-            log.Ctx(ctx).Info("query task index info successfully",
-                zap.Int64("taskID", it.GetTaskID()), zap.String("result state", info.GetState().String()),
-                zap.String("failReason", info.GetFailReason()))
             if info.GetState() == commonpb.IndexState_Finished || info.GetState() == commonpb.IndexState_Failed || info.GetState() == commonpb.IndexState_Retry {
+                log.Ctx(ctx).Info("query task index info successfully",
+                    zap.Int64("taskID", it.GetTaskID()), zap.String("result state", info.GetState().String()),
+                    zap.String("failReason", info.GetFailReason()))
                 // state is retry or finished or failed
                 it.setResult(info)
             } else if info.GetState() == commonpb.IndexState_IndexStateNone {
+                log.Ctx(ctx).Info("query task index info successfully",
+                    zap.Int64("taskID", it.GetTaskID()), zap.String("result state", info.GetState().String()),
+                    zap.String("failReason", info.GetFailReason()))
                 it.SetState(indexpb.JobState_JobStateRetry, "index state is none in info response")
             }
             // inProgress or unissued, keep InProgress state
@@ -358,3 +361,7 @@ func (it *indexBuildTask) DropTaskOnWorker(ctx context.Context, client types.Ind
 func (it *indexBuildTask) SetJobInfo(meta *meta) error {
     return meta.indexMeta.FinishTask(it.taskInfo)
 }
+
+func (it *indexBuildTask) DropTaskMeta(ctx context.Context, meta *meta) error {
+    return meta.indexMeta.RemoveSegmentIndexByID(ctx, it.taskID)
+}
diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go
index d2ed08e2e1..e56a8dad26 100644
--- a/internal/datacoord/task_scheduler.go
+++ b/internal/datacoord/task_scheduler.go
@@ -125,6 +125,10 @@ func (s *taskScheduler) reloadFromMeta() {
                 State: segIndex.IndexState,
                 FailReason: segIndex.FailReason,
             },
+            req: &workerpb.CreateJobRequest{
+                ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
+                BuildID: segIndex.BuildID,
+            },
             queueTime: time.Now(),
             startTime: time.Now(),
             endTime: time.Now(),
@@ -150,6 +154,10 @@
                 State: t.State,
                 FailReason: t.FailReason,
             },
+            req: &workerpb.AnalyzeRequest{
+                ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
+                TaskID: taskID,
+            },
             queueTime: time.Now(),
             startTime: time.Now(),
             endTime: time.Now(),
@@ -176,6 +184,10 @@
                 State: t.GetState(),
                 FailReason: t.GetFailReason(),
             },
+            req: &workerpb.CreateStatsRequest{
+                ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
+                TaskID: taskID,
+            },
             queueTime: time.Now(),
             startTime: time.Now(),
             endTime: time.Now(),
@@ -333,6 +345,9 @@ func (s *taskScheduler) checkProcessingTasks() {
     }
     s.runningQueueLock.RUnlock()
 
+    if len(runningTaskIDs) <= 0 {
+        return
+    }
     log.Ctx(s.ctx).Info("check running tasks", zap.Int("runningTask num", len(runningTaskIDs)))
 
     var wg sync.WaitGroup
@@ -410,7 +425,9 @@ func (s *taskScheduler) run() {
         switch task.GetState() {
         case indexpb.JobState_JobStateNone:
-            return
+            if !s.processNone(task) {
+                s.pendingTasks.Push(task)
+            }
         case indexpb.JobState_JobStateInit:
             s.pendingTasks.Push(task)
         default:
@@ -433,7 +450,7 @@ func (s *taskScheduler) process(task Task, nodeID int64) bool {
     switch task.GetState() {
     case indexpb.JobState_JobStateNone:
-        return true
+        return s.processNone(task)
     case indexpb.JobState_JobStateInit:
         return s.processInit(task, nodeID)
     default:
@@ -577,6 +594,14 @@ func (s *taskScheduler) processInit(task Task, nodeID int64) bool {
     return true
 }
 
+func (s *taskScheduler) processNone(task Task) bool {
+    if err := task.DropTaskMeta(s.ctx, s.meta); err != nil {
+        log.Ctx(s.ctx).Warn("drop task meta failed", zap.Error(err))
+        return false
+    }
+    return true
+}
+
 func (s *taskScheduler) processFinished(task Task) bool {
     if err := task.SetJobInfo(s.meta); err != nil {
         log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err))
diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go
index d2519195e2..1cebd680a6 100644
--- a/internal/datacoord/task_scheduler_test.go
+++ b/internal/datacoord/task_scheduler_test.go
@@ -783,6 +783,9 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
         })
         return nil
     })
+    catalog.EXPECT().DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+    catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
+    catalog.EXPECT().DropAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Maybe()
     catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
     // catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
@@ -951,10 +954,9 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
     indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 8)
     s.True(exist)
     s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
-    indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 9)
-    s.True(exist)
-    // segment not healthy, wait for GC
-    s.Equal(commonpb.IndexState_Unissued, indexJob.IndexState)
+    _, exist = mt.indexMeta.GetIndexJob(buildID + 9)
+    s.False(exist)
+    // segment not healthy, remove task
     indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 10)
     s.True(exist)
     s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go
index 846aa7f4c4..ecfa316a33 100644
--- a/internal/datacoord/task_stats.go
+++ b/internal/datacoord/task_stats.go
@@ -283,13 +283,16 @@ func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClie
     for _, result := range resp.GetStatsJobResults().GetResults() {
         if result.GetTaskID() == st.GetTaskID() {
-            log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
-                zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
-                zap.String("failReason", result.GetFailReason()))
             if result.GetState() == indexpb.JobState_JobStateFinished || result.GetState() == indexpb.JobState_JobStateRetry || result.GetState() == indexpb.JobState_JobStateFailed {
+                log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
+                    zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
+                    zap.String("failReason", result.GetFailReason()))
                 st.setResult(result)
             } else if result.GetState() == indexpb.JobState_JobStateNone {
+                log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
+                    zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
+                    zap.String("failReason", result.GetFailReason()))
                 st.SetState(indexpb.JobState_JobStateRetry, "stats task state is none in info response")
             }
             // inProgress or unissued/init, keep InProgress state
@@ -367,3 +370,12 @@ func (st *statsTask) SetJobInfo(meta *meta) error {
         zap.String("subJobType", st.subJobType.String()), zap.String("state", st.taskInfo.GetState().String()))
     return nil
 }
+
+func (st *statsTask) DropTaskMeta(ctx context.Context, meta *meta) error {
+    if err := meta.statsTaskMeta.DropStatsTask(st.taskID); err != nil {
+        log.Ctx(ctx).Warn("drop stats task failed", zap.Int64("taskID", st.taskID), zap.Error(err))
+        return err
+    }
+    log.Ctx(ctx).Info("drop stats task success", zap.Int64("taskID", st.taskID))
+    return nil
+}
diff --git a/internal/datacoord/types.go b/internal/datacoord/types.go
index b0a4200a83..d11a237214 100644
--- a/internal/datacoord/types.go
+++ b/internal/datacoord/types.go
@@ -46,4 +46,5 @@ type Task interface {
     SetEndTime(time.Time)
     GetEndTime() time.Time
     GetTaskType() string
+    DropTaskMeta(ctx context.Context, meta *meta) error
 }
diff --git a/pkg/metrics/datacoord_metrics.go b/pkg/metrics/datacoord_metrics.go
index 8513bf6fe5..8d2490611f 100644
--- a/pkg/metrics/datacoord_metrics.go
+++ b/pkg/metrics/datacoord_metrics.go
@@ -310,6 +310,7 @@ var (
         }, []string{statusLabelName})
 
     // IndexTaskNum records the number of index tasks of each type.
+    // Deprecated: please use TaskNum after v2.5.5.
     IndexTaskNum = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
             Namespace: milvusNamespace,
@@ -343,7 +344,7 @@ var (
             Help: "latency of task execute operation",
             Buckets: longTaskBuckets,
         }, []string{
-            taskTypeLabel,
+            TaskTypeLabel,
             statusLabelName,
         })
 
@@ -354,7 +355,7 @@ var (
             Subsystem: typeutil.DataCoordRole,
             Name: "task_count",
             Help: "number of index tasks of each type",
-        }, []string{collectionIDLabelName, taskTypeLabel, taskStateLabel})
+        }, []string{TaskTypeLabel, TaskStateLabel})
 )
 
 // RegisterDataCoord registers DataCoord metrics
diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index 767c3e654b..84de95fcef 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -131,8 +131,8 @@ const (
     LoadedLabel = "loaded"
     NumEntitiesAllLabel = "all"
 
-    taskTypeLabel = "task_type"
-    taskStateLabel = "task_state"
+    TaskTypeLabel = "task_type"
+    TaskStateLabel = "task_state"
 )
 
 var (
diff --git a/pkg/metrics/proxy_metrics.go b/pkg/metrics/proxy_metrics.go
index 9a404d024c..2d88fac37d 100644
--- a/pkg/metrics/proxy_metrics.go
+++ b/pkg/metrics/proxy_metrics.go
@@ -444,7 +444,7 @@ var (
             Subsystem: typeutil.ProxyRole,
             Name: "queue_task_num",
             Help: "",
-        }, []string{nodeIDLabelName, queueTypeLabelName, taskStateLabel})
+        }, []string{nodeIDLabelName, queueTypeLabelName, TaskStateLabel})
 )
 
 // RegisterProxy registers Proxy metrics
diff --git a/pkg/metrics/querycoord_metrics.go b/pkg/metrics/querycoord_metrics.go
index b1c0a9853f..c15bebb3b2 100644
--- a/pkg/metrics/querycoord_metrics.go
+++ b/pkg/metrics/querycoord_metrics.go
@@ -132,7 +132,7 @@ var (
             Name: "task_latency",
             Help: "latency of all kind of task in query coord scheduler scheduler",
             Buckets: longTaskBuckets,
-        }, []string{collectionIDLabelName, taskTypeLabel, channelNameLabelName})
+        }, []string{collectionIDLabelName, TaskTypeLabel, channelNameLabelName})
 
     QueryCoordResourceGroupInfo = prometheus.NewGaugeVec(
         prometheus.GaugeOpts{
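
For readers skimming the patch, the sketch below models the new JobStateNone flow in isolation: when a task reports state none, the scheduler now calls DropTaskMeta to purge the task's metadata instead of silently returning, and only drops the task from the queue once that purge succeeds. The types used here (jobState, task, fakeTask, and a plain map standing in for *meta) are simplified stand-ins invented for the example, not the real datacoord types.

// Illustrative sketch only, not part of the patch. In the real code,
// taskScheduler.processNone calls Task.DropTaskMeta(ctx, meta), which in turn
// goes through indexMeta.RemoveSegmentIndexByID, analyzeMeta.DropAnalyzeTask,
// or statsTaskMeta.DropStatsTask depending on the task type.
package main

import (
	"context"
	"fmt"
)

type jobState int

const (
	jobStateNone jobState = iota
	jobStateInit
)

// task is a reduced stand-in for the datacoord Task interface,
// keeping only the two methods this sketch needs.
type task interface {
	GetState() jobState
	DropTaskMeta(ctx context.Context, meta map[int64]struct{}) error
}

type fakeTask struct {
	id    int64
	state jobState
}

func (t *fakeTask) GetState() jobState { return t.state }

// DropTaskMeta removes the task's entry from the (fake) meta store.
func (t *fakeTask) DropTaskMeta(_ context.Context, meta map[int64]struct{}) error {
	delete(meta, t.id)
	return nil
}

// processNone mirrors taskScheduler.processNone: drop the task's meta and
// report whether the task may leave the queue; on error the caller re-queues it.
func processNone(ctx context.Context, t task, meta map[int64]struct{}) bool {
	if err := t.DropTaskMeta(ctx, meta); err != nil {
		return false
	}
	return true
}

func main() {
	meta := map[int64]struct{}{4: {}}
	ok := processNone(context.Background(), &fakeTask{id: 4, state: jobStateNone}, meta)
	fmt.Println(ok, len(meta)) // true 0
}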