From fdad35e668ebf5126a16af9bfb49db3cbc64cfc9 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Mon, 24 Feb 2025 15:43:55 +0800 Subject: [PATCH] fix: [2.5] Set task version for stats task (#40130) issue: #40034 master pr: #40035 Signed-off-by: Cai Zhang --- internal/datacoord/garbage_collector.go | 4 ++-- internal/datacoord/job_manager.go | 6 +++--- internal/datacoord/task_stats.go | 12 +++++++++++- internal/datacoord/task_stats_test.go | 1 + 4 files changed, 17 insertions(+), 6 deletions(-) diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 75fcf00cbb..f0cfcd90aa 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -863,8 +863,8 @@ func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) { log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID())) // clear low version task for i := int64(1); i < fieldStats.GetVersion(); i++ { - prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath, - seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID(), i) + prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath, + fieldStats.GetBuildID(), i, seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID()) futures := make([]*conc.Future[struct{}], 0) err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(files *storage.ChunkObjectInfo) bool { diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go index a3643c3988..097e4d3f7b 100644 --- a/internal/datacoord/job_manager.go +++ b/internal/datacoord/job_manager.go @@ -181,7 +181,7 @@ func (jm *statsJobManager) triggerTextStatsTask() { needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID()) } segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool { - return needDoTextIndex(seg, needTriggerFieldIDs) + return seg.GetIsSorted() && needDoTextIndex(seg, needTriggerFieldIDs) })) for _, segment := range segments { @@ -205,7 +205,7 @@ func (jm *statsJobManager) triggerJsonKeyIndexStatsTask() { } } segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool { - return needDoJsonKeyIndex(seg, needTriggerFieldIDs) + return seg.GetIsSorted() && needDoJsonKeyIndex(seg, needTriggerFieldIDs) })) for _, segment := range segments { if err := jm.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil { @@ -228,7 +228,7 @@ func (jm *statsJobManager) triggerBM25StatsTask() { } } segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool { - return needDoBM25(seg, needTriggerFieldIDs) + return seg.GetIsSorted() && needDoBM25(seg, needTriggerFieldIDs) })) for _, segment := range segments { diff --git a/internal/datacoord/task_stats.go b/internal/datacoord/task_stats.go index c8efe00601..846aa7f4c4 100644 --- a/internal/datacoord/task_stats.go +++ b/internal/datacoord/task_stats.go @@ -161,6 +161,14 @@ func (st *statsTask) UpdateMetaBuildingState(meta *meta) error { func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool { log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID)) + + statsMeta := dependency.meta.statsTaskMeta.GetStatsTaskBySegmentID(st.segmentID, st.subJobType) + if statsMeta == nil { + log.Warn("stats task meta is null, skip it") + st.SetState(indexpb.JobState_JobStateNone, "stats task meta is null") + return false + } + // set segment compacting segment := dependency.meta.GetHealthySegment(ctx, st.segmentID) if segment == nil { log.Warn("segment is node healthy, skip stats") @@ -214,7 +222,9 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo NumRows: segment.GetNumOfRows(), CollectionTtl: collTtl.Nanoseconds(), CurrentTs: tsoutil.GetCurrentTime(), - BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(), + // update version after check + TaskVersion: statsMeta.GetVersion() + 1, + BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(), } return true diff --git a/internal/datacoord/task_stats_test.go b/internal/datacoord/task_stats_test.go index a4fc31814d..0576d0e48f 100644 --- a/internal/datacoord/task_stats_test.go +++ b/internal/datacoord/task_stats_test.go @@ -119,6 +119,7 @@ func (s *statsTaskSuite) SetupSuite() { SegmentID: s.segID, InsertChannel: "ch1", TaskID: s.taskID, + SubJobType: indexpb.StatsSubJob_Sort, Version: 0, NodeID: 0, State: indexpb.JobState_JobStateInit,