milvus/internal/datacoord/stats_task_meta.go

347 lines
11 KiB
Go

// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"context"
"fmt"
"strconv"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/pkg/v2/log"
"github.com/milvus-io/milvus/pkg/v2/metrics"
"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
"github.com/milvus-io/milvus/pkg/v2/util/lock"
"github.com/milvus-io/milvus/pkg/v2/util/merr"
"github.com/milvus-io/milvus/pkg/v2/util/timerecord"
"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)
// statsTaskMeta is the in-memory view of the stats tasks persisted through the
// DataCoord catalog. Every task is indexed twice — by task ID and by the
// (segment ID, sub-job type) pair — and both indexes hold the same
// *indexpb.StatsTask pointer. The update methods in this file replace stored
// tasks with modified clones rather than editing them in place.
type statsTaskMeta struct {
	// ctx is passed to every catalog call made by this meta.
	ctx context.Context

	// catalog is the persistent store the tasks are loaded from and saved to.
	catalog metastore.DataCoordCatalog

	// keyLock is taken per task ID by the mutating methods so that updates to
	// the same task are serialized.
	keyLock *lock.KeyLock[UniqueID]

	// taskID -> statsTask
	tasks *typeutil.ConcurrentMap[UniqueID, *indexpb.StatsTask]

	// segmentID + SubJobType -> statsTask
	// (key built by createSecondaryIndexKey)
	segmentID2Tasks *typeutil.ConcurrentMap[string, *indexpb.StatsTask]
}
// newStatsTaskMeta builds a statsTaskMeta backed by catalog and fills its
// in-memory indexes from the stats tasks already persisted there. If that
// initial load fails, no meta is returned and the catalog error is passed up.
func newStatsTaskMeta(ctx context.Context, catalog metastore.DataCoordCatalog) (*statsTaskMeta, error) {
	meta := &statsTaskMeta{
		ctx:             ctx,
		catalog:         catalog,
		keyLock:         lock.NewKeyLock[UniqueID](),
		tasks:           typeutil.NewConcurrentMap[UniqueID, *indexpb.StatsTask](),
		segmentID2Tasks: typeutil.NewConcurrentMap[string, *indexpb.StatsTask](),
	}

	err := meta.reloadFromKV()
	if err != nil {
		return nil, err
	}
	return meta, nil
}
// createSecondaryIndexKey builds the segmentID2Tasks lookup key
// "<segmentID>-<subJobType>". The segment ID is rendered as an unsigned
// decimal of its bit pattern, so a negative ID yields a large positive number.
func createSecondaryIndexKey(segmentID UniqueID, subJobType string) string {
	// 20 bytes is the maximum length of a base-10 uint64, plus one for '-'.
	buf := make([]byte, 0, 21+len(subJobType))
	buf = strconv.AppendUint(buf, uint64(segmentID), 10)
	buf = append(buf, '-')
	buf = append(buf, subJobType...)
	return string(buf)
}
// reloadFromKV lists every stats task persisted in the catalog and indexes it
// both by task ID and by (segment ID, sub-job type). A catalog error is logged
// and returned unchanged; on success the load duration is logged.
func (stm *statsTaskMeta) reloadFromKV() error {
	recorder := timerecord.NewTimeRecorder("statsTaskMeta-reloadFromKV")

	persisted, err := stm.catalog.ListStatsTasks(stm.ctx)
	if err != nil {
		log.Error("statsTaskMeta reloadFromKV load stats tasks failed", zap.Error(err))
		return err
	}

	for i := range persisted {
		task := persisted[i]
		stm.tasks.Insert(task.GetTaskID(), task)
		// If several persisted tasks map to the same key, the one listed last
		// ends up in the secondary index.
		stm.segmentID2Tasks.Insert(createSecondaryIndexKey(task.GetSegmentID(), task.GetSubJobType().String()), task)
	}

	log.Info("statsTaskMeta reloadFromKV done", zap.Duration("duration", recorder.ElapseSpan()))
	return nil
}
// updateMetrics publishes, for the stats job type, how many known tasks are in
// each job state. The six states listed below are always reported (as 0 when
// no task is in them), so stale gauges are reset.
func (stm *statsTaskMeta) updateMetrics() {
	counts := make(map[indexpb.JobState]int)
	for _, state := range []indexpb.JobState{
		indexpb.JobState_JobStateNone,
		indexpb.JobState_JobStateInit,
		indexpb.JobState_JobStateInProgress,
		indexpb.JobState_JobStateFinished,
		indexpb.JobState_JobStateFailed,
		indexpb.JobState_JobStateRetry,
	} {
		counts[state] = 0
	}

	for _, task := range stm.tasks.Values() {
		counts[task.GetState()]++
	}

	label := indexpb.JobType_JobTypeStatsJob.String()
	for state, n := range counts {
		metrics.TaskNum.WithLabelValues(label, state.String()).Set(float64(n))
	}
}
// AddStatsTask registers t as a new stats task.
//
// It returns a TaskDuplicate error if a task for the same segment and sub-job
// type is already indexed. Otherwise it sets t.State to JobStateInit, persists
// t through the catalog and, only after a successful save, indexes t by task
// ID and by (segment ID, sub-job type). The pointer t itself is stored, not a
// copy, so callers should not modify it after a successful call. A catalog
// error is logged and returned as-is.
func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error {
	taskID := t.GetTaskID()
	segmentID := t.GetSegmentID()
	subJobType := t.GetSubJobType().String()
	secondaryKey := createSecondaryIndexKey(segmentID, subJobType)

	// NOTE(review): this check runs before keyLock is taken, and keyLock is
	// keyed by task ID, so concurrent adds of different task IDs for the same
	// segment/sub-job pair are not mutually excluded here.
	if existing, found := stm.segmentID2Tasks.Get(secondaryKey); found {
		msg := fmt.Sprintf("stats task already exist in meta of segment %d with subJobType: %s",
			segmentID, subJobType)
		log.RatedWarn(10, msg, zap.Int64("taskID", taskID), zap.Int64("exist taskID", existing.GetTaskID()))
		return merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeStatsJob.String(), msg)
	}

	stm.keyLock.Lock(taskID)
	defer stm.keyLock.Unlock(taskID)

	taskFields := []zap.Field{
		zap.Int64("taskID", taskID),
		zap.Int64("originSegmentID", segmentID),
		zap.Int64("targetSegmentID", t.GetTargetSegmentID()),
		zap.String("subJobType", subJobType),
	}
	log.Info("add stats task", taskFields...)

	t.State = indexpb.JobState_JobStateInit
	if err := stm.catalog.SaveStatsTask(stm.ctx, t); err != nil {
		log.Warn("adding stats task failed",
			zap.Int64("taskID", taskID),
			zap.Int64("segmentID", segmentID),
			zap.String("subJobType", subJobType),
			zap.Error(err))
		return err
	}

	stm.tasks.Insert(taskID, t)
	stm.segmentID2Tasks.Insert(secondaryKey, t)
	log.Info("add stats task success", taskFields...)
	return nil
}
// DropStatsTask deletes the stats task identified by taskID from the catalog
// and then from both in-memory indexes.
//
// Dropping an unknown task is a no-op that returns nil. If the catalog delete
// fails, the error is logged and returned and the in-memory state is left
// untouched.
func (stm *statsTaskMeta) DropStatsTask(taskID int64) error {
	stm.keyLock.Lock(taskID)
	defer stm.keyLock.Unlock(taskID)

	log.Info("drop stats task by taskID", zap.Int64("taskID", taskID))

	t, ok := stm.tasks.Get(taskID)
	if !ok {
		log.Info("remove stats task success, task already not exist", zap.Int64("taskID", taskID))
		return nil
	}

	if err := stm.catalog.DropStatsTask(stm.ctx, taskID); err != nil {
		log.Warn("drop stats task failed",
			zap.Int64("taskID", taskID),
			// Log the task's actual segment, not the task ID.
			zap.Int64("segmentID", t.GetSegmentID()),
			zap.String("subJobType", t.GetSubJobType().String()),
			zap.Error(err))
		return err
	}

	stm.tasks.Remove(taskID)

	// Only un-index the (segment, sub-job) key if it still points at this task.
	// If another task sharing the key is indexed there, it must stay reachable.
	secondaryKey := createSecondaryIndexKey(t.GetSegmentID(), t.GetSubJobType().String())
	if indexed, found := stm.segmentID2Tasks.Get(secondaryKey); found && indexed.GetTaskID() == taskID {
		stm.segmentID2Tasks.Remove(secondaryKey)
	}

	log.Info("remove stats task success", zap.Int64("taskID", taskID))
	return nil
}
// UpdateVersion increments the version of task taskID and records nodeID on it.
//
// The change is made on a clone and persisted first. The in-memory indexes
// are switched to the clone only after the catalog save succeeds, so a save
// error leaves the stored task unchanged. An unknown taskID yields a
// "task %d not found" error.
func (stm *statsTaskMeta) UpdateVersion(taskID, nodeID int64) error {
	stm.keyLock.Lock(taskID)
	defer stm.keyLock.Unlock(taskID)

	current, found := stm.tasks.Get(taskID)
	if !found {
		return fmt.Errorf("task %d not found", taskID)
	}

	updated := proto.Clone(current).(*indexpb.StatsTask)
	updated.NodeID = nodeID
	updated.Version++

	err := stm.catalog.SaveStatsTask(stm.ctx, updated)
	if err != nil {
		log.Warn("update stats task version failed",
			zap.Int64("taskID", current.GetTaskID()),
			zap.Int64("segmentID", current.GetSegmentID()),
			zap.Int64("nodeID", nodeID),
			zap.Error(err))
		return err
	}

	key := createSecondaryIndexKey(current.GetSegmentID(), current.GetSubJobType().String())
	stm.tasks.Insert(taskID, updated)
	stm.segmentID2Tasks.Insert(key, updated)
	log.Info("update stats task version success",
		zap.Int64("taskID", taskID),
		zap.Int64("nodeID", nodeID),
		zap.Int64("newVersion", updated.GetVersion()))
	return nil
}
// UpdateBuildingTask moves task taskID to JobStateInProgress.
//
// The new state is set on a clone, which is persisted before it replaces the
// task in the in-memory indexes. A catalog error is logged and returned with
// the stored task unchanged. An unknown taskID yields a "task %d not found"
// error.
func (stm *statsTaskMeta) UpdateBuildingTask(taskID int64) error {
	stm.keyLock.Lock(taskID)
	defer stm.keyLock.Unlock(taskID)

	current, found := stm.tasks.Get(taskID)
	if !found {
		return fmt.Errorf("task %d not found", taskID)
	}

	building := proto.Clone(current).(*indexpb.StatsTask)
	building.State = indexpb.JobState_JobStateInProgress

	err := stm.catalog.SaveStatsTask(stm.ctx, building)
	if err != nil {
		log.Warn("update stats task state building failed",
			zap.Int64("taskID", current.GetTaskID()),
			zap.Int64("segmentID", current.GetSegmentID()),
			zap.Error(err))
		return err
	}

	key := createSecondaryIndexKey(current.GetSegmentID(), current.GetSubJobType().String())
	stm.tasks.Insert(taskID, building)
	stm.segmentID2Tasks.Insert(key, building)
	log.Info("update building stats task success", zap.Int64("taskID", taskID))
	return nil
}
// FinishTask applies the state and fail reason reported in result to task
// taskID.
//
// The update is made on a clone and persisted first. The in-memory indexes
// are switched to the clone only after the catalog save succeeds; on a save
// error the stored task is unchanged and the error is returned. An unknown
// taskID yields a "task %d not found" error.
func (stm *statsTaskMeta) FinishTask(taskID int64, result *workerpb.StatsResult) error {
	stm.keyLock.Lock(taskID)
	defer stm.keyLock.Unlock(taskID)

	t, ok := stm.tasks.Get(taskID)
	if !ok {
		return fmt.Errorf("task %d not found", taskID)
	}

	cloneT := proto.Clone(t).(*indexpb.StatsTask)
	cloneT.State = result.GetState()
	cloneT.FailReason = result.GetFailReason()

	if err := stm.catalog.SaveStatsTask(stm.ctx, cloneT); err != nil {
		log.Warn("finish stats task state failed",
			zap.Int64("taskID", t.GetTaskID()),
			zap.Int64("segmentID", t.GetSegmentID()),
			zap.Error(err))
		return err
	}

	stm.tasks.Insert(taskID, cloneT)
	secondaryKey := createSecondaryIndexKey(t.GetSegmentID(), t.GetSubJobType().String())
	stm.segmentID2Tasks.Insert(secondaryKey, cloneT)

	// Report the fail reason just recorded (cloneT), not the stale one on t.
	log.Info("finish stats task meta success",
		zap.Int64("taskID", taskID),
		zap.Int64("segmentID", t.GetSegmentID()),
		zap.String("state", result.GetState().String()),
		zap.String("failReason", cloneT.GetFailReason()))
	return nil
}
// GetStatsTask returns the stats task with the given ID, or nil if none is
// known. The returned pointer is the object held by the meta, not a copy, so
// callers should treat it as read-only.
func (stm *statsTaskMeta) GetStatsTask(taskID int64) *indexpb.StatsTask {
	if task, ok := stm.tasks.Get(taskID); ok {
		return task
	}
	return nil
}
// GetStatsTaskState reports the job state of task taskID, or JobStateNone when
// the task is not known.
func (stm *statsTaskMeta) GetStatsTaskState(taskID int64) indexpb.JobState {
	if task, ok := stm.tasks.Get(taskID); ok {
		return task.GetState()
	}
	return indexpb.JobState_JobStateNone
}
// GetStatsTaskStateBySegmentID reports the job state of the task indexed for
// (segmentID, subJobType), or JobStateNone when no such task is indexed.
func (stm *statsTaskMeta) GetStatsTaskStateBySegmentID(segmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState {
	task, exists := stm.segmentID2Tasks.Get(createSecondaryIndexKey(segmentID, subJobType.String()))
	if !exists {
		return indexpb.JobState_JobStateNone
	}
	return task.GetState()
}
// CanCleanedTasks returns the IDs of tasks that are marked recyclable and are
// in JobStateFinished or JobStateFailed. The result is a non-nil (possibly
// empty) slice in map-iteration order.
func (stm *statsTaskMeta) CanCleanedTasks() []int64 {
	recyclable := make([]int64, 0)
	stm.tasks.Range(func(taskID UniqueID, task *indexpb.StatsTask) bool {
		if !task.GetCanRecycle() {
			return true
		}
		switch task.GetState() {
		case indexpb.JobState_JobStateFinished, indexpb.JobState_JobStateFailed:
			recyclable = append(recyclable, taskID)
		}
		return true
	})
	return recyclable
}
// GetAllTasks returns a deep copy of every known stats task, keyed by task ID.
// Callers may modify the returned tasks without affecting the meta.
func (stm *statsTaskMeta) GetAllTasks() map[int64]*indexpb.StatsTask {
	snapshot := stm.tasks.Values()
	result := make(map[int64]*indexpb.StatsTask, len(snapshot))
	for i := range snapshot {
		result[snapshot[i].GetTaskID()] = proto.Clone(snapshot[i]).(*indexpb.StatsTask)
	}
	return result
}
// GetStatsTaskBySegmentID returns the task indexed for (segmentID, subJobType),
// or nil if there is none. Both outcomes are logged at info level. The
// returned pointer is the object held by the meta, not a copy.
func (stm *statsTaskMeta) GetStatsTaskBySegmentID(segmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask {
	jobType := subJobType.String()
	task, exists := stm.segmentID2Tasks.Get(createSecondaryIndexKey(segmentID, jobType))
	if !exists {
		log.Info("get stats task by segmentID failed, task not exist",
			zap.Int64("segmentID", segmentID),
			zap.String("subJobType", jobType))
		return nil
	}

	log.Info("get stats task by segmentID success",
		zap.Int64("taskID", task.GetTaskID()),
		zap.Int64("segmentID", segmentID),
		zap.String("subJobType", jobType))
	return task
}
// MarkTaskCanRecycle sets CanRecycle on task taskID so that CanCleanedTasks
// can report it once it is finished or failed.
//
// Like the other mutators, it holds the per-task key lock. Without it, a
// concurrent FinishTask/UpdateVersion on the same task could clone the same
// snapshot, and whichever Insert ran last would silently discard the other
// update. The change is persisted before the in-memory indexes are updated.
// An unknown taskID yields a "task %d not found" error.
func (stm *statsTaskMeta) MarkTaskCanRecycle(taskID int64) error {
	stm.keyLock.Lock(taskID)
	defer stm.keyLock.Unlock(taskID)

	log.Info("mark stats task can recycle", zap.Int64("taskID", taskID))

	t, ok := stm.tasks.Get(taskID)
	if !ok {
		return fmt.Errorf("task %d not found", taskID)
	}

	cloneT := proto.Clone(t).(*indexpb.StatsTask)
	cloneT.CanRecycle = true

	if err := stm.catalog.SaveStatsTask(stm.ctx, cloneT); err != nil {
		log.Warn("mark stats task can recycle failed",
			zap.Int64("taskID", taskID),
			zap.Int64("segmentID", t.GetSegmentID()),
			zap.Error(err))
		return err
	}

	stm.tasks.Insert(taskID, cloneT)
	secondaryKey := createSecondaryIndexKey(t.GetSegmentID(), t.GetSubJobType().String())
	stm.segmentID2Tasks.Insert(secondaryKey, cloneT)

	log.Info("mark stats task can recycle success", zap.Int64("taskID", taskID),
		zap.Int64("segmentID", t.GetSegmentID()),
		zap.String("subJobType", t.GetSubJobType().String()))
	return nil
}