// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package datacoord import ( "context" "sync" "time" "github.com/cockroachdb/errors" "go.uber.org/zap" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus/internal/datacoord/allocator" "github.com/milvus-io/milvus/internal/datacoord/task" "github.com/milvus-io/milvus/pkg/v2/log" "github.com/milvus-io/milvus/pkg/v2/proto/datapb" "github.com/milvus-io/milvus/pkg/v2/proto/indexpb" "github.com/milvus-io/milvus/pkg/v2/util/merr" "github.com/milvus-io/milvus/pkg/v2/util/typeutil" ) type StatsInspector interface { Start() Stop() SubmitStatsTask(originSegmentID, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool) error GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error } var _ StatsInspector = (*statsInspector)(nil) type statsInspector struct { ctx context.Context cancel context.CancelFunc loopWg sync.WaitGroup mt *meta scheduler task.GlobalScheduler allocator allocator.Allocator handler Handler compactionInspector CompactionInspector ievm IndexEngineVersionManager } func newStatsInspector(ctx context.Context, mt *meta, scheduler task.GlobalScheduler, allocator allocator.Allocator, handler Handler, compactionInspector CompactionInspector, ievm IndexEngineVersionManager, ) *statsInspector { ctx, cancel := context.WithCancel(ctx) return &statsInspector{ ctx: ctx, cancel: cancel, loopWg: sync.WaitGroup{}, mt: mt, scheduler: scheduler, allocator: allocator, handler: handler, compactionInspector: compactionInspector, ievm: ievm, } } func (si *statsInspector) Start() { si.reloadFromMeta() si.loopWg.Add(2) go si.triggerStatsTaskLoop() go si.cleanupStatsTasksLoop() } func (si *statsInspector) Stop() { si.cancel() si.loopWg.Wait() } func (si *statsInspector) reloadFromMeta() { tasks := si.mt.statsTaskMeta.GetAllTasks() for _, st := range tasks { if st.GetState() != indexpb.JobState_JobStateInit && st.GetState() != indexpb.JobState_JobStateRetry && st.GetState() != indexpb.JobState_JobStateInProgress { continue } segment := si.mt.GetHealthySegment(si.ctx, st.GetSegmentID()) taskSlot := int64(0) if segment != nil { taskSlot = calculateStatsTaskSlot(segment.getSegmentSize()) } si.scheduler.Enqueue(newStatsTask( proto.Clone(st).(*indexpb.StatsTask), taskSlot, si.mt, si.handler, si.allocator, si.ievm, )) } } func (si *statsInspector) triggerStatsTaskLoop() { log.Info("start checkStatsTaskLoop...") defer si.loopWg.Done() ticker := time.NewTicker(Params.DataCoordCfg.TaskCheckInterval.GetAsDuration(time.Second)) defer ticker.Stop() lastJSONStatsLastTrigger := time.Now().Unix() maxJSONStatsTaskCount := 0 for { select { case <-si.ctx.Done(): log.Warn("DataCoord context done, exit checkStatsTaskLoop...") return case <-ticker.C: si.triggerTextStatsTask() si.triggerBM25StatsTask() lastJSONStatsLastTrigger, maxJSONStatsTaskCount = si.triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger, maxJSONStatsTaskCount) } } } func (si *statsInspector) enableBM25() bool { return false } func needDoTextIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool { if !isFlush(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 || !segment.GetIsSorted() { return false } for _, fieldID := range fieldIDs { if segment.GetTextStatsLogs() == nil { return true } if segment.GetTextStatsLogs()[fieldID] == nil { return true } } return false } func needDoJsonKeyIndex(segment *SegmentInfo, fieldIDs []UniqueID) bool { if !isFlush(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 || !segment.GetIsSorted() { return false } for _, fieldID := range fieldIDs { if segment.GetJsonKeyStats() == nil { return true } if segment.GetJsonKeyStats()[fieldID] == nil { return true } } return false } func needDoBM25(segment *SegmentInfo, fieldIDs []UniqueID) bool { // TODO: docking bm25 stats task return false } func (si *statsInspector) triggerTextStatsTask() { collections := si.mt.GetCollections() for _, collection := range collections { needTriggerFieldIDs := make([]UniqueID, 0) for _, field := range collection.Schema.GetFields() { // TODO @longjiquan: please replace it to fieldSchemaHelper.EnableMath h := typeutil.CreateFieldSchemaHelper(field) if !h.EnableMatch() { continue } needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID()) } segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool { return seg.GetIsSorted() && needDoTextIndex(seg, needTriggerFieldIDs) })) for _, segment := range segments { if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_TextIndexJob, true); err != nil { log.Warn("create stats task with text index for segment failed, wait for retry", zap.Int64("segmentID", segment.GetID()), zap.Error(err)) continue } } } } func (si *statsInspector) triggerJsonKeyIndexStatsTask(lastJSONStatsLastTrigger int64, maxJSONStatsTaskCount int) (int64, int) { collections := si.mt.GetCollections() for _, collection := range collections { needTriggerFieldIDs := make([]UniqueID, 0) for _, field := range collection.Schema.GetFields() { h := typeutil.CreateFieldSchemaHelper(field) if h.EnableJSONKeyStatsIndex() && Params.CommonCfg.EnabledJSONKeyStats.GetAsBool() { needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID()) } } segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool { return needDoJsonKeyIndex(seg, needTriggerFieldIDs) })) if time.Now().Unix()-lastJSONStatsLastTrigger > int64(Params.DataCoordCfg.JSONStatsTriggerInterval.GetAsDuration(time.Minute).Seconds()) { lastJSONStatsLastTrigger = time.Now().Unix() maxJSONStatsTaskCount = 0 } for _, segment := range segments { if maxJSONStatsTaskCount >= Params.DataCoordCfg.JSONStatsTriggerCount.GetAsInt() { break } if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil { log.Warn("create stats task with json key index for segment failed, wait for retry:", zap.Int64("segmentID", segment.GetID()), zap.Error(err)) continue } maxJSONStatsTaskCount++ } } return lastJSONStatsLastTrigger, maxJSONStatsTaskCount } func (si *statsInspector) triggerBM25StatsTask() { collections := si.mt.GetCollections() for _, collection := range collections { needTriggerFieldIDs := make([]UniqueID, 0) for _, field := range collection.Schema.GetFields() { // TODO: docking bm25 stats task if si.enableBM25() { needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID()) } } segments := si.mt.SelectSegments(si.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool { return seg.GetIsSorted() && needDoBM25(seg, needTriggerFieldIDs) })) for _, segment := range segments { if err := si.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_BM25Job, true); err != nil { log.Warn("create stats task with bm25 for segment failed, wait for retry", zap.Int64("segmentID", segment.GetID()), zap.Error(err)) continue } } } } // cleanupStatsTasks clean up the finished/failed stats tasks func (si *statsInspector) cleanupStatsTasksLoop() { log.Info("start cleanupStatsTasksLoop...") defer si.loopWg.Done() ticker := time.NewTicker(Params.DataCoordCfg.GCInterval.GetAsDuration(time.Second)) defer ticker.Stop() for { select { case <-si.ctx.Done(): log.Warn("DataCoord context done, exit cleanupStatsTasksLoop...") return case <-ticker.C: start := time.Now() log.Info("start cleanupUnusedStatsTasks...", zap.Time("startAt", start)) taskIDs := si.mt.statsTaskMeta.CanCleanedTasks() for _, taskID := range taskIDs { if err := si.mt.statsTaskMeta.DropStatsTask(si.ctx, taskID); err != nil { // ignore err, if remove failed, wait next GC log.Warn("clean up stats task failed", zap.Int64("taskID", taskID), zap.Error(err)) } } log.Info("cleanupUnusedStatsTasks done", zap.Duration("timeCost", time.Since(start))) } } } func (si *statsInspector) SubmitStatsTask(originSegmentID, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool, ) error { originSegment := si.mt.GetHealthySegment(si.ctx, originSegmentID) if originSegment == nil { return merr.WrapErrSegmentNotFound(originSegmentID) } taskID, err := si.allocator.AllocID(context.Background()) if err != nil { return err } originSegmentSize := originSegment.getSegmentSize() if subJobType == indexpb.StatsSubJob_JsonKeyIndexJob { originSegmentSize = originSegment.getSegmentSize() * 2 } taskSlot := calculateStatsTaskSlot(originSegmentSize) t := &indexpb.StatsTask{ CollectionID: originSegment.GetCollectionID(), PartitionID: originSegment.GetPartitionID(), SegmentID: originSegmentID, InsertChannel: originSegment.GetInsertChannel(), TaskID: taskID, Version: 0, NodeID: 0, State: indexpb.JobState_JobStateInit, FailReason: "", TargetSegmentID: targetSegmentID, SubJobType: subJobType, CanRecycle: canRecycle, } if err = si.mt.statsTaskMeta.AddStatsTask(t); err != nil { if errors.Is(err, merr.ErrTaskDuplicate) { log.RatedInfo(10, "stats task already exists", zap.Int64("taskID", taskID), zap.Int64("collectionID", originSegment.GetCollectionID()), zap.Int64("segmentID", originSegment.GetID())) return nil } return err } si.scheduler.Enqueue(newStatsTask(proto.Clone(t).(*indexpb.StatsTask), taskSlot, si.mt, si.handler, si.allocator, si.ievm)) log.Ctx(si.ctx).Info("submit stats task success", zap.Int64("taskID", taskID), zap.String("subJobType", subJobType.String()), zap.Int64("collectionID", originSegment.GetCollectionID()), zap.Int64("originSegmentID", originSegmentID), zap.Int64("targetSegmentID", targetSegmentID), zap.Int64("taskSlot", taskSlot)) return nil } func (si *statsInspector) GetStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) *indexpb.StatsTask { task := si.mt.statsTaskMeta.GetStatsTaskBySegmentID(originSegmentID, subJobType) log.Info("statsJobManager get stats task state", zap.Int64("segmentID", originSegmentID), zap.String("subJobType", subJobType.String()), zap.String("state", task.GetState().String()), zap.String("failReason", task.GetFailReason())) return task } func (si *statsInspector) DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error { task := si.mt.statsTaskMeta.GetStatsTaskBySegmentID(originSegmentID, subJobType) if task == nil { return nil } si.scheduler.AbortAndRemoveTask(task.GetTaskID()) if err := si.mt.statsTaskMeta.MarkTaskCanRecycle(task.GetTaskID()); err != nil { return err } log.Info("statsJobManager drop stats task success", zap.Int64("segmentID", originSegmentID), zap.Int64("taskID", task.GetTaskID()), zap.String("subJobType", subJobType.String())) return nil }