mirror of https://github.com/milvus-io/milvus.git
enhance: Remove task meta when task is no need to do (#40146)
issue: #39911
master pr: #39084
---------
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Branch: pull/40165/head
parent 0c6518f344
commit 4a75fdac03
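For orientation before the hunks below: when a worker reports a task as JobStateNone, the scheduler now removes the task's metadata through a new Task.DropTaskMeta method (analyze tasks delegate to analyzeMeta.DropAnalyzeTask, stats tasks to statsTaskMeta.DropStatsTask, index-build tasks to indexMeta.RemoveSegmentIndexByID) instead of leaving stale meta behind. The following is a minimal sketch of that dispatch using reduced stand-in types, not the real Milvus interfaces:

package main

import (
	"context"
	"fmt"
)

// JobState is a reduced stand-in for indexpb.JobState.
type JobState int

const (
	JobStateNone JobState = iota
	JobStateInit
)

// Task is a reduced stand-in for the scheduler's Task interface; the real one
// has many more methods. DropTaskMeta is the method this commit adds.
type Task interface {
	GetState() JobState
	DropTaskMeta(ctx context.Context) error
}

// analyzeTask stands in for the real analyze task; its real DropTaskMeta
// delegates to meta.analyzeMeta.DropAnalyzeTask(ctx, taskID).
type analyzeTask struct{ id int64 }

func (t *analyzeTask) GetState() JobState { return JobStateNone }

func (t *analyzeTask) DropTaskMeta(ctx context.Context) error {
	fmt.Println("dropping analyze task meta for task", t.id)
	return nil
}

// processNone mirrors the new scheduler step: a JobStateNone task has its
// meta dropped; the caller requeues the task only if the drop failed.
func processNone(ctx context.Context, t Task) bool {
	return t.DropTaskMeta(ctx) == nil
}

func main() {
	fmt.Println(processNone(context.Background(), &analyzeTask{id: 4}))
}

The actual wiring lives in the taskScheduler and task-type hunks further down.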
@@ -209,33 +209,31 @@ func (m *indexMeta) updateIndexTasksMetrics() {
 	m.RLock()
 	defer m.RUnlock()
 
-	taskMetrics := make(map[UniqueID]map[commonpb.IndexState]int)
+	taskMetrics := make(map[indexpb.JobState]int)
 	for _, segIdx := range m.segmentBuildInfo.List() {
 		if segIdx.IsDeleted || !m.isIndexExist(segIdx.CollectionID, segIdx.IndexID) {
 			continue
 		}
-		if _, ok := taskMetrics[segIdx.CollectionID]; !ok {
-			taskMetrics[segIdx.CollectionID] = make(map[commonpb.IndexState]int)
-			taskMetrics[segIdx.CollectionID][commonpb.IndexState_Unissued] = 0
-			taskMetrics[segIdx.CollectionID][commonpb.IndexState_InProgress] = 0
-			taskMetrics[segIdx.CollectionID][commonpb.IndexState_Finished] = 0
-			taskMetrics[segIdx.CollectionID][commonpb.IndexState_Failed] = 0
+		switch segIdx.IndexState {
+		case commonpb.IndexState_IndexStateNone:
+			taskMetrics[indexpb.JobState_JobStateNone]++
+		case commonpb.IndexState_Unissued:
+			taskMetrics[indexpb.JobState_JobStateInit]++
+		case commonpb.IndexState_InProgress:
+			taskMetrics[indexpb.JobState_JobStateInProgress]++
+		case commonpb.IndexState_Finished:
+			taskMetrics[indexpb.JobState_JobStateFinished]++
+		case commonpb.IndexState_Failed:
+			taskMetrics[indexpb.JobState_JobStateFailed]++
+		case commonpb.IndexState_Retry:
+			taskMetrics[indexpb.JobState_JobStateRetry]++
 		}
-		taskMetrics[segIdx.CollectionID][segIdx.IndexState]++
 	}
-	for collID, m := range taskMetrics {
-		for k, v := range m {
-			switch k {
-			case commonpb.IndexState_Unissued:
-				metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.UnissuedIndexTaskLabel).Set(float64(v))
-			case commonpb.IndexState_InProgress:
-				metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.InProgressIndexTaskLabel).Set(float64(v))
-			case commonpb.IndexState_Finished:
-				metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.FinishedIndexTaskLabel).Set(float64(v))
-			case commonpb.IndexState_Failed:
-				metrics.IndexTaskNum.WithLabelValues(strconv.FormatInt(collID, 10), metrics.FailedIndexTaskLabel).Set(float64(v))
-			}
-		}
+	jobType := indexpb.JobType_JobTypeIndexJob.String()
+	for k, v := range taskMetrics {
+		metrics.TaskNum.WithLabelValues(jobType, k.String()).Set(float64(v))
 	}
 	log.Ctx(m.ctx).Info("update index metric", zap.Int("collectionNum", len(taskMetrics)))
 }
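The hunk above (and the matching change to statsTaskMeta.updateMetrics further down) replaces a per-collection, per-state nested map with a flat state-to-count map reported under a single job-type label. Below is a standalone toy, assuming only the Prometheus Go client; the metric and label names are illustrative, not the Milvus ones:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Toy gauge with the same label shape as the new TaskNum metric
// (task_type, task_state); namespace and subsystem are placeholders.
var taskNum = prometheus.NewGaugeVec(prometheus.GaugeOpts{
	Namespace: "example",
	Subsystem: "scheduler",
	Name:      "task_count",
	Help:      "number of tasks per type and state",
}, []string{"task_type", "task_state"})

func main() {
	prometheus.MustRegister(taskNum)

	// One pass over the tasks, one counter per state -- no per-collection nesting.
	states := []string{"JobStateInit", "JobStateInProgress", "JobStateInit", "JobStateFinished"}
	counts := make(map[string]int)
	for _, s := range states {
		counts[s]++
	}
	for state, v := range counts {
		taskNum.WithLabelValues("JobTypeIndexJob", state).Set(float64(v))
	}
	fmt.Println(counts)
}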
@@ -904,10 +902,7 @@ func (m *indexMeta) SetStoredIndexFileSizeMetric(collections map[UniqueID]*colle
 	return total
 }
 
-func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segID, indexID, buildID UniqueID) error {
-	m.Lock()
-	defer m.Unlock()
-
+func (m *indexMeta) removeSegmentIndex(ctx context.Context, collID, partID, segID, indexID, buildID UniqueID) error {
 	err := m.catalog.DropSegmentIndex(ctx, collID, partID, segID, buildID)
 	if err != nil {
 		return err
@@ -925,6 +920,25 @@ func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segI
 	return nil
 }
 
+func (m *indexMeta) RemoveSegmentIndex(ctx context.Context, collID, partID, segID, indexID, buildID UniqueID) error {
+	m.Lock()
+	defer m.Unlock()
+
+	return m.removeSegmentIndex(ctx, collID, partID, segID, indexID, buildID)
+}
+
+func (m *indexMeta) RemoveSegmentIndexByID(ctx context.Context, buildID UniqueID) error {
+	m.Lock()
+	defer m.Unlock()
+
+	segIdx, ok := m.segmentBuildInfo.Get(buildID)
+	if !ok {
+		return nil
+	}
+
+	return m.removeSegmentIndex(ctx, segIdx.CollectionID, segIdx.PartitionID, segIdx.SegmentID, segIdx.IndexID, buildID)
+}
+
 func (m *indexMeta) GetDeletedIndexes() []*model.Index {
 	m.RLock()
 	defer m.RUnlock()
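The refactor above keeps the mutation in an unexported, lock-free removeSegmentIndex and lets each exported entry point (RemoveSegmentIndex and the new RemoveSegmentIndexByID) take the mutex exactly once, so one public method never re-acquires the lock held by another. A minimal sketch of that pattern with placeholder types, not Milvus code:

package main

import (
	"fmt"
	"sync"
)

// store sketches the lock-once / delegate pattern used by indexMeta:
// exported methods take the mutex, the unexported worker assumes it is held.
type store struct {
	mu   sync.Mutex
	data map[int64]string
}

// remove assumes s.mu is already held by the caller.
func (s *store) remove(id int64) error {
	delete(s.data, id)
	return nil
}

func (s *store) Remove(id int64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.remove(id)
}

func (s *store) RemoveByName(name string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for id, n := range s.data {
		if n == name {
			return s.remove(id) // no second Lock: we already hold it
		}
	}
	return nil // nothing to do, mirroring RemoveSegmentIndexByID for unknown build IDs
}

func main() {
	s := &store{data: map[int64]string{4: "buildID-4"}}
	fmt.Println(s.RemoveByName("buildID-4"), len(s.data))
}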
@@ -1505,6 +1505,57 @@ func TestRemoveSegmentIndex(t *testing.T) {
 	})
 }
 
+func TestRemoveSegmentIndexByID(t *testing.T) {
+	t.Run("drop segment index fail", func(t *testing.T) {
+		expectedErr := errors.New("error")
+		catalog := catalogmocks.NewDataCoordCatalog(t)
+		catalog.EXPECT().
+			DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			Return(expectedErr)
+
+		catalog.EXPECT().CreateSegmentIndex(mock.Anything, mock.Anything).Return(nil)
+
+		m := newSegmentIndexMeta(catalog)
+		err := m.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
+			SegmentID:    3,
+			CollectionID: 1,
+			PartitionID:  2,
+			NumRows:      1024,
+			IndexID:      1,
+			BuildID:      4,
+		})
+		assert.NoError(t, err)
+		err = m.RemoveSegmentIndexByID(context.TODO(), 4)
+		assert.Error(t, err)
+		assert.EqualError(t, err, "error")
+	})
+
+	t.Run("remove segment index ok", func(t *testing.T) {
+		catalog := catalogmocks.NewDataCoordCatalog(t)
+		catalog.EXPECT().
+			DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+			Return(nil)
+
+		catalog.EXPECT().CreateSegmentIndex(mock.Anything, mock.Anything).Return(nil)
+
+		m := newSegmentIndexMeta(catalog)
+		err := m.AddSegmentIndex(context.TODO(), &model.SegmentIndex{
+			SegmentID:    3,
+			CollectionID: 1,
+			PartitionID:  2,
+			NumRows:      1024,
+			IndexID:      1,
+			BuildID:      4,
+		})
+		assert.NoError(t, err)
+
+		err = m.RemoveSegmentIndexByID(context.TODO(), 4)
+		assert.NoError(t, err)
+		assert.Equal(t, len(m.segmentIndexes), 0)
+		assert.Equal(t, len(m.segmentBuildInfo.List()), 0)
+	})
+}
+
 func TestIndexMeta_GetUnindexedSegments(t *testing.T) {
 	catalog := &datacoord.Catalog{MetaKv: mockkv.NewMetaKv(t)}
 	m := createMeta(catalog, withIndexMeta(createIndexMeta(catalog)))
@@ -19,7 +19,6 @@ package datacoord
 import (
 	"context"
 	"fmt"
-	"strconv"
 	"sync"
 
 	"go.uber.org/zap"
@@ -76,25 +75,14 @@ func (stm *statsTaskMeta) updateMetrics() {
 	stm.RLock()
 	defer stm.RUnlock()
 
-	taskMetrics := make(map[UniqueID]map[indexpb.JobState]int)
+	taskMetrics := make(map[indexpb.JobState]int)
 	for _, t := range stm.tasks {
-		if _, ok := taskMetrics[t.GetCollectionID()]; !ok {
-			taskMetrics[t.GetCollectionID()] = make(map[indexpb.JobState]int)
-			taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateNone] = 0
-			taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateInit] = 0
-			taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateInProgress] = 0
-			taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateFinished] = 0
-			taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateFailed] = 0
-			taskMetrics[t.GetCollectionID()][indexpb.JobState_JobStateRetry] = 0
-		}
-		taskMetrics[t.GetCollectionID()][t.GetState()]++
+		taskMetrics[t.GetState()]++
 	}
 
 	jobType := indexpb.JobType_JobTypeStatsJob.String()
-	for collID, m := range taskMetrics {
-		for k, v := range m {
-			metrics.TaskNum.WithLabelValues(strconv.FormatInt(collID, 10), jobType, k.String()).Set(float64(v))
-		}
-	}
+	for k, v := range taskMetrics {
+		metrics.TaskNum.WithLabelValues(jobType, k.String()).Set(float64(v))
+	}
 }
@@ -277,14 +277,17 @@ func (at *analyzeTask) QueryResult(ctx context.Context, client types.IndexNodeCl
 	// infos length is always one.
 	for _, result := range resp.GetAnalyzeJobResults().GetResults() {
 		if result.GetTaskID() == at.GetTaskID() {
-			log.Ctx(ctx).Info("query analysis task info successfully",
-				zap.Int64("taskID", at.GetTaskID()), zap.String("result state", result.GetState().String()),
-				zap.String("failReason", result.GetFailReason()))
 			if result.GetState() == indexpb.JobState_JobStateFinished || result.GetState() == indexpb.JobState_JobStateFailed ||
 				result.GetState() == indexpb.JobState_JobStateRetry {
+				log.Ctx(ctx).Info("query analysis task info successfully",
+					zap.Int64("taskID", at.GetTaskID()), zap.String("result state", result.GetState().String()),
+					zap.String("failReason", result.GetFailReason()))
 				// state is retry or finished or failed
 				at.setResult(result)
 			} else if result.GetState() == indexpb.JobState_JobStateNone {
+				log.Ctx(ctx).Info("query analysis task info successfully",
+					zap.Int64("taskID", at.GetTaskID()), zap.String("result state", result.GetState().String()),
+					zap.String("failReason", result.GetFailReason()))
 				at.SetState(indexpb.JobState_JobStateRetry, "analyze task state is none in info response")
 			}
 			// inProgress or unissued/init, keep InProgress state
@@ -320,3 +323,7 @@ func (at *analyzeTask) DropTaskOnWorker(ctx context.Context, client types.IndexN
 func (at *analyzeTask) SetJobInfo(meta *meta) error {
 	return meta.analyzeMeta.FinishTask(at.GetTaskID(), at.taskInfo)
 }
+
+func (at *analyzeTask) DropTaskMeta(ctx context.Context, meta *meta) error {
+	return meta.analyzeMeta.DropAnalyzeTask(ctx, at.GetTaskID())
+}
@@ -317,14 +317,17 @@ func (it *indexBuildTask) QueryResult(ctx context.Context, node types.IndexNodeC
 	// indexInfos length is always one.
 	for _, info := range resp.GetIndexJobResults().GetResults() {
 		if info.GetBuildID() == it.GetTaskID() {
-			log.Ctx(ctx).Info("query task index info successfully",
-				zap.Int64("taskID", it.GetTaskID()), zap.String("result state", info.GetState().String()),
-				zap.String("failReason", info.GetFailReason()))
 			if info.GetState() == commonpb.IndexState_Finished || info.GetState() == commonpb.IndexState_Failed ||
 				info.GetState() == commonpb.IndexState_Retry {
+				log.Ctx(ctx).Info("query task index info successfully",
+					zap.Int64("taskID", it.GetTaskID()), zap.String("result state", info.GetState().String()),
+					zap.String("failReason", info.GetFailReason()))
 				// state is retry or finished or failed
 				it.setResult(info)
 			} else if info.GetState() == commonpb.IndexState_IndexStateNone {
+				log.Ctx(ctx).Info("query task index info successfully",
+					zap.Int64("taskID", it.GetTaskID()), zap.String("result state", info.GetState().String()),
+					zap.String("failReason", info.GetFailReason()))
 				it.SetState(indexpb.JobState_JobStateRetry, "index state is none in info response")
 			}
 			// inProgress or unissued, keep InProgress state
@@ -358,3 +361,7 @@ func (it *indexBuildTask) DropTaskOnWorker(ctx context.Context, client types.Ind
 func (it *indexBuildTask) SetJobInfo(meta *meta) error {
 	return meta.indexMeta.FinishTask(it.taskInfo)
 }
+
+func (it *indexBuildTask) DropTaskMeta(ctx context.Context, meta *meta) error {
+	return meta.indexMeta.RemoveSegmentIndexByID(ctx, it.taskID)
+}
@@ -125,6 +125,10 @@ func (s *taskScheduler) reloadFromMeta() {
 					State:      segIndex.IndexState,
 					FailReason: segIndex.FailReason,
 				},
+				req: &workerpb.CreateJobRequest{
+					ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
+					BuildID:   segIndex.BuildID,
+				},
 				queueTime: time.Now(),
 				startTime: time.Now(),
 				endTime:   time.Now(),
@@ -150,6 +154,10 @@ func (s *taskScheduler) reloadFromMeta() {
 					State:      t.State,
 					FailReason: t.FailReason,
 				},
+				req: &workerpb.AnalyzeRequest{
+					ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
+					TaskID:    taskID,
+				},
 				queueTime: time.Now(),
 				startTime: time.Now(),
 				endTime:   time.Now(),
@@ -176,6 +184,10 @@ func (s *taskScheduler) reloadFromMeta() {
 					State:      t.GetState(),
 					FailReason: t.GetFailReason(),
 				},
+				req: &workerpb.CreateStatsRequest{
+					ClusterID: Params.CommonCfg.ClusterPrefix.GetValue(),
+					TaskID:    taskID,
+				},
 				queueTime: time.Now(),
 				startTime: time.Now(),
 				endTime:   time.Now(),
@@ -333,6 +345,9 @@ func (s *taskScheduler) checkProcessingTasks() {
 	}
 	s.runningQueueLock.RUnlock()
 
+	if len(runningTaskIDs) <= 0 {
+		return
+	}
 	log.Ctx(s.ctx).Info("check running tasks", zap.Int("runningTask num", len(runningTaskIDs)))
 
 	var wg sync.WaitGroup
@@ -410,7 +425,9 @@ func (s *taskScheduler) run() {
 
 		switch task.GetState() {
 		case indexpb.JobState_JobStateNone:
-			return
+			if !s.processNone(task) {
+				s.pendingTasks.Push(task)
+			}
 		case indexpb.JobState_JobStateInit:
 			s.pendingTasks.Push(task)
 		default:
@@ -433,7 +450,7 @@ func (s *taskScheduler) process(task Task, nodeID int64) bool {
 
 	switch task.GetState() {
 	case indexpb.JobState_JobStateNone:
-		return true
+		return s.processNone(task)
 	case indexpb.JobState_JobStateInit:
 		return s.processInit(task, nodeID)
 	default:
@@ -577,6 +594,14 @@ func (s *taskScheduler) processInit(task Task, nodeID int64) bool {
 	return true
 }
 
+func (s *taskScheduler) processNone(task Task) bool {
+	if err := task.DropTaskMeta(s.ctx, s.meta); err != nil {
+		log.Ctx(s.ctx).Warn("set job info failed", zap.Error(err))
+		return false
+	}
+	return true
+}
+
 func (s *taskScheduler) processFinished(task Task) bool {
 	if err := task.SetJobInfo(s.meta); err != nil {
 		log.Ctx(s.ctx).Warn("update task info failed", zap.Error(err))
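processNone above is the piece that actually removes the obsolete meta; as the earlier taskScheduler.run hunk shows, a failed drop pushes the task back onto pendingTasks so it is retried on the next pass rather than lost. A reduced toy, with placeholder queue and task types rather than the real scheduler, showing that retry shape:

package main

import (
	"errors"
	"fmt"
)

// toyTask is a placeholder for the sketch: Drop fails once, then succeeds,
// so the loop has to requeue it exactly one time.
type toyTask struct {
	id       int64
	failures int
}

func (t *toyTask) Drop() error {
	if t.failures > 0 {
		t.failures--
		return errors.New("transient meta-store error")
	}
	return nil
}

func main() {
	pending := []*toyTask{{id: 9, failures: 1}}

	// Each pass mirrors one scheduler tick: tasks whose meta cannot be
	// dropped are pushed back and picked up again on the next tick.
	for tick := 1; len(pending) > 0; tick++ {
		task := pending[0]
		pending = pending[1:]
		if err := task.Drop(); err != nil {
			fmt.Printf("tick %d: drop task %d failed (%v), requeue\n", tick, task.id, err)
			pending = append(pending, task)
			continue
		}
		fmt.Printf("tick %d: task %d meta dropped\n", tick, task.id)
	}
}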
@@ -783,6 +783,9 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
 		})
 		return nil
 	})
+	catalog.EXPECT().DropSegmentIndex(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
+	catalog.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil).Maybe()
+	catalog.EXPECT().DropAnalyzeTask(mock.Anything, mock.Anything).Return(nil).Maybe()
 	catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil)
 	// catalog.EXPECT().SaveStatsTask(mock.Anything, mock.Anything).Return(nil)
 
@@ -951,10 +954,9 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) {
 	indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 8)
 	s.True(exist)
 	s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
-	indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 9)
-	s.True(exist)
-	// segment not healthy, wait for GC
-	s.Equal(commonpb.IndexState_Unissued, indexJob.IndexState)
+	_, exist = mt.indexMeta.GetIndexJob(buildID + 9)
+	s.False(exist)
+	// segment not healthy, remove task
 	indexJob, exist = mt.indexMeta.GetIndexJob(buildID + 10)
 	s.True(exist)
 	s.Equal(commonpb.IndexState_Finished, indexJob.IndexState)
@@ -283,13 +283,16 @@ func (st *statsTask) QueryResult(ctx context.Context, client types.IndexNodeClie
 
 	for _, result := range resp.GetStatsJobResults().GetResults() {
 		if result.GetTaskID() == st.GetTaskID() {
-			log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
-				zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
-				zap.String("failReason", result.GetFailReason()))
 			if result.GetState() == indexpb.JobState_JobStateFinished || result.GetState() == indexpb.JobState_JobStateRetry ||
 				result.GetState() == indexpb.JobState_JobStateFailed {
+				log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
+					zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
+					zap.String("failReason", result.GetFailReason()))
 				st.setResult(result)
 			} else if result.GetState() == indexpb.JobState_JobStateNone {
+				log.Ctx(ctx).Info("query stats task result success", zap.Int64("taskID", st.GetTaskID()),
+					zap.Int64("segmentID", st.segmentID), zap.String("result state", result.GetState().String()),
+					zap.String("failReason", result.GetFailReason()))
 				st.SetState(indexpb.JobState_JobStateRetry, "stats task state is none in info response")
 			}
 			// inProgress or unissued/init, keep InProgress state
@@ -367,3 +370,12 @@ func (st *statsTask) SetJobInfo(meta *meta) error {
 		zap.String("subJobType", st.subJobType.String()), zap.String("state", st.taskInfo.GetState().String()))
 	return nil
 }
+
+func (st *statsTask) DropTaskMeta(ctx context.Context, meta *meta) error {
+	if err := meta.statsTaskMeta.DropStatsTask(st.taskID); err != nil {
+		log.Ctx(ctx).Warn("drop stats task failed", zap.Int64("taskID", st.taskID), zap.Error(err))
+		return err
+	}
+	log.Ctx(ctx).Info("drop stats task success", zap.Int64("taskID", st.taskID))
+	return nil
+}
@@ -46,4 +46,5 @@ type Task interface {
 	SetEndTime(time.Time)
 	GetEndTime() time.Time
 	GetTaskType() string
+	DropTaskMeta(ctx context.Context, meta *meta) error
 }
@@ -310,6 +310,7 @@ var (
 	}, []string{statusLabelName})
 
 	// IndexTaskNum records the number of index tasks of each type.
+	// Deprecated: please ues TaskNum after v2.5.5.
 	IndexTaskNum = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{
 			Namespace: milvusNamespace,
@@ -343,7 +344,7 @@ var (
 			Help:    "latency of task execute operation",
 			Buckets: longTaskBuckets,
 		}, []string{
-			taskTypeLabel,
+			TaskTypeLabel,
 			statusLabelName,
 		})
 
@@ -354,7 +355,7 @@ var (
 			Subsystem: typeutil.DataCoordRole,
 			Name:      "task_count",
 			Help:      "number of index tasks of each type",
-		}, []string{collectionIDLabelName, taskTypeLabel, taskStateLabel})
+		}, []string{TaskTypeLabel, TaskStateLabel})
 )
 
 // RegisterDataCoord registers DataCoord metrics
@@ -131,8 +131,8 @@ const (
 	LoadedLabel         = "loaded"
 	NumEntitiesAllLabel = "all"
 
-	taskTypeLabel  = "task_type"
-	taskStateLabel = "task_state"
+	TaskTypeLabel  = "task_type"
+	TaskStateLabel = "task_state"
 )
 
 var (
@@ -444,7 +444,7 @@ var (
 			Subsystem: typeutil.ProxyRole,
 			Name:      "queue_task_num",
 			Help:      "",
-		}, []string{nodeIDLabelName, queueTypeLabelName, taskStateLabel})
+		}, []string{nodeIDLabelName, queueTypeLabelName, TaskStateLabel})
 )
 
 // RegisterProxy registers Proxy metrics
@@ -132,7 +132,7 @@ var (
 			Name:    "task_latency",
 			Help:    "latency of all kind of task in query coord scheduler scheduler",
 			Buckets: longTaskBuckets,
-		}, []string{collectionIDLabelName, taskTypeLabel, channelNameLabelName})
+		}, []string{collectionIDLabelName, TaskTypeLabel, channelNameLabelName})
 
 	QueryCoordResourceGroupInfo = prometheus.NewGaugeVec(
 		prometheus.GaugeOpts{