mirror of https://github.com/milvus-io/milvus.git
fix: [2.5] Set task version for stats task (#40130)
issue: #40034 master pr: #40035 Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>pull/40155/head
parent
a4eb2ce224
commit
fdad35e668
|
@ -863,8 +863,8 @@ func (gc *garbageCollector) recycleUnusedTextIndexFiles(ctx context.Context) {
|
|||
log := log.With(zap.Int64("segmentID", seg.GetID()), zap.Int64("fieldID", fieldStats.GetFieldID()))
|
||||
// clear low version task
|
||||
for i := int64(1); i < fieldStats.GetVersion(); i++ {
|
||||
prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
|
||||
seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID(), i)
|
||||
prefix := fmt.Sprintf("%s/%s/%d/%d/%d/%d/%d/%d", gc.option.cli.RootPath(), common.TextIndexPath,
|
||||
fieldStats.GetBuildID(), i, seg.GetCollectionID(), seg.GetPartitionID(), seg.GetID(), fieldStats.GetFieldID())
|
||||
futures := make([]*conc.Future[struct{}], 0)
|
||||
|
||||
err := gc.option.cli.WalkWithPrefix(ctx, prefix, true, func(files *storage.ChunkObjectInfo) bool {
|
||||
|
|
|
@ -181,7 +181,7 @@ func (jm *statsJobManager) triggerTextStatsTask() {
|
|||
needTriggerFieldIDs = append(needTriggerFieldIDs, field.GetFieldID())
|
||||
}
|
||||
segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
|
||||
return needDoTextIndex(seg, needTriggerFieldIDs)
|
||||
return seg.GetIsSorted() && needDoTextIndex(seg, needTriggerFieldIDs)
|
||||
}))
|
||||
|
||||
for _, segment := range segments {
|
||||
|
@ -205,7 +205,7 @@ func (jm *statsJobManager) triggerJsonKeyIndexStatsTask() {
|
|||
}
|
||||
}
|
||||
segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
|
||||
return needDoJsonKeyIndex(seg, needTriggerFieldIDs)
|
||||
return seg.GetIsSorted() && needDoJsonKeyIndex(seg, needTriggerFieldIDs)
|
||||
}))
|
||||
for _, segment := range segments {
|
||||
if err := jm.SubmitStatsTask(segment.GetID(), segment.GetID(), indexpb.StatsSubJob_JsonKeyIndexJob, true); err != nil {
|
||||
|
@ -228,7 +228,7 @@ func (jm *statsJobManager) triggerBM25StatsTask() {
|
|||
}
|
||||
}
|
||||
segments := jm.mt.SelectSegments(jm.ctx, WithCollection(collection.ID), SegmentFilterFunc(func(seg *SegmentInfo) bool {
|
||||
return needDoBM25(seg, needTriggerFieldIDs)
|
||||
return seg.GetIsSorted() && needDoBM25(seg, needTriggerFieldIDs)
|
||||
}))
|
||||
|
||||
for _, segment := range segments {
|
||||
|
|
|
@ -161,6 +161,14 @@ func (st *statsTask) UpdateMetaBuildingState(meta *meta) error {
|
|||
|
||||
func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
|
||||
log := log.Ctx(ctx).With(zap.Int64("taskID", st.taskID), zap.Int64("segmentID", st.segmentID))
|
||||
|
||||
statsMeta := dependency.meta.statsTaskMeta.GetStatsTaskBySegmentID(st.segmentID, st.subJobType)
|
||||
if statsMeta == nil {
|
||||
log.Warn("stats task meta is null, skip it")
|
||||
st.SetState(indexpb.JobState_JobStateNone, "stats task meta is null")
|
||||
return false
|
||||
}
|
||||
// set segment compacting
|
||||
segment := dependency.meta.GetHealthySegment(ctx, st.segmentID)
|
||||
if segment == nil {
|
||||
log.Warn("segment is node healthy, skip stats")
|
||||
|
@ -214,6 +222,8 @@ func (st *statsTask) PreCheck(ctx context.Context, dependency *taskScheduler) bo
|
|||
NumRows: segment.GetNumOfRows(),
|
||||
CollectionTtl: collTtl.Nanoseconds(),
|
||||
CurrentTs: tsoutil.GetCurrentTime(),
|
||||
// update version after check
|
||||
TaskVersion: statsMeta.GetVersion() + 1,
|
||||
BinlogMaxSize: Params.DataNodeCfg.BinLogMaxSize.GetAsUint64(),
|
||||
}
|
||||
|
||||
|
|
|
@ -119,6 +119,7 @@ func (s *statsTaskSuite) SetupSuite() {
|
|||
SegmentID: s.segID,
|
||||
InsertChannel: "ch1",
|
||||
TaskID: s.taskID,
|
||||
SubJobType: indexpb.StatsSubJob_Sort,
|
||||
Version: 0,
|
||||
NodeID: 0,
|
||||
State: indexpb.JobState_JobStateInit,
|
||||
|
|
Loading…
Reference in New Issue