// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package datanode import ( "context" "fmt" "strconv" "github.com/cockroachdb/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/datanode/index" "github.com/milvus-io/milvus/internal/flushcommon/io" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/metrics" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/proto/workerpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/paramtable" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) // CreateJob is CreateIndex func (node *DataNode) CreateJob(ctx context.Context, req *workerpb.CreateJobRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), zap.Int64("indexBuildID", req.GetBuildID()), ) if err := node.lifetime.Add(merr.IsHealthy); err != nil { log.Warn("index node not ready", zap.Error(err), ) return merr.Status(err), nil } defer node.lifetime.Done() log.Info("DataNode building index ...", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("segmentID", req.GetSegmentID()), zap.Int64("indexID", req.GetIndexID()), zap.String("indexName", req.GetIndexName()), zap.String("indexFilePrefix", req.GetIndexFilePrefix()), zap.Int64("indexVersion", req.GetIndexVersion()), zap.Strings("dataPaths", req.GetDataPaths()), zap.Any("typeParams", req.GetTypeParams()), zap.Any("indexParams", req.GetIndexParams()), zap.Int64("numRows", req.GetNumRows()), zap.Int32("current_index_version", req.GetCurrentIndexVersion()), zap.Any("storepath", req.GetStorePath()), zap.Any("storeversion", req.GetStoreVersion()), zap.Any("indexstorepath", req.GetIndexStorePath()), zap.Any("dim", req.GetDim()), ) ctx, sp := otel.Tracer(typeutil.DataNodeRole).Start(ctx, "DataNode-CreateIndex", trace.WithAttributes( attribute.Int64("indexBuildID", req.GetBuildID()), attribute.String("clusterID", req.GetClusterID()), )) defer sp.End() metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.TotalLabel).Inc() taskCtx, taskCancel := context.WithCancel(node.ctx) if oldInfo := node.taskManager.LoadOrStoreIndexTask(req.GetClusterID(), req.GetBuildID(), &index.IndexTaskInfo{ Cancel: taskCancel, State: commonpb.IndexState_InProgress, }); oldInfo != nil { err := merr.WrapErrIndexDuplicate(req.GetIndexName(), "building index task existed") log.Warn("duplicated index build task", zap.Error(err)) metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() return merr.Status(err), nil } cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig()) if err != nil { log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()), zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()), zap.Error(err), ) node.taskManager.DeleteIndexTaskInfos(ctx, []index.Key{{ClusterID: req.GetClusterID(), TaskID: req.GetBuildID()}}) metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() return merr.Status(err), nil } task := index.NewIndexBuildTask(taskCtx, taskCancel, req, cm, node.taskManager) ret := merr.Success() if err := node.taskScheduler.TaskQueue.Enqueue(task); err != nil { log.Warn("DataNode failed to schedule", zap.Error(err)) ret = merr.Status(err) metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc() return ret, nil } metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc() log.Info("DataNode successfully scheduled", zap.String("indexName", req.GetIndexName())) return ret, nil } func (node *DataNode) QueryJobs(ctx context.Context, req *workerpb.QueryJobsRequest) (*workerpb.QueryJobsResponse, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), ).WithRateGroup("in.queryJobs", 1, 60) if err := node.lifetime.Add(merr.IsHealthyOrStopping); err != nil { log.Warn("index node not ready", zap.Error(err)) return &workerpb.QueryJobsResponse{ Status: merr.Status(err), }, nil } defer node.lifetime.Done() infos := make(map[typeutil.UniqueID]*index.IndexTaskInfo) node.taskManager.ForeachIndexTaskInfo(func(ClusterID string, buildID typeutil.UniqueID, info *index.IndexTaskInfo) { if ClusterID == req.GetClusterID() { infos[buildID] = info.Clone() } }) ret := &workerpb.QueryJobsResponse{ Status: merr.Success(), ClusterID: req.GetClusterID(), IndexInfos: make([]*workerpb.IndexTaskInfo, 0, len(req.GetTaskIDs())), } for i, buildID := range req.GetTaskIDs() { ret.IndexInfos = append(ret.IndexInfos, &workerpb.IndexTaskInfo{ BuildID: buildID, State: commonpb.IndexState_IndexStateNone, IndexFileKeys: nil, SerializedSize: 0, }) if info, ok := infos[buildID]; ok { ret.IndexInfos[i].State = info.State ret.IndexInfos[i].IndexFileKeys = info.FileKeys ret.IndexInfos[i].SerializedSize = info.SerializedSize ret.IndexInfos[i].MemSize = info.MemSize ret.IndexInfos[i].FailReason = info.FailReason ret.IndexInfos[i].CurrentIndexVersion = info.CurrentIndexVersion ret.IndexInfos[i].IndexStoreVersion = info.IndexStoreVersion log.RatedDebug(5, "querying index build task", zap.Int64("indexBuildID", buildID), zap.String("state", info.State.String()), zap.String("reason", info.FailReason), ) } } return ret, nil } func (node *DataNode) DropJobs(ctx context.Context, req *workerpb.DropJobsRequest) (*commonpb.Status, error) { log.Ctx(ctx).Info("drop index build jobs", zap.String("clusterID", req.ClusterID), zap.Int64s("indexBuildIDs", req.GetTaskIDs()), ) if err := node.lifetime.Add(merr.IsHealthyOrStopping); err != nil { log.Ctx(ctx).Warn("index node not ready", zap.Error(err), zap.String("clusterID", req.ClusterID)) return merr.Status(err), nil } defer node.lifetime.Done() keys := make([]index.Key, 0, len(req.GetTaskIDs())) for _, taskID := range req.GetTaskIDs() { keys = append(keys, index.Key{ClusterID: req.GetClusterID(), TaskID: taskID}) } infos := node.taskManager.DeleteIndexTaskInfos(ctx, keys) for _, info := range infos { if info.Cancel != nil { info.Cancel() } } log.Ctx(ctx).Info("drop index build jobs success", zap.String("clusterID", req.GetClusterID()), zap.Int64s("indexBuildIDs", req.GetTaskIDs())) return merr.Success(), nil } // GetJobStats should be GetSlots func (node *DataNode) GetJobStats(ctx context.Context, req *workerpb.GetJobStatsRequest) (*workerpb.GetJobStatsResponse, error) { if err := node.lifetime.Add(merr.IsHealthyOrStopping); err != nil { log.Ctx(ctx).Warn("index node not ready", zap.Error(err)) return &workerpb.GetJobStatsResponse{ Status: merr.Status(err), }, nil } defer node.lifetime.Done() var ( totalSlots = node.totalSlot indexStatsUsed = node.taskScheduler.TaskQueue.GetUsingSlot() compactionUsed = node.compactionExecutor.Slots() importUsed = node.importScheduler.Slots() ) availableSlots := totalSlots - indexStatsUsed - compactionUsed - importUsed if availableSlots < 0 { availableSlots = 0 } log.Ctx(ctx).Info("query slots done", zap.Int64("totalSlots", totalSlots), zap.Int64("availableSlots", availableSlots), zap.Int64("indexStatsUsed", indexStatsUsed), zap.Int64("compactionUsed", compactionUsed), zap.Int64("importUsed", importUsed), ) return &workerpb.GetJobStatsResponse{ Status: merr.Success(), TotalSlots: node.totalSlot, AvailableSlots: availableSlots, }, nil } // Deprecated: use CreateTask instead, keep for compatibility func (node *DataNode) CreateJobV2(ctx context.Context, req *workerpb.CreateJobV2Request) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), zap.Int64("TaskID", req.GetTaskID()), zap.String("jobType", req.GetJobType().String()), ) if err := node.lifetime.Add(merr.IsHealthy); err != nil { log.Warn("index node not ready", zap.Error(err), ) return merr.Status(err), nil } defer node.lifetime.Done() log.Info("DataNode receive CreateJob request...") switch req.GetJobType() { case indexpb.JobType_JobTypeIndexJob: indexRequest := req.GetIndexRequest() return node.createIndexTask(ctx, indexRequest) case indexpb.JobType_JobTypeAnalyzeJob: analyzeRequest := req.GetAnalyzeRequest() return node.createAnalyzeTask(ctx, analyzeRequest) case indexpb.JobType_JobTypeStatsJob: statsRequest := req.GetStatsRequest() return node.createStatsTask(ctx, statsRequest) default: log.Warn("DataNode receive unknown type job") return merr.Status(fmt.Errorf("DataNode receive unknown type job with TaskID: %d", req.GetTaskID())), nil } } func (node *DataNode) createIndexTask(ctx context.Context, req *workerpb.CreateJobRequest) (*commonpb.Status, error) { log.Ctx(ctx).Info("DataNode building index ...", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("segmentID", req.GetSegmentID()), zap.String("indexFilePrefix", req.GetIndexFilePrefix()), zap.Int64("indexVersion", req.GetIndexVersion()), zap.Strings("dataPaths", req.GetDataPaths()), zap.Any("typeParams", req.GetTypeParams()), zap.Any("indexParams", req.GetIndexParams()), zap.Int64("numRows", req.GetNumRows()), zap.Int32("current_index_version", req.GetCurrentIndexVersion()), zap.String("storePath", req.GetStorePath()), zap.Int64("storeVersion", req.GetStoreVersion()), zap.String("indexStorePath", req.GetIndexStorePath()), zap.Int64("dim", req.GetDim()), zap.Int64("fieldID", req.GetFieldID()), zap.String("fieldType", req.GetFieldType().String()), zap.Any("field", req.GetField()), zap.Int64("taskSlot", req.GetTaskSlot()), zap.Int64("lackBinlogRows", req.GetLackBinlogRows()), ) if req.GetTaskSlot() <= 0 { log.Ctx(ctx).Warn("receive index task with invalid slot, set to 64", zap.Int64("taskSlot", req.GetTaskSlot())) req.TaskSlot = 64 } taskCtx, taskCancel := context.WithCancel(node.ctx) if oldInfo := node.taskManager.LoadOrStoreIndexTask(req.GetClusterID(), req.GetBuildID(), &index.IndexTaskInfo{ Cancel: taskCancel, State: commonpb.IndexState_InProgress, }); oldInfo != nil { err := merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeIndexJob.String(), fmt.Sprintf("building index task existed with %s-%d", req.GetClusterID(), req.GetBuildID())) log.Warn("duplicated index build task", zap.Error(err)) metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() return merr.Status(err), nil } cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig()) if err != nil { log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()), zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()), zap.Error(err), ) node.taskManager.DeleteIndexTaskInfos(ctx, []index.Key{{ClusterID: req.GetClusterID(), TaskID: req.GetBuildID()}}) metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.FailLabel).Inc() return merr.Status(err), nil } task := index.NewIndexBuildTask(taskCtx, taskCancel, req, cm, node.taskManager) ret := merr.Success() if err := node.taskScheduler.TaskQueue.Enqueue(task); err != nil { log.Warn("DataNode failed to schedule", zap.Error(err)) ret = merr.Status(err) metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc() return ret, nil } metrics.DataNodeBuildIndexTaskCounter.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), metrics.SuccessLabel).Inc() log.Info("DataNode index job enqueued successfully", zap.String("indexName", req.GetIndexName())) return ret, nil } func (node *DataNode) createAnalyzeTask(ctx context.Context, req *workerpb.AnalyzeRequest) (*commonpb.Status, error) { log.Ctx(ctx).Info("receive analyze job", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("fieldID", req.GetFieldID()), zap.String("fieldName", req.GetFieldName()), zap.String("dataType", req.GetFieldType().String()), zap.Int64("version", req.GetVersion()), zap.Int64("dim", req.GetDim()), zap.Float64("trainSizeRatio", req.GetMaxTrainSizeRatio()), zap.Int64("numClusters", req.GetNumClusters()), zap.Int64("taskSlot", req.GetTaskSlot()), ) if req.GetTaskSlot() <= 0 { log.Ctx(ctx).Warn("receive analyze task with invalid slot, set to 65535", zap.Int64("taskSlot", req.GetTaskSlot())) req.TaskSlot = 65535 } taskCtx, taskCancel := context.WithCancel(node.ctx) if oldInfo := node.taskManager.LoadOrStoreAnalyzeTask(req.GetClusterID(), req.GetTaskID(), &index.AnalyzeTaskInfo{ Cancel: taskCancel, State: indexpb.JobState_JobStateInProgress, }); oldInfo != nil { err := merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeAnalyzeJob.String(), fmt.Sprintf("analyze task already existed with %s-%d", req.GetClusterID(), req.GetTaskID())) log.Warn("duplicated analyze task", zap.Error(err)) return merr.Status(err), nil } t := index.NewAnalyzeTask(taskCtx, taskCancel, req, node.taskManager) ret := merr.Success() if err := node.taskScheduler.TaskQueue.Enqueue(t); err != nil { log.Warn("DataNode failed to schedule", zap.Error(err)) ret = merr.Status(err) return ret, nil } log.Info("DataNode analyze job enqueued successfully") return ret, nil } func (node *DataNode) createStatsTask(ctx context.Context, req *workerpb.CreateStatsRequest) (*commonpb.Status, error) { log.Ctx(ctx).Info("receive stats job", zap.Int64("collectionID", req.GetCollectionID()), zap.Int64("partitionID", req.GetPartitionID()), zap.Int64("segmentID", req.GetSegmentID()), zap.Int64("numRows", req.GetNumRows()), zap.Int64("targetSegmentID", req.GetTargetSegmentID()), zap.String("subJobType", req.GetSubJobType().String()), zap.Int64("startLogID", req.GetStartLogID()), zap.Int64("endLogID", req.GetEndLogID()), zap.Int64("taskSlot", req.GetTaskSlot()), ) if req.GetTaskSlot() <= 0 { log.Ctx(ctx).Warn("receive stats task with invalid slot, set to 64", zap.Int64("taskSlot", req.GetTaskSlot())) req.TaskSlot = 64 } taskCtx, taskCancel := context.WithCancel(node.ctx) if oldInfo := node.taskManager.LoadOrStoreStatsTask(req.GetClusterID(), req.GetTaskID(), &index.StatsTaskInfo{ Cancel: taskCancel, State: indexpb.JobState_JobStateInProgress, }); oldInfo != nil { err := merr.WrapErrTaskDuplicate(indexpb.JobType_JobTypeStatsJob.String(), fmt.Sprintf("stats task already existed with %s-%d", req.GetClusterID(), req.GetTaskID())) log.Warn("duplicated stats task", zap.Error(err)) return merr.Status(err), nil } cm, err := node.storageFactory.NewChunkManager(node.ctx, req.GetStorageConfig()) if err != nil { log.Error("create chunk manager failed", zap.String("bucket", req.GetStorageConfig().GetBucketName()), zap.String("accessKey", req.GetStorageConfig().GetAccessKeyID()), zap.Error(err), ) node.taskManager.DeleteStatsTaskInfos(ctx, []index.Key{{ClusterID: req.GetClusterID(), TaskID: req.GetTaskID()}}) return merr.Status(err), nil } t := index.NewStatsTask(taskCtx, taskCancel, req, node.taskManager, io.NewBinlogIO(cm)) ret := merr.Success() if err := node.taskScheduler.TaskQueue.Enqueue(t); err != nil { log.Warn("DataNode failed to schedule", zap.Error(err)) ret = merr.Status(err) return ret, nil } log.Info("DataNode stats job enqueued successfully") return ret, nil } // Deprecated: use QueryTask instead, keep for compatibility func (node *DataNode) QueryJobsV2(ctx context.Context, req *workerpb.QueryJobsV2Request) (*workerpb.QueryJobsV2Response, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), zap.Int64s("taskIDs", req.GetTaskIDs()), ).WithRateGroup("QueryResult", 1, 60) if err := node.lifetime.Add(merr.IsHealthyOrStopping); err != nil { log.Warn("DataNode not ready", zap.Error(err)) return &workerpb.QueryJobsV2Response{ Status: merr.Status(err), }, nil } defer node.lifetime.Done() switch req.GetJobType() { case indexpb.JobType_JobTypeIndexJob: return node.queryIndexTask(ctx, &workerpb.QueryJobsRequest{ ClusterID: req.GetClusterID(), TaskIDs: req.GetTaskIDs(), }) case indexpb.JobType_JobTypeAnalyzeJob: return node.queryAnalyzeTask(ctx, &workerpb.QueryJobsRequest{ ClusterID: req.GetClusterID(), TaskIDs: req.GetTaskIDs(), }) case indexpb.JobType_JobTypeStatsJob: return node.queryStatsTask(ctx, &workerpb.QueryJobsRequest{ ClusterID: req.GetClusterID(), TaskIDs: req.GetTaskIDs(), }) default: log.Warn("DataNode receive querying unknown type jobs") return &workerpb.QueryJobsV2Response{ Status: merr.Status(errors.New("DataNode receive querying unknown type jobs")), }, nil } } func (node *DataNode) queryIndexTask(ctx context.Context, req *workerpb.QueryJobsRequest) (*workerpb.QueryJobsV2Response, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), zap.Int64s("taskIDs", req.GetTaskIDs()), ).WithRateGroup("QueryResult", 1, 60) infos := make(map[typeutil.UniqueID]*index.IndexTaskInfo) node.taskManager.ForeachIndexTaskInfo(func(ClusterID string, buildID typeutil.UniqueID, info *index.IndexTaskInfo) { if ClusterID == req.GetClusterID() { infos[buildID] = info.Clone() } }) results := make([]*workerpb.IndexTaskInfo, 0, len(req.GetTaskIDs())) for _, buildID := range req.GetTaskIDs() { if info, ok := infos[buildID]; ok { results = append(results, info.ToIndexTaskInfo(buildID)) } } log.Debug("query index jobs result success", zap.Any("results", results)) if len(results) == 0 { return &workerpb.QueryJobsV2Response{ Status: merr.Status(fmt.Errorf("tasks '%v' not found", req.GetTaskIDs())), }, nil } return &workerpb.QueryJobsV2Response{ Status: merr.Success(), ClusterID: req.GetClusterID(), Result: &workerpb.QueryJobsV2Response_IndexJobResults{ IndexJobResults: &workerpb.IndexJobResults{ Results: results, }, }, }, nil } func (node *DataNode) queryStatsTask(ctx context.Context, req *workerpb.QueryJobsRequest) (*workerpb.QueryJobsV2Response, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), zap.Int64s("taskIDs", req.GetTaskIDs()), ).WithRateGroup("QueryResult", 1, 60) results := make([]*workerpb.StatsResult, 0, len(req.GetTaskIDs())) for _, taskID := range req.GetTaskIDs() { info := node.taskManager.GetStatsTaskInfo(req.GetClusterID(), taskID) if info != nil { results = append(results, info.ToStatsResult(taskID)) } } log.Debug("query stats job result success", zap.Any("results", results)) if len(results) == 0 { return &workerpb.QueryJobsV2Response{ Status: merr.Status(fmt.Errorf("tasks '%v' not found", req.GetTaskIDs())), }, nil } return &workerpb.QueryJobsV2Response{ Status: merr.Success(), ClusterID: req.GetClusterID(), Result: &workerpb.QueryJobsV2Response_StatsJobResults{ StatsJobResults: &workerpb.StatsResults{ Results: results, }, }, }, nil } func (node *DataNode) queryAnalyzeTask(ctx context.Context, req *workerpb.QueryJobsRequest) (*workerpb.QueryJobsV2Response, error) { log := log.Ctx(ctx).With( zap.String("clusterID", req.GetClusterID()), zap.Int64s("taskIDs", req.GetTaskIDs()), ).WithRateGroup("QueryResult", 1, 60) results := make([]*workerpb.AnalyzeResult, 0, len(req.GetTaskIDs())) for _, taskID := range req.GetTaskIDs() { info := node.taskManager.GetAnalyzeTaskInfo(req.GetClusterID(), taskID) if info != nil { results = append(results, &workerpb.AnalyzeResult{ TaskID: taskID, State: info.State, FailReason: info.FailReason, CentroidsFile: info.CentroidsFile, }) } } log.Debug("query analyze jobs result success", zap.Any("results", results)) if len(results) == 0 { return &workerpb.QueryJobsV2Response{ Status: merr.Status(fmt.Errorf("tasks '%v' not found", req.GetTaskIDs())), }, nil } return &workerpb.QueryJobsV2Response{ Status: merr.Success(), ClusterID: req.GetClusterID(), Result: &workerpb.QueryJobsV2Response_AnalyzeJobResults{ AnalyzeJobResults: &workerpb.AnalyzeResults{ Results: results, }, }, }, nil } // Deprecated: use DropTask instead, keep for compatibility func (node *DataNode) DropJobsV2(ctx context.Context, req *workerpb.DropJobsV2Request) (*commonpb.Status, error) { log := log.Ctx(ctx).With(zap.String("clusterID", req.GetClusterID()), zap.Int64s("taskIDs", req.GetTaskIDs()), zap.String("jobType", req.GetJobType().String()), ) if err := node.lifetime.Add(merr.IsHealthyOrStopping); err != nil { log.Warn("DataNode not ready", zap.Error(err)) return merr.Status(err), nil } defer node.lifetime.Done() log.Info("DataNode receive DropJobs request") switch req.GetJobType() { case indexpb.JobType_JobTypeIndexJob: keys := make([]index.Key, 0, len(req.GetTaskIDs())) for _, buildID := range req.GetTaskIDs() { keys = append(keys, index.Key{ClusterID: req.GetClusterID(), TaskID: buildID}) } infos := node.taskManager.DeleteIndexTaskInfos(ctx, keys) for _, info := range infos { if info.Cancel != nil { info.Cancel() } } log.Info("drop index build jobs success") return merr.Success(), nil case indexpb.JobType_JobTypeAnalyzeJob: keys := make([]index.Key, 0, len(req.GetTaskIDs())) for _, taskID := range req.GetTaskIDs() { keys = append(keys, index.Key{ClusterID: req.GetClusterID(), TaskID: taskID}) } infos := node.taskManager.DeleteAnalyzeTaskInfos(ctx, keys) for _, info := range infos { if info.Cancel != nil { info.Cancel() } } log.Info("drop analyze jobs success") return merr.Success(), nil case indexpb.JobType_JobTypeStatsJob: keys := make([]index.Key, 0, len(req.GetTaskIDs())) for _, taskID := range req.GetTaskIDs() { keys = append(keys, index.Key{ClusterID: req.GetClusterID(), TaskID: taskID}) } infos := node.taskManager.DeleteStatsTaskInfos(ctx, keys) for _, info := range infos { if info.Cancel != nil { info.Cancel() } } log.Info("drop stats jobs success") return merr.Success(), nil default: log.Warn("DataNode receive dropping unknown type jobs") return merr.Status(errors.New("DataNode receive dropping unknown type jobs")), nil } }