enhance: clean compaction task in compactionHandler (#38170) (#38848)

issue: #35711, #38811 

master pr: #38170
master pr: #38925

---------

Signed-off-by: wayblink <anyang.wang@zilliz.com>
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
Co-authored-by: wayblink <anyang.wang@zilliz.com>
cai.zhang 2025-01-08 15:58:57 +08:00 committed by GitHub
parent d8bc793be3
commit 3b99485615
14 changed files with 612 additions and 167 deletions
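
For orientation before the diffs, here is a minimal self-contained Go sketch of the lifecycle this change introduces (hypothetical types, not code from this PR): the check loop calls Process() on executing tasks, parks finished failed/timeout/completed tasks in a cleaningTasks map, and cleanFailedTasks keeps calling each task's Clean() until it reports success. Only the failed path is simulated below.

package main

import "fmt"

type taskState int

const (
	executing taskState = iota
	failed
	cleaned
)

// task mirrors the CompactionTask contract added below: Process drives the
// state machine and reports whether it has ended; Clean performs the failure
// clean-up and reports whether it succeeded.
type task struct {
	planID     int64
	state      taskState
	cleanFails int // simulate a Clean that has to be retried
}

func (t *task) Process() bool { return t.state != executing }

func (t *task) Clean() bool {
	if t.cleanFails > 0 {
		t.cleanFails--
		return false // stays in cleaningTasks, retried on the next loop tick
	}
	t.state = cleaned
	return true
}

func main() {
	executingTasks := map[int64]*task{1: {planID: 1, state: failed, cleanFails: 1}}
	cleaningTasks := map[int64]*task{}

	// checkCompaction: drop finished tasks from executingTasks and park the
	// failed ones in cleaningTasks for asynchronous clean-up.
	for id, t := range executingTasks {
		if t.Process() {
			delete(executingTasks, id)
			if t.state == failed {
				cleaningTasks[id] = t
			}
		}
	}

	// cleanFailedTasks: keep calling Clean until it reports success.
	for tick := 0; tick < 2; tick++ {
		for id, t := range cleaningTasks {
			if t.Clean() {
				delete(cleaningTasks, id)
			}
		}
		fmt.Println("tick", tick, "cleaning tasks left:", len(cleaningTasks))
	}
}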


@ -83,6 +83,9 @@ type compactionPlanHandler struct {
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
cleaningGuard lock.RWMutex
cleaningTasks map[int64]CompactionTask // planID -> task
meta CompactionMeta
allocator allocator
chManager ChannelManager
@ -191,6 +194,7 @@ func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm Chann
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
cleaningTasks: make(map[int64]CompactionTask),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
@ -412,6 +416,7 @@ func (c *compactionPlanHandler) loopCheck() {
if err != nil {
log.Info("fail to update compaction", zap.Error(err))
}
c.cleanFailedTasks()
}
}
}
@ -443,7 +448,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
triggers := c.meta.GetCompactionTasks()
for _, tasks := range triggers {
for _, task := range tasks {
if task.State == datapb.CompactionTaskState_completed || task.State == datapb.CompactionTaskState_cleaned {
if task.State == datapb.CompactionTaskState_cleaned {
duration := time.Since(time.Unix(task.StartTime, 0)).Seconds()
if duration > float64(Params.DataCoordCfg.CompactionDropToleranceInSeconds.GetAsDuration(time.Second).Seconds()) {
// try best to delete meta
@ -669,6 +674,11 @@ func assignNodeID(slots map[int64]int64, t CompactionTask) int64 {
return nodeID
}
// checkCompaction retrieves executing tasks and calls each task's Process() method
// to evaluate its state and progress through the state machine.
// Completed, failed, and timed-out tasks are removed from executingTasks and moved
// to cleaningTasks, where each task's own clean logic is performed asynchronously.
func (c *compactionPlanHandler) checkCompaction() error {
// Get the executing tasks before calling GetCompactionState on the DataNode to prevent false failures,
// since DC might add a new task while GetCompactionState is in flight.
@ -710,9 +720,44 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()
// collect the tasks that need cleaning
c.cleaningGuard.Lock()
for _, t := range finishedTasks {
if t.GetState() == datapb.CompactionTaskState_failed ||
t.GetState() == datapb.CompactionTaskState_timeout ||
t.GetState() == datapb.CompactionTaskState_completed {
log.Ctx(context.TODO()).Info("task need to clean",
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()),
zap.String("state", t.GetState().String()))
c.cleaningTasks[t.GetPlanID()] = t
}
}
c.cleaningGuard.Unlock()
return nil
}
// cleanFailedTasks performs the task-defined Clean logic,
// while compactionPlanHandler.Clean does garbage collection for already-cleaned tasks
func (c *compactionPlanHandler) cleanFailedTasks() {
c.cleaningGuard.RLock()
cleanedTasks := make([]CompactionTask, 0)
for _, t := range c.cleaningTasks {
clean := t.Clean()
if clean {
cleanedTasks = append(cleanedTasks, t)
}
}
c.cleaningGuard.RUnlock()
c.cleaningGuard.Lock()
for _, t := range cleanedTasks {
delete(c.cleaningTasks, t.GetPlanID())
}
c.cleaningGuard.Unlock()
}
func pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1


@ -24,7 +24,17 @@ import (
)
type CompactionTask interface {
// Process performs the task's state machine
//
// Returns:
//	- <bool>: whether the task's state machine has ended.
//
// Notes:
//
//	`end` doesn't mean the task succeeded; its final state may be completed, failed, or timeout.
Process() bool
// Clean performs the clean-up logic for a failed or timed-out task
Clean() bool
BuildCompactionRequest() (*datapb.CompactionPlan, error)
GetTriggerID() UniqueID


@ -65,8 +65,11 @@ func newClusteringCompactionTask(t *datapb.CompactionTask, meta CompactionMeta,
}
}
// Note: return True means exit this state machine.
// ONLY return True for Completed, Failed or Timeout
func (t *clusteringCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
ctx := context.TODO()
log := log.Ctx(ctx).With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
lastState := t.GetState().String()
err := t.retryableProcess()
if err != nil {
@ -105,15 +108,22 @@ func (t *clusteringCompactionTask) Process() bool {
if err != nil {
log.Warn("Failed to updateAndSaveTaskMeta", zap.Error(err))
}
log.Info("clustering compaction task state changed", zap.String("lastState", lastState), zap.String("currentState", currentState), zap.Int64("elapse seconds", lastStateDuration))
}
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
return t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout
}
// retryableProcess processes the task's state transitions and returns an error if something does not work as expected.
// The outer Process sets the state and retry count according to the error type (retryable or not-retryable).
func (t *clusteringCompactionTask) retryableProcess() error {
if t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned {
if t.State == datapb.CompactionTaskState_completed ||
t.State == datapb.CompactionTaskState_cleaned ||
t.State == datapb.CompactionTaskState_failed ||
t.State == datapb.CompactionTaskState_timeout {
return nil
}
@ -140,14 +150,14 @@ func (t *clusteringCompactionTask) retryableProcess() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_indexing:
return t.processIndexing()
case datapb.CompactionTaskState_timeout:
return t.processFailedOrTimeout()
case datapb.CompactionTaskState_failed:
return t.processFailedOrTimeout()
}
return nil
}
func (t *clusteringCompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, error) {
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
@ -188,7 +198,9 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
}
func (t *clusteringCompactionTask) processPipelining() error {
log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID()))
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
if t.NeedReAssignNodeID() {
log.Debug("wait for the node to be assigned before proceeding with the subsequent steps")
return nil
@ -210,7 +222,9 @@ func (t *clusteringCompactionTask) processPipelining() error {
}
func (t *clusteringCompactionTask) processExecuting() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil {
log.Info("processExecuting clustering compaction", zap.Bool("result nil", result == nil), zap.Error(err))
@ -248,12 +262,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return t.processMetaSaved()
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
}
return nil
case datapb.CompactionTaskState_failed:
@ -265,6 +274,9 @@ func (t *clusteringCompactionTask) processExecuting() error {
}
func (t *clusteringCompactionTask) processMetaSaved() error {
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
@ -274,6 +286,9 @@ func (t *clusteringCompactionTask) processMetaSaved() error {
}
func (t *clusteringCompactionTask) processIndexing() error {
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
// wait for segment indexed
collectionIndexes := t.meta.GetIndexMeta().GetIndexesForCollection(t.GetCollectionID(), "")
if len(collectionIndexes) == 0 {
@ -292,7 +307,8 @@ func (t *clusteringCompactionTask) processIndexing() error {
}
return true
}()
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed), zap.Int64("planID", t.GetPlanID()), zap.Int64s("segments", t.ResultSegments))
log.Debug("check compaction result segments index states", zap.Bool("indexed", indexed),
zap.Int64s("segments", t.ResultSegments))
if indexed {
return t.completeTask()
}
@ -316,6 +332,9 @@ func (t *clusteringCompactionTask) markInputSegmentsDropped() error {
// indexed is the final state of a clustering compaction task
// one task should only run this once
func (t *clusteringCompactionTask) completeTask() error {
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
var err error
// update current partition stats version
// at this point, the segment view includes both the input segments and the result segments.
@ -335,17 +354,23 @@ func (t *clusteringCompactionTask) completeTask() error {
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}
if err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("completeTask update task state to completed failed", zap.Error(err))
return err
}
// mark input segments as dropped
// now, the segment view only includes the result segments.
if err = t.markInputSegmentsDropped(); err != nil {
log.Warn("mark input segments as Dropped failed, skip it and wait retry",
zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("mark input segments as Dropped failed, skip it and wait retry")
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
return nil
}
func (t *clusteringCompactionTask) processAnalyzing() error {
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID())
if analyzeTask == nil {
log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID()))
@ -373,74 +398,92 @@ func (t *clusteringCompactionTask) resetSegmentCompacting() {
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}
func (t *clusteringCompactionTask) processFailedOrTimeout() error {
log.Info("clean task", zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()), zap.String("state", t.GetState().String()))
func (t *clusteringCompactionTask) doClean() error {
log := log.Ctx(context.TODO()).With(zap.Int64("triggerID", t.TriggerID),
zap.Int64("collectionID", t.GetCollectionID()),
zap.Int64("planID", t.GetPlanID()))
log.Info("clean task", zap.String("state", t.GetState().String()))
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("clusteringCompactionTask processFailedOrTimeout unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
}
isInputDropped := false
for _, segID := range t.GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
isInputDropped = true
break
}
}
if isInputDropped {
log.Info("input segments dropped, doing for compatibility",
zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
// this task must be generated by v2.4, just for compatibility
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but the task fails in the other steps, mark them as L1 segments without partition stats
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
if t.GetState() == datapb.CompactionTaskState_completed {
if err := t.markInputSegmentsDropped(); err != nil {
return err
}
} else {
// after v2.4.16, mark the result segments as dropped
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
isInputDropped := false
for _, segID := range t.GetInputSegments() {
if t.meta.GetHealthySegment(segID) == nil {
isInputDropped = true
break
}
}
if isInputDropped {
log.Info("input segments dropped, doing for compatibility",
zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
// this task must be generated by v2.4, just for compatibility
// revert segments meta
var operators []UpdateOperator
// revert level of input segments
// L1 : L1 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L1
// L2 : L2 ->(processPipelining)-> L2 ->(processFailedOrTimeout)-> L2
for _, segID := range t.GetInputSegments() {
operators = append(operators, RevertSegmentLevelOperator(segID))
}
// if result segments are generated but the task fails in the other steps, mark them as L1 segments without partition stats
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L1))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, 0))
}
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
} else {
// after v2.4.16, mark the result segments as dropped
var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
// Don't worry about them being loaded; they are all invisible.
operators = append(operators, UpdateStatusOperator(segID, commonpb.SegmentState_Dropped))
}
err := t.meta.UpdateSegmentsInfo(operators...)
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
}
// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
}
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrCleanPartitionStatsFail(fmt.Sprintf("%d-%d-%s-%d", t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()))
}
}
t.resetSegmentCompacting()
// drop partition stats if uploaded
partitionStatsInfo := &datapb.PartitionStatsInfo{
CollectionID: t.GetCollectionID(),
PartitionID: t.GetPartitionID(),
VChannel: t.GetChannel(),
Version: t.GetPlanID(),
SegmentIDs: t.GetResultSegments(),
}
err := t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
log.Warn("clusteringCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is only called once;
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("clusteringCompactionTask clean done")
return nil
}
func (t *clusteringCompactionTask) doAnalyze() error {
@ -535,8 +578,8 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
log.Warn("Failed to saveTaskMeta", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
}
t.CompactionTask = task
log.Info("updateAndSaveTaskMeta success", zap.String("task state", t.CompactionTask.State.String()))
t.SetTask(task)
log.Ctx(context.TODO()).Info("updateAndSaveTaskMeta success", zap.String("task state", t.GetState().String()))
return nil
}
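
The repeated "resetSegmentCompacting must be the last step of Clean" comments above guard against a double release: if the compacting flag were cleared before the cleaned state is persisted, a failed persist would leave the task in cleaningTasks, Clean would run again, and the second reset could release a lock that a newer compaction task has since taken. A small self-contained sketch of that hazard, with hypothetical types rather than the real meta:

package main

import "fmt"

type meta struct{ compacting map[int64]bool }

func (m *meta) SetSegmentsCompacting(ids []int64, compacting bool) {
	for _, id := range ids {
		m.compacting[id] = compacting
	}
}

func main() {
	m := &meta{compacting: map[int64]bool{1: true}}

	// Wrong order: task A unlocks segment 1 first, then fails to persist the
	// "cleaned" state, so it stays in cleaningTasks and Clean will run again.
	m.SetSegmentsCompacting([]int64{1}, false)

	// Meanwhile a new compaction task B locks segment 1.
	m.SetSegmentsCompacting([]int64{1}, true)

	// Task A's retried Clean unlocks segment 1 again, stealing B's lock.
	m.SetSegmentsCompacting([]int64{1}, false)
	fmt.Println("segment 1 compacting:", m.compacting[1]) // false, although B is still running
}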


@ -109,7 +109,8 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
task := s.generateBasicTask(false)
task.processPipelining()
err := task.processPipelining()
s.NoError(err)
seg11 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg11.Level)
@ -117,13 +118,14 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
s.Equal(datapb.SegmentLevel_L2, seg21.Level)
s.Equal(int64(10000), seg21.PartitionStatsVersion)
task.ResultSegments = []int64{103, 104}
task.updateAndSaveTaskMeta(setResultSegments([]int64{103, 104}))
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 103,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
@ -133,27 +135,80 @@ func (s *ClusteringCompactionTaskSuite) TestClusteringCompactionSegmentMetaChang
ID: 104,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
LastLevel: datapb.SegmentLevel_L1,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
task.processFailedOrTimeout()
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
seg12 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
seg22 := s.meta.GetSegment(102)
s.Equal(datapb.SegmentLevel_L2, seg22.Level)
s.Equal(int64(10000), seg22.PartitionStatsVersion)
err = task.doClean()
s.NoError(err)
seg32 := s.meta.GetSegment(103)
s.Equal(datapb.SegmentLevel_L2, seg32.Level)
s.Equal(int64(10001), seg32.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg32.GetState())
seg42 := s.meta.GetSegment(104)
s.Equal(datapb.SegmentLevel_L2, seg42.Level)
s.Equal(int64(10001), seg42.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Dropped, seg42.GetState())
s.Run("v2.4.x", func() {
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 101,
State: commonpb.SegmentState_Dropped,
LastLevel: datapb.SegmentLevel_L1,
Level: datapb.SegmentLevel_L2,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 102,
State: commonpb.SegmentState_Dropped,
LastLevel: datapb.SegmentLevel_L2,
Level: datapb.SegmentLevel_L2,
PartitionStatsVersion: 10000,
},
})
// fake some compaction result segment
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 103,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
s.meta.AddSegment(context.TODO(), &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{
ID: 104,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L2,
CreatedByCompaction: true,
PartitionStatsVersion: 10001,
},
})
task := s.generateBasicTask(false)
task.sessions = s.session
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
task.InputSegments = []int64{101, 102}
task.ResultSegments = []int64{103, 104}
task.Clean()
seg12 := s.meta.GetSegment(101)
s.Equal(datapb.SegmentLevel_L1, seg12.Level)
seg22 := s.meta.GetSegment(102)
s.Equal(datapb.SegmentLevel_L2, seg22.Level)
s.Equal(int64(10000), seg22.PartitionStatsVersion)
seg32 := s.meta.GetSegment(103)
s.Equal(datapb.SegmentLevel_L1, seg32.Level)
s.Equal(int64(0), seg32.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Flushed, seg32.GetState())
seg42 := s.meta.GetSegment(104)
s.Equal(datapb.SegmentLevel_L1, seg42.Level)
s.Equal(int64(0), seg42.PartitionStatsVersion)
s.Equal(commonpb.SegmentState_Flushed, seg42.GetState())
})
}
func (s *ClusteringCompactionTaskSuite) generateBasicTask(vectorClusteringKey bool) *clusteringCompactionTask {
@ -208,7 +263,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
s.Equal(false, task.Process())
s.Equal(int32(3), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_pipelining, task.GetState())
s.Equal(false, task.Process())
s.True(task.Process())
s.Equal(int32(0), task.RetryTimes)
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
}
@ -216,8 +271,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessRetryLogic() {
func (s *ClusteringCompactionTaskSuite) TestProcessPipelining() {
s.Run("process pipelining fail, segment not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_pipelining
s.Equal(false, task.Process())
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining))
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
@ -444,11 +499,10 @@ func (s *ClusteringCompactionTaskSuite) TestProcessExecuting() {
},
},
}, nil).Once()
s.session.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once()
time.Sleep(time.Second * 1)
s.Equal(true, task.Process())
s.Equal(datapb.CompactionTaskState_cleaned, task.GetState())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_timeout, task.GetState())
})
}
@ -546,8 +600,8 @@ func (s *ClusteringCompactionTaskSuite) TestProcessIndexingState() {
func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
s.Run("analyze task not found", func() {
task := s.generateBasicTask(false)
task.State = datapb.CompactionTaskState_analyzing
s.False(task.Process())
task.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_analyzing))
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
@ -564,7 +618,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
State: indexpb.JobState_JobStateFailed,
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})
@ -582,7 +636,7 @@ func (s *ClusteringCompactionTaskSuite) TestProcessAnalyzingState() {
CentroidsFile: "",
}
s.meta.analyzeMeta.AddAnalyzeTask(t)
s.False(task.Process())
s.True(task.Process())
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
})


@ -44,7 +44,7 @@ type l0CompactionTask struct {
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
// ONLY return True for Completed, Failed
func (t *l0CompactionTask) Process() bool {
switch t.GetState() {
case datapb.CompactionTaskState_pipelining:
@ -55,8 +55,6 @@ func (t *l0CompactionTask) Process() bool {
return t.processMetaSaved()
case datapb.CompactionTaskState_completed:
return t.processCompleted()
case datapb.CompactionTaskState_failed:
return t.processFailed()
}
return true
}
@ -77,7 +75,7 @@ func (t *l0CompactionTask) processPipelining() bool {
return false
}
return t.processFailed()
return true
}
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
@ -119,7 +117,7 @@ func (t *l0CompactionTask) processExecuting() bool {
log.Warn("l0CompactionTask failed to set task failed state", zap.Error(err))
return false
}
return t.processFailed()
return true
}
return false
}
@ -149,25 +147,33 @@ func (t *l0CompactionTask) processCompleted() bool {
return true
}
func (t *l0CompactionTask) processFailed() bool {
func (t *l0CompactionTask) doClean() error {
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
if t.hasAssignedWorker() {
err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
})
if err != nil {
log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
log.Warn("l0CompactionTask processFailed unable to drop compaction plan", zap.Error(err))
}
}
t.resetSegmentCompacting()
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("l0CompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
return err
}
log.Info("l0CompactionTask processFailed done", zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", t.GetPlanID()))
return true
// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is only called once;
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
UpdateCompactionSegmentSizeMetrics(t.result.GetSegments())
log.Info("l0CompactionTask clean done")
return nil
}
func (t *l0CompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *l0CompactionTask) GetSpan() trace.Span {


@ -363,14 +363,11 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
PlanID: t.GetPlanID(),
State: datapb.CompactionTaskState_failed,
}, nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(2)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).Return().Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
s.Equal(datapb.CompactionTaskState_failed, t.GetState())
})
s.Run("test executing with result failed save compaction meta failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_executing)
@ -453,29 +450,10 @@ func (s *L0CompactionTaskSuite) TestStateTrans() {
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Times(1)
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(nil).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
}).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
})
s.Run("test process failed failed", func() {
t := s.generateTestL0Task(datapb.CompactionTaskState_failed)
t.NodeID = 100
s.Require().True(t.GetNodeID() > 0)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil).Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(t.GetNodeID(), mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, false).RunAndReturn(func(segIDs []int64, isCompacting bool) {
s.ElementsMatch(segIDs, t.GetInputSegments())
}).Once()
got := t.Process()
s.True(got)
s.Equal(datapb.CompactionTaskState_cleaned, t.GetState())
s.Equal(datapb.CompactionTaskState_failed, t.GetState())
})
s.Run("test unkonwn task", func() {


@ -42,7 +42,7 @@ func (t *mixCompactionTask) processPipelining() bool {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
}
return t.processFailed()
return true
}
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
@ -50,15 +50,18 @@ func (t *mixCompactionTask) processPipelining() bool {
// Compaction tasks may be refused by DataNode because of slot limit. In this case, the node id is reset
// to enable a retry in compaction.checkCompaction().
// This is tricky, we should remove the reassignment here.
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
}
return false
}
log.Info("mixCompactionTask notify compaction tasks to DataNode")
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
if err != nil {
log.Warn("mixCompactionTask update task state failed", zap.Error(err))
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
}
return false
@ -96,7 +99,7 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
}
return t.processFailed()
return true
}
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
@ -106,7 +109,7 @@ func (t *mixCompactionTask) processExecuting() bool {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
}
return t.processFailed()
return true
}
return false
}
@ -122,7 +125,7 @@ func (t *mixCompactionTask) processExecuting() bool {
if err != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
return false
return true
}
return false
}
@ -150,7 +153,7 @@ func (t *mixCompactionTask) saveSegmentMeta() error {
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
// ONLY return True for Completed, Failed or Timeout
func (t *mixCompactionTask) Process() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
lastState := t.GetState().String()
@ -245,21 +248,33 @@ func (t *mixCompactionTask) ShadowClone(opts ...compactionTaskOpt) *datapb.Compa
}
func (t *mixCompactionTask) processFailed() bool {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()), zap.Int64("collectionID", t.GetCollectionID()))
return true
}
func (t *mixCompactionTask) Clean() bool {
return t.doClean() == nil
}
func (t *mixCompactionTask) doClean() error {
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("PlanID", t.GetPlanID()),
zap.Int64("collectionID", t.GetCollectionID()))
if err := t.sessions.DropCompactionPlan(t.GetNodeID(), &datapb.DropCompactionPlanRequest{
PlanID: t.GetPlanID(),
}); err != nil {
log.Warn("mixCompactionTask processFailed unable to drop compaction plan", zap.Error(err))
return err
}
log.Info("mixCompactionTask processFailed done")
t.resetSegmentCompacting()
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
log.Warn("mixCompactionTask fail to updateAndSaveTaskMeta", zap.Error(err))
return err
}
return true
// resetSegmentCompacting must be the last step of Clean, to make sure resetSegmentCompacting is only called once;
// otherwise, it may unlock segments locked by other compaction tasks
t.resetSegmentCompacting()
log.Info("mixCompactionTask clean done")
return nil
}
func (t *mixCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskOpt) error {


@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/metastore/kv/datacoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -46,6 +47,7 @@ type CompactionPlanHandlerSuite struct {
mockCm *MockChannelManager
mockSessMgr *MockSessionManager
handler *compactionPlanHandler
mockHandler *NMockHandler
cluster *MockCluster
}
@ -57,6 +59,8 @@ func (s *CompactionPlanHandlerSuite) SetupTest() {
s.mockSessMgr = NewMockSessionManager(s.T())
s.cluster = NewMockCluster(s.T())
s.handler = newCompactionPlanHandler(s.cluster, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc, nil, nil)
s.mockHandler = NewNMockHandler(s.T())
s.mockHandler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil).Maybe()
}
func (s *CompactionPlanHandlerSuite) TestScheduleEmpty() {
@ -1045,6 +1049,184 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.NoError(err)
}
func (s *CompactionPlanHandlerSuite) TestCleanCompaction() {
s.SetupTest()
tests := []struct {
task CompactionTask
}{
{
&mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_MixCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
},
},
{
&l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
meta: s.mockMeta,
sessions: s.mockSessMgr,
},
},
}
for _, test := range tests {
task := test.task
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
err := s.handler.checkCompaction()
s.NoError(err)
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
}
func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompaction() {
s.SetupTest()
task := newClusteringCompactionTask(
&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
CollectionID: 1001,
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
s.mockMeta, s.mockSessMgr, s.mockHandler, nil)
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
func (s *CompactionPlanHandlerSuite) TestCleanClusteringCompactionCommitFail() {
s.SetupTest()
task := newClusteringCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
CollectionID: 1001,
Channel: "ch-1",
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_executing,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
ClusteringKeyField: &schemapb.FieldSchema{
FieldID: 100,
Name: Int64Field,
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
AutoID: true,
IsClusteringKey: true,
},
},
s.mockMeta, s.mockSessMgr, s.mockHandler, nil)
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().GetCompactionPlanResult(UniqueID(1), int64(1)).Return(
&datapb.CompactionPlanResult{
PlanID: 1,
State: datapb.CompactionTaskState_completed,
Segments: []*datapb.CompactionSegment{
{
PlanID: 1,
SegmentID: 101,
},
},
}, nil).Once()
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(nil, nil, errors.New("mock error"))
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(task.GetResultSegments()))
s.Equal(datapb.CompactionTaskState_failed, task.GetState())
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return().Once()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil)
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
// test that compactionHandler keeps cleaning a failed task until it becomes cleaned
func (s *CompactionPlanHandlerSuite) TestKeepClean() {
s.SetupTest()
tests := []struct {
task CompactionTask
}{
{
newClusteringCompactionTask(&datapb.CompactionTask{
PlanID: 1,
TriggerID: 1,
Type: datapb.CompactionType_ClusteringCompaction,
State: datapb.CompactionTaskState_failed,
NodeID: 1,
InputSegments: []UniqueID{1, 2},
},
s.mockMeta, s.mockSessMgr, s.mockHandler, nil),
},
}
for _, test := range tests {
task := test.task
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil)
s.mockMeta.EXPECT().SetSegmentsCompacting(mock.Anything, mock.Anything).Return()
s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(errors.New("mock error")).Once()
s.mockMeta.EXPECT().SaveCompactionTask(mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.executingTasks[1] = task
s.Equal(1, len(s.handler.executingTasks))
s.handler.checkCompaction()
s.Equal(0, len(s.handler.executingTasks))
s.Equal(1, len(s.handler.cleaningTasks))
s.handler.cleanFailedTasks()
s.Equal(1, len(s.handler.cleaningTasks))
s.mockMeta.EXPECT().CleanPartitionStatsInfo(mock.Anything).Return(nil).Once()
s.handler.cleanFailedTasks()
s.Equal(0, len(s.handler.cleaningTasks))
}
}
func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {
l := &datapb.FieldBinlog{
FieldID: fieldID,


@ -184,22 +184,37 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
return true
}
var compactionFromExist func(segID UniqueID) bool
callCount := 0
compactionFromExist = func(segID UniqueID) bool {
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
var compactionFromExistWithCache func(segID UniqueID) bool
compactionFromExistWithCache = func(segID UniqueID) bool {
compactionFromExistMap := make(map[int64]bool)
var compactionFromExist func(segID UniqueID) bool
compactionFromExist = func(segID UniqueID) bool {
callCount++
if valid, ok := compactionFromExistMap[segID]; ok {
return valid
}
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
compactionFromExistMap[segID] = false
return false
}
for _, fromID := range compactionFrom {
if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
compactionFromExistMap[segID] = true
return true
}
if compactionFromExist(fromID) {
compactionFromExistMap[segID] = true
return true
}
}
compactionFromExistMap[segID] = false
return false
}
for _, fromID := range compactionFrom {
if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
return true
}
if compactionFromExist(fromID) {
return true
}
}
return false
return compactionFromExist(segID)
}
segmentIndexed := func(segID UniqueID) bool {
@ -214,7 +229,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
newFlushedIDs.Insert(id)
continue
}
if segmentIndexed(id) && !compactionFromExist(id) {
if segmentIndexed(id) && !compactionFromExistWithCache(id) {
newFlushedIDs.Insert(id)
} else {
for _, fromID := range compactionFrom {
@ -231,7 +246,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
flushedIDs = newFlushedIDs
newFlushedIDs = make(typeutil.UniqueSet)
}
flushedIDs = newFlushedIDs
log.Info("GetQueryVChanPositions",
@ -242,6 +256,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
zap.Int("result growing", len(growingIDs)),
zap.Int("result L0", len(levelZeroIDs)),
zap.Any("partition stats", partStatsVersionsMap),
zap.Int("callCount", callCount),
)
return &datapb.VchannelInfo{

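The GetQueryVChanPositions change above memoizes the recursive compactionFrom lookup so each segment's ancestry is resolved at most once per call; the added callCount only makes the saving visible in the log. A minimal standalone sketch of the same pattern, using a made-up segment graph:

package main

import "fmt"

func main() {
	// segment -> the segments it was compacted from (hypothetical graph)
	compactionFrom := map[int64][]int64{4: {2, 3}, 3: {1}, 2: {1}}
	flushed := map[int64]bool{1: true}

	cache := make(map[int64]bool)
	var fromExist func(segID int64) bool
	fromExist = func(segID int64) bool {
		if v, ok := cache[segID]; ok {
			return v // already resolved, no repeated recursion
		}
		for _, from := range compactionFrom[segID] {
			if flushed[from] || fromExist(from) {
				cache[segID] = true
				return true
			}
		}
		cache[segID] = false
		return false
	}
	fmt.Println("ancestor still flushed:", fromExist(4)) // true: 4 -> 2 -> 1
}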

@ -774,6 +774,12 @@ func UpdateStatusOperator(segmentID int64, status commonpb.SegmentState) UpdateO
return false
}
if segment.GetState() == status {
log.Ctx(context.TODO()).Info("meta update: segment stats already is target state",
zap.Int64("segmentID", segmentID), zap.String("status", status.String()))
return false
}
updateSegStateAndPrepareMetrics(segment, status, modPack.metricMutation)
if status == commonpb.SegmentState_Dropped {
segment.DroppedAt = uint64(time.Now().UnixNano())


@ -107,8 +107,8 @@ func (psm *partitionStatsMeta) ListPartitionStatsInfos(collectionID int64, parti
func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStatsInfo) error {
psm.Lock()
defer psm.Unlock()
if err := psm.catalog.SavePartitionStatsInfo(psm.ctx, info); err != nil {
log.Error("meta update: update PartitionStatsInfo info fail", zap.Error(err))
if err := psm.catalog.SavePartitionStatsInfo(context.TODO(), info); err != nil {
log.Ctx(context.TODO()).Error("meta update: update PartitionStatsInfo info fail", zap.Error(err))
return err
}
if _, ok := psm.partitionStatsInfos[info.GetVChannel()]; !ok {
@ -127,6 +127,24 @@ func (psm *partitionStatsMeta) SavePartitionStatsInfo(info *datapb.PartitionStat
func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStatsInfo) error {
psm.Lock()
defer psm.Unlock()
// if the partitionStats being dropped is the current version, currentPartitionStats should be updated
currentVersion := psm.innerGetCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel())
if currentVersion == info.GetVersion() && currentVersion != emptyPartitionStatsVersion {
infos := psm.partitionStatsInfos[info.GetVChannel()][info.GetPartitionID()].infos
if len(infos) > 0 {
var maxVersion int64 = 0
for version := range infos {
if version > maxVersion && version < currentVersion {
maxVersion = version
}
}
err := psm.innerSaveCurrentPartitionStatsVersion(info.GetCollectionID(), info.GetPartitionID(), info.GetVChannel(), maxVersion)
if err != nil {
return err
}
}
}
if err := psm.catalog.DropPartitionStatsInfo(psm.ctx, info); err != nil {
log.Error("meta update: drop PartitionStatsInfo info fail",
zap.Int64("collectionID", info.GetCollectionID()),
@ -155,8 +173,11 @@ func (psm *partitionStatsMeta) DropPartitionStatsInfo(info *datapb.PartitionStat
func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
psm.Lock()
defer psm.Unlock()
return psm.innerSaveCurrentPartitionStatsVersion(collectionID, partitionID, vChannel, currentPartitionStatsVersion)
}
log.Info("update current partition stats version", zap.Int64("collectionID", collectionID),
func (psm *partitionStatsMeta) innerSaveCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string, currentPartitionStatsVersion int64) error {
log.Ctx(context.TODO()).Info("update current partition stats version", zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.String("vChannel", vChannel), zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion))
@ -180,7 +201,10 @@ func (psm *partitionStatsMeta) SaveCurrentPartitionStatsVersion(collectionID, pa
func (psm *partitionStatsMeta) GetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
psm.RLock()
defer psm.RUnlock()
return psm.innerGetCurrentPartitionStatsVersion(collectionID, partitionID, vChannel)
}
func (psm *partitionStatsMeta) innerGetCurrentPartitionStatsVersion(collectionID, partitionID int64, vChannel string) int64 {
if _, ok := psm.partitionStatsInfos[vChannel]; !ok {
return emptyPartitionStatsVersion
}

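The DropPartitionStatsInfo change above rolls the current partition-stats version back to the largest remaining version strictly below it whenever the current version itself is dropped. A tiny standalone illustration of that selection rule with made-up version numbers (the new TestDropPartitionStats below exercises the real path):

package main

import "fmt"

func main() {
	remaining := []int64{100, 101} // versions still on record after the drop
	dropped := int64(102)          // the current version being dropped
	var fallback int64             // 0 plays the role of emptyPartitionStatsVersion
	for _, v := range remaining {
		if v > fallback && v < dropped {
			fallback = v
		}
	}
	fmt.Println("new current version:", fallback) // 101
}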

@ -91,3 +91,61 @@ func (s *PartitionStatsMetaSuite) TestGetPartitionStats() {
currentVersion4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(1, 2, "ch-1")
s.Equal(int64(100), currentVersion4)
}
func (s *PartitionStatsMetaSuite) TestDropPartitionStats() {
ctx := context.Background()
partitionStatsMeta, err := newPartitionStatsMeta(ctx, s.catalog)
s.NoError(err)
collectionID := int64(1)
partitionID := int64(2)
channel := "ch-1"
s.catalog.EXPECT().DropPartitionStatsInfo(mock.Anything, mock.Anything).Return(nil)
s.catalog.EXPECT().SaveCurrentPartitionStatsVersion(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
partitionStats := []*datapb.PartitionStatsInfo{
{
CollectionID: collectionID,
PartitionID: partitionID,
VChannel: channel,
SegmentIDs: []int64{100000},
Version: 100,
},
{
CollectionID: collectionID,
PartitionID: partitionID,
VChannel: channel,
SegmentIDs: []int64{100000},
Version: 101,
},
{
CollectionID: collectionID,
PartitionID: partitionID,
VChannel: channel,
SegmentIDs: []int64{100000},
Version: 102,
},
}
for _, partitionStats := range partitionStats {
partitionStatsMeta.SavePartitionStatsInfo(partitionStats)
}
partitionStatsMeta.SaveCurrentPartitionStatsVersion(collectionID, partitionID, channel, 102)
version := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(int64(102), version)
err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[2])
s.NoError(err)
s.Equal(2, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
version2 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(int64(101), version2)
err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[1])
s.Equal(1, len(partitionStatsMeta.partitionStatsInfos[channel][partitionID].infos))
version3 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(int64(100), version3)
err = partitionStatsMeta.DropPartitionStatsInfo(partitionStats[0])
s.NoError(err)
s.Nil(partitionStatsMeta.partitionStatsInfos[channel][partitionID])
version4 := partitionStatsMeta.GetCurrentPartitionStatsVersion(collectionID, partitionID, channel)
s.Equal(emptyPartitionStatsVersion, version4)
}


@ -210,6 +210,7 @@ var (
ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true)
ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false)
ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false)
ErrCleanPartitionStatsFail = newMilvusError("fail to clean partition Stats", 2316, true)
ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false)


@ -1167,6 +1167,14 @@ func WrapErrClusteringCompactionMetaError(operation string, err error) error {
return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
}
func WrapErrCleanPartitionStatsFail(msg ...string) error {
err := error(ErrCleanPartitionStatsFail)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}
func WrapErrAnalyzeTaskNotFound(id int64) error {
return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id))
}
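
The new ErrCleanPartitionStatsFail (code 2316) is marked retryable, and the clustering doClean now returns the failure instead of swallowing it, so the task stays in cleaningTasks and the clean is retried on the next loop tick, as TestKeepClean verifies. A standard-library-only sketch of that retry contract (not the merr implementation):

package main

import (
	"errors"
	"fmt"
)

var errCleanPartitionStatsFail = errors.New("fail to clean partition Stats")

func doClean(cleanPartitionStats func() error) error {
	if err := cleanPartitionStats(); err != nil {
		return fmt.Errorf("%w: %v", errCleanPartitionStatsFail, err)
	}
	return nil
}

func main() {
	attempts := 0
	clean := func() error {
		attempts++
		if attempts == 1 {
			return errors.New("mock error") // first attempt fails, as in TestKeepClean
		}
		return nil
	}
	for doClean(clean) != nil {
		// the handler would leave the task in cleaningTasks and retry on the next tick
	}
	fmt.Println("cleaned after", attempts, "attempts")
}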