// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package index

import "C"

import (
	"context"
	"sync"
	"time"

	"go.uber.org/zap"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/v2/common"
	"github.com/milvus-io/milvus/pkg/v2/log"
	"github.com/milvus-io/milvus/pkg/v2/proto/datapb"
	"github.com/milvus-io/milvus/pkg/v2/proto/indexpb"
	"github.com/milvus-io/milvus/pkg/v2/proto/workerpb"
	"github.com/milvus-io/milvus/pkg/v2/util/paramtable"
	"github.com/milvus-io/milvus/pkg/v2/util/typeutil"
)

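// IndexTaskInfo tracks the in-memory state of one index build task: its
// cancel function, build state, produced index file keys, sizes, versions,
// and failure reason.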
type IndexTaskInfo struct {
	Cancel                    context.CancelFunc
	State                     commonpb.IndexState
	FileKeys                  []string
	SerializedSize            uint64
	MemSize                   uint64
	FailReason                string
	CurrentIndexVersion       int32
	IndexStoreVersion         int64
	CurrentScalarIndexVersion int32

	// task statistics
	statistic *indexpb.JobInfo
}

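// Clone returns a copy of the task info; FileKeys and statistic are deep
// copied so the result can be used after the manager's lock is released.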
func (i *IndexTaskInfo) Clone() *IndexTaskInfo {
	return &IndexTaskInfo{
		Cancel:                    i.Cancel,
		State:                     i.State,
		FileKeys:                  common.CloneStringList(i.FileKeys),
		SerializedSize:            i.SerializedSize,
		MemSize:                   i.MemSize,
		FailReason:                i.FailReason,
		CurrentIndexVersion:       i.CurrentIndexVersion,
		IndexStoreVersion:         i.IndexStoreVersion,
		CurrentScalarIndexVersion: i.CurrentScalarIndexVersion,
		statistic:                 typeutil.Clone(i.statistic),
	}
}

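// ToIndexTaskInfo converts the internal state into a workerpb.IndexTaskInfo
// response for the given buildID.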
func (i *IndexTaskInfo) ToIndexTaskInfo(buildID int64) *workerpb.IndexTaskInfo {
	return &workerpb.IndexTaskInfo{
		BuildID:                   buildID,
		State:                     i.State,
		IndexFileKeys:             i.FileKeys,
		SerializedSize:            i.SerializedSize,
		MemSize:                   i.MemSize,
		FailReason:                i.FailReason,
		CurrentIndexVersion:       i.CurrentIndexVersion,
		IndexStoreVersion:         i.IndexStoreVersion,
		CurrentScalarIndexVersion: i.CurrentScalarIndexVersion,
	}
}

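// TaskManager tracks the index, analyze, and stats tasks running on this
// node, keyed by (ClusterID, TaskID). A single mutex guards all three maps.
//
// A minimal usage sketch (caller-side names are illustrative, not part of
// this package):
//
//	m := NewTaskManager(ctx)
//	taskCtx, cancel := context.WithCancel(ctx)
//	old := m.LoadOrStoreIndexTask(clusterID, buildID, &IndexTaskInfo{
//		Cancel: cancel,
//		State:  commonpb.IndexState_InProgress,
//	})
//	if old != nil {
//		cancel() // a task with the same key is already registered
//	}
//	_ = taskCtx // otherwise, pass taskCtx to the actual build work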
type TaskManager struct {
	ctx          context.Context
	stateLock    sync.Mutex
	indexTasks   map[Key]*IndexTaskInfo
	analyzeTasks map[Key]*AnalyzeTaskInfo
	statsTasks   map[Key]*StatsTaskInfo
}

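// NewTaskManager creates an empty TaskManager; ctx is used for logging and
// as the parent of the graceful-stop timeout in WaitTaskFinish.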
func NewTaskManager(ctx context.Context) *TaskManager {
	return &TaskManager{
		ctx:          ctx,
		indexTasks:   make(map[Key]*IndexTaskInfo),
		analyzeTasks: make(map[Key]*AnalyzeTaskInfo),
		statsTasks:   make(map[Key]*StatsTaskInfo),
	}
}

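// LoadOrStoreIndexTask stores info under (clusterID, buildID) and returns
// nil, or returns the existing entry unchanged if one is already registered.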
func (m *TaskManager) LoadOrStoreIndexTask(clusterID string, buildID typeutil.UniqueID, info *IndexTaskInfo) *IndexTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	key := Key{ClusterID: clusterID, TaskID: buildID}
	oldInfo, ok := m.indexTasks[key]
	if ok {
		return oldInfo
	}
	m.indexTasks[key] = info
	return nil
}

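// LoadIndexTaskState returns the state of the given index build task, or
// IndexState_IndexStateNone if the task is unknown.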
func (m *TaskManager) LoadIndexTaskState(clusterID string, buildID typeutil.UniqueID) commonpb.IndexState {
	key := Key{ClusterID: clusterID, TaskID: buildID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	task, ok := m.indexTasks[key]
	if !ok {
		return commonpb.IndexState_IndexStateNone
	}
	return task.State
}

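// StoreIndexTaskState updates the state and fail reason of an existing index
// build task; unknown tasks are ignored.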
func (m *TaskManager) StoreIndexTaskState(clusterID string, buildID typeutil.UniqueID, state commonpb.IndexState, failReason string) {
	key := Key{ClusterID: clusterID, TaskID: buildID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if task, ok := m.indexTasks[key]; ok {
		log.Ctx(m.ctx).Debug("store task state", zap.String("clusterID", clusterID), zap.Int64("buildID", buildID),
			zap.String("state", state.String()), zap.String("fail reason", failReason))
		task.State = state
		task.FailReason = failReason
	}
}

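// ForeachIndexTaskInfo invokes fn for every index task while holding the
// manager's lock, so fn must not call back into methods that take the lock.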
func (m *TaskManager) ForeachIndexTaskInfo(fn func(clusterID string, buildID typeutil.UniqueID, info *IndexTaskInfo)) {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	for key, info := range m.indexTasks {
		fn(key.ClusterID, key.TaskID, info)
	}
}

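// StoreIndexFilesAndStatistic records the output files and size/version
// statistics of a finished build; fileKeys is cloned so the caller may reuse
// its slice.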
func (m *TaskManager) StoreIndexFilesAndStatistic(
	clusterID string,
	buildID typeutil.UniqueID,
	fileKeys []string,
	serializedSize uint64,
	memSize uint64,
	currentIndexVersion int32,
	currentScalarIndexVersion int32,
) {
	key := Key{ClusterID: clusterID, TaskID: buildID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if info, ok := m.indexTasks[key]; ok {
		info.FileKeys = common.CloneStringList(fileKeys)
		info.SerializedSize = serializedSize
		info.MemSize = memSize
		info.CurrentIndexVersion = currentIndexVersion
		info.CurrentScalarIndexVersion = currentScalarIndexVersion
	}
}

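// DeleteIndexTaskInfos removes the index tasks identified by keys and returns
// the infos that were actually present.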
func (m *TaskManager) DeleteIndexTaskInfos(ctx context.Context, keys []Key) []*IndexTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	deleted := make([]*IndexTaskInfo, 0, len(keys))
	for _, key := range keys {
		info, ok := m.indexTasks[key]
		if ok {
			deleted = append(deleted, info)
			delete(m.indexTasks, key)
			log.Ctx(ctx).Info("delete index task infos",
				zap.String("clusterID", key.ClusterID), zap.Int64("buildID", key.TaskID))
		}
	}
	return deleted
}

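// deleteAllIndexTasks atomically detaches the whole index task map and
// returns its entries as a slice.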
func (m *TaskManager) deleteAllIndexTasks() []*IndexTaskInfo {
	m.stateLock.Lock()
	deletedTasks := m.indexTasks
	m.indexTasks = make(map[Key]*IndexTaskInfo)
	m.stateLock.Unlock()

	deleted := make([]*IndexTaskInfo, 0, len(deletedTasks))
	for _, info := range deletedTasks {
		deleted = append(deleted, info)
	}
	return deleted
}

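// AnalyzeTaskInfo tracks the in-memory state of one analyze task, including
// the file holding the computed centroids.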
type AnalyzeTaskInfo struct {
	Cancel        context.CancelFunc
	State         indexpb.JobState
	FailReason    string
	CentroidsFile string
}

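// LoadOrStoreAnalyzeTask stores info under (clusterID, taskID) and returns
// nil, or returns the existing entry unchanged if one is already registered.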
func (m *TaskManager) LoadOrStoreAnalyzeTask(clusterID string, taskID typeutil.UniqueID, info *AnalyzeTaskInfo) *AnalyzeTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	key := Key{ClusterID: clusterID, TaskID: taskID}
	oldInfo, ok := m.analyzeTasks[key]
	if ok {
		return oldInfo
	}
	m.analyzeTasks[key] = info
	return nil
}

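// LoadAnalyzeTaskState returns the state of the given analyze task, or
// JobState_JobStateNone if the task is unknown.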
func (m *TaskManager) LoadAnalyzeTaskState(clusterID string, taskID typeutil.UniqueID) indexpb.JobState {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	task, ok := m.analyzeTasks[key]
	if !ok {
		return indexpb.JobState_JobStateNone
	}
	return task.State
}

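// StoreAnalyzeTaskState updates the state and fail reason of an existing
// analyze task; unknown tasks are ignored.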
func (m *TaskManager) StoreAnalyzeTaskState(clusterID string, taskID typeutil.UniqueID, state indexpb.JobState, failReason string) {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if task, ok := m.analyzeTasks[key]; ok {
		log.Ctx(m.ctx).Info("store analyze task state", zap.String("clusterID", clusterID), zap.Int64("taskID", taskID),
			zap.String("state", state.String()), zap.String("fail reason", failReason))
		task.State = state
		task.FailReason = failReason
	}
}

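// StoreAnalyzeFilesAndStatistic records the centroids file produced by a
// finished analyze task; unknown tasks are ignored.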
func (m *TaskManager) StoreAnalyzeFilesAndStatistic(
	clusterID string,
	taskID typeutil.UniqueID,
	centroidsFile string,
) {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if info, ok := m.analyzeTasks[key]; ok {
		info.CentroidsFile = centroidsFile
	}
}

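// GetAnalyzeTaskInfo returns a copy of the analyze task info, or nil if the
// task is unknown.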
func (m *TaskManager) GetAnalyzeTaskInfo(clusterID string, taskID typeutil.UniqueID) *AnalyzeTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if info, ok := m.analyzeTasks[Key{ClusterID: clusterID, TaskID: taskID}]; ok {
		return &AnalyzeTaskInfo{
			Cancel:        info.Cancel,
			State:         info.State,
			FailReason:    info.FailReason,
			CentroidsFile: info.CentroidsFile,
		}
	}
	return nil
}

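// DeleteAnalyzeTaskInfos removes the analyze tasks identified by keys and
// returns the infos that were actually present.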
func (m *TaskManager) DeleteAnalyzeTaskInfos(ctx context.Context, keys []Key) []*AnalyzeTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	deleted := make([]*AnalyzeTaskInfo, 0, len(keys))
	for _, key := range keys {
		info, ok := m.analyzeTasks[key]
		if ok {
			deleted = append(deleted, info)
			delete(m.analyzeTasks, key)
			log.Ctx(ctx).Info("delete analyze task infos",
				zap.String("clusterID", key.ClusterID), zap.Int64("taskID", key.TaskID))
		}
	}
	return deleted
}

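// deleteAllAnalyzeTasks atomically detaches the whole analyze task map and
// returns its entries as a slice.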
func (m *TaskManager) deleteAllAnalyzeTasks() []*AnalyzeTaskInfo {
	m.stateLock.Lock()
	deletedTasks := m.analyzeTasks
	m.analyzeTasks = make(map[Key]*AnalyzeTaskInfo)
	m.stateLock.Unlock()

	deleted := make([]*AnalyzeTaskInfo, 0, len(deletedTasks))
	for _, info := range deletedTasks {
		deleted = append(deleted, info)
	}
	return deleted
}

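// HasInProgressTask reports whether any index or analyze task is still in
// progress; stats tasks are not considered here.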
func (m *TaskManager) HasInProgressTask() bool {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	for _, info := range m.indexTasks {
		if info.State == commonpb.IndexState_InProgress {
			return true
		}
	}

	for _, info := range m.analyzeTasks {
		if info.State == indexpb.JobState_JobStateInProgress {
			return true
		}
	}
	return false
}

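// WaitTaskFinish blocks until no index or analyze task is in progress,
// polling once per second, or until the DataNode graceful-stop timeout
// elapses, in which case the remaining in-progress tasks are logged and the
// wait is abandoned.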
func (m *TaskManager) WaitTaskFinish() {
	if !m.HasInProgressTask() {
		return
	}

	gracefulTimeout := &paramtable.Get().DataNodeCfg.GracefulStopTimeout
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	timeoutCtx, cancel := context.WithTimeout(m.ctx, gracefulTimeout.GetAsDuration(time.Second))
	defer cancel()
	for {
		select {
		case <-ticker.C:
			if !m.HasInProgressTask() {
				return
			}
		case <-timeoutCtx.Done():
			log.Warn("graceful-stop timeout reached, the index node still has in-progress tasks")
			// Hold the lock while iterating: other goroutines may still be
			// mutating the task maps.
			m.stateLock.Lock()
			for _, info := range m.indexTasks {
				if info.State == commonpb.IndexState_InProgress {
					log.Warn("in-progress index task", zap.Any("info", info))
				}
			}
			for _, info := range m.analyzeTasks {
				if info.State == indexpb.JobState_JobStateInProgress {
					log.Warn("in-progress analyze task", zap.Any("info", info))
				}
			}
			m.stateLock.Unlock()
			return
		}
	}
}

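// StatsTaskInfo tracks the in-memory state and outputs of one stats task:
// the segment it targets plus the insert/stats binlogs, text index stats,
// BM25 logs, and JSON key stats it produced.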
type StatsTaskInfo struct {
	Cancel           context.CancelFunc
	State            indexpb.JobState
	FailReason       string
	CollID           typeutil.UniqueID
	PartID           typeutil.UniqueID
	SegID            typeutil.UniqueID
	InsertChannel    string
	NumRows          int64
	InsertLogs       []*datapb.FieldBinlog
	StatsLogs        []*datapb.FieldBinlog
	TextStatsLogs    map[int64]*datapb.TextIndexStats
	Bm25Logs         []*datapb.FieldBinlog
	JSONKeyStatsLogs map[int64]*datapb.JsonKeyStats
}

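// Clone returns a deep copy of the stats task info; the log slices and maps
// are copied via the Clone* helpers below.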
func (s *StatsTaskInfo) Clone() *StatsTaskInfo {
	return &StatsTaskInfo{
		Cancel:           s.Cancel,
		State:            s.State,
		FailReason:       s.FailReason,
		CollID:           s.CollID,
		PartID:           s.PartID,
		SegID:            s.SegID,
		InsertChannel:    s.InsertChannel,
		NumRows:          s.NumRows,
		InsertLogs:       s.CloneInsertLogs(),
		StatsLogs:        s.CloneStatsLogs(),
		TextStatsLogs:    s.CloneTextStatsLogs(),
		Bm25Logs:         s.CloneBm25Logs(),
		JSONKeyStatsLogs: s.CloneJSONKeyStatsLogs(),
	}
}

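// ToStatsResult converts the internal state into a workerpb.StatsResult
// response for the given taskID.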
func (s *StatsTaskInfo) ToStatsResult(taskID int64) *workerpb.StatsResult {
	return &workerpb.StatsResult{
		TaskID:           taskID,
		State:            s.State,
		FailReason:       s.FailReason,
		CollectionID:     s.CollID,
		PartitionID:      s.PartID,
		SegmentID:        s.SegID,
		Channel:          s.InsertChannel,
		InsertLogs:       s.InsertLogs,
		StatsLogs:        s.StatsLogs,
		TextStatsLogs:    s.TextStatsLogs,
		Bm25Logs:         s.Bm25Logs,
		NumRows:          s.NumRows,
		JsonKeyStatsLogs: s.JSONKeyStatsLogs,
	}
}

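// CloneInsertLogs returns a deep copy of the insert binlogs.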
func (s *StatsTaskInfo) CloneInsertLogs() []*datapb.FieldBinlog {
	clone := make([]*datapb.FieldBinlog, len(s.InsertLogs))
	for i, binlog := range s.InsertLogs {
		clone[i] = typeutil.Clone(binlog)
	}
	return clone
}

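// CloneStatsLogs returns a deep copy of the stats binlogs.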
func (s *StatsTaskInfo) CloneStatsLogs() []*datapb.FieldBinlog {
	clone := make([]*datapb.FieldBinlog, len(s.StatsLogs))
	for i, binlog := range s.StatsLogs {
		clone[i] = typeutil.Clone(binlog)
	}
	return clone
}

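// CloneTextStatsLogs returns a deep copy of the text index stats map.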
func (s *StatsTaskInfo) CloneTextStatsLogs() map[int64]*datapb.TextIndexStats {
	clone := make(map[int64]*datapb.TextIndexStats, len(s.TextStatsLogs))
	for k, v := range s.TextStatsLogs {
		clone[k] = typeutil.Clone(v)
	}
	return clone
}

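// CloneBm25Logs returns a deep copy of the BM25 binlogs.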
func (s *StatsTaskInfo) CloneBm25Logs() []*datapb.FieldBinlog {
	clone := make([]*datapb.FieldBinlog, len(s.Bm25Logs))
	for i, binlog := range s.Bm25Logs {
		clone[i] = typeutil.Clone(binlog)
	}
	return clone
}

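// CloneJSONKeyStatsLogs returns a deep copy of the JSON key stats map.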
func (s *StatsTaskInfo) CloneJSONKeyStatsLogs() map[int64]*datapb.JsonKeyStats {
	clone := make(map[int64]*datapb.JsonKeyStats, len(s.JSONKeyStatsLogs))
	for k, v := range s.JSONKeyStatsLogs {
		clone[k] = typeutil.Clone(v)
	}
	return clone
}

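// LoadOrStoreStatsTask stores info under (clusterID, taskID) and returns
// nil, or returns the existing entry unchanged if one is already registered.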
func (m *TaskManager) LoadOrStoreStatsTask(clusterID string, taskID typeutil.UniqueID, info *StatsTaskInfo) *StatsTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	key := Key{ClusterID: clusterID, TaskID: taskID}
	oldInfo, ok := m.statsTasks[key]
	if ok {
		return oldInfo
	}
	m.statsTasks[key] = info
	return nil
}

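// GetStatsTaskState returns the state of the given stats task, or
// JobState_JobStateNone if the task is unknown.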
func (m *TaskManager) GetStatsTaskState(clusterID string, taskID typeutil.UniqueID) indexpb.JobState {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	task, ok := m.statsTasks[key]
	if !ok {
		return indexpb.JobState_JobStateNone
	}
	return task.State
}

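// StoreStatsTaskState updates the state and fail reason of an existing stats
// task; unknown tasks are ignored.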
func (m *TaskManager) StoreStatsTaskState(clusterID string, taskID typeutil.UniqueID, state indexpb.JobState, failReason string) {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if task, ok := m.statsTasks[key]; ok {
		log.Ctx(m.ctx).Info("store stats task state", zap.String("clusterID", clusterID), zap.Int64("taskID", taskID),
			zap.String("state", state.String()), zap.String("fail reason", failReason))
		task.State = state
		task.FailReason = failReason
	}
}

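// StorePKSortStatsResult records the segment identity and the insert, stats,
// and BM25 binlogs produced by a primary-key sort stats task. Note that the
// slices are stored as passed in, not cloned.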
func (m *TaskManager) StorePKSortStatsResult(
	clusterID string,
	taskID typeutil.UniqueID,
	collID typeutil.UniqueID,
	partID typeutil.UniqueID,
	segID typeutil.UniqueID,
	channel string,
	numRows int64,
	insertLogs []*datapb.FieldBinlog,
	statsLogs []*datapb.FieldBinlog,
	bm25Logs []*datapb.FieldBinlog,
) {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if info, ok := m.statsTasks[key]; ok {
		info.CollID = collID
		info.PartID = partID
		info.SegID = segID
		info.InsertChannel = channel
		info.NumRows = numRows
		info.InsertLogs = insertLogs
		info.StatsLogs = statsLogs
		info.Bm25Logs = bm25Logs
	}
}

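// StoreStatsTextIndexResult records the text index stats produced for a
// segment by a stats task.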
func (m *TaskManager) StoreStatsTextIndexResult(
	clusterID string,
	taskID typeutil.UniqueID,
	collID typeutil.UniqueID,
	partID typeutil.UniqueID,
	segID typeutil.UniqueID,
	channel string,
	textIndexLogs map[int64]*datapb.TextIndexStats,
) {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if info, ok := m.statsTasks[key]; ok {
		info.TextStatsLogs = textIndexLogs
		info.SegID = segID
		info.CollID = collID
		info.PartID = partID
		info.InsertChannel = channel
	}
}

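// StoreJSONKeyStatsResult records the JSON key stats produced for a segment
// by a stats task.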
func (m *TaskManager) StoreJSONKeyStatsResult(
	clusterID string,
	taskID typeutil.UniqueID,
	collID typeutil.UniqueID,
	partID typeutil.UniqueID,
	segID typeutil.UniqueID,
	channel string,
	jsonKeyIndexLogs map[int64]*datapb.JsonKeyStats,
) {
	key := Key{ClusterID: clusterID, TaskID: taskID}
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if info, ok := m.statsTasks[key]; ok {
		info.JSONKeyStatsLogs = jsonKeyIndexLogs
		info.SegID = segID
		info.CollID = collID
		info.PartID = partID
		info.InsertChannel = channel
	}
}

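// GetStatsTaskInfo returns a deep copy of the stats task info, or nil if the
// task is unknown.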
func (m *TaskManager) GetStatsTaskInfo(clusterID string, taskID typeutil.UniqueID) *StatsTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if info, ok := m.statsTasks[Key{ClusterID: clusterID, TaskID: taskID}]; ok {
		return info.Clone()
	}
	return nil
}

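// DeleteStatsTaskInfos removes the stats tasks identified by keys and returns
// the infos that were actually present.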
func (m *TaskManager) DeleteStatsTaskInfos(ctx context.Context, keys []Key) []*StatsTaskInfo {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	deleted := make([]*StatsTaskInfo, 0, len(keys))
	for _, key := range keys {
		info, ok := m.statsTasks[key]
		if ok {
			deleted = append(deleted, info)
			delete(m.statsTasks, key)
			log.Ctx(ctx).Info("delete stats task infos",
				zap.String("clusterID", key.ClusterID), zap.Int64("taskID", key.TaskID))
		}
	}
	return deleted
}

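// deleteAllStatsTasks atomically detaches the whole stats task map and
// returns its entries as a slice.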
func (m *TaskManager) deleteAllStatsTasks() []*StatsTaskInfo {
	m.stateLock.Lock()
	deletedTasks := m.statsTasks
	m.statsTasks = make(map[Key]*StatsTaskInfo)
	m.stateLock.Unlock()

	deleted := make([]*StatsTaskInfo, 0, len(deletedTasks))
	for _, info := range deletedTasks {
		deleted = append(deleted, info)
	}
	return deleted
}

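// DeleteAllTasks removes every index, analyze, and stats task and invokes
// each task's cancel function when one is set.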
func (m *TaskManager) DeleteAllTasks() {
	deletedIndexTasks := m.deleteAllIndexTasks()
	for _, t := range deletedIndexTasks {
		if t.Cancel != nil {
			t.Cancel()
		}
	}
	deletedAnalyzeTasks := m.deleteAllAnalyzeTasks()
	for _, t := range deletedAnalyzeTasks {
		if t.Cancel != nil {
			t.Cancel()
		}
	}
	deletedStatsTasks := m.deleteAllStatsTasks()
	for _, t := range deletedStatsTasks {
		if t.Cancel != nil {
			t.Cancel()
		}
	}
}