fix: Fix accidentlly exit MixCompaction task loop (#34688)

See also: #33431, #34460

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/34724/head
XuanYang-cn 2024-07-16 15:57:42 +08:00 committed by GitHub
parent cc8f7aa110
commit fd7221f4be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 52 additions and 26 deletions

View File

@ -549,12 +549,20 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
log := log.With(zap.Int64("planID", task.GetPlanID()), zap.Int64("triggerID", task.GetTriggerID()), zap.Int64("collectionID", task.GetCollectionID()), zap.String("type", task.GetType().String()))
t, err := c.createCompactTask(task)
if err != nil {
// Conflict is normal
if errors.Is(err, merr.ErrCompactionPlanConflict) {
log.RatedInfo(60, "Failed to create compaction task, compaction plan conflict", zap.Error(err))
} else {
log.Warn("Failed to create compaction task, unable to create compaction task", zap.Error(err))
}
return err
}
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
log.Warn("Failed to enqueue compaction task, unable to save task meta", zap.Error(err))
return err
}
c.submitTask(t)
@ -608,7 +616,7 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
for _, t := range tasks {
nodeID := c.pickAnyNode(slots)
if nodeID == NullNodeID {
log.Info("cannot find datanode for compaction task",
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
continue
}

View File

@ -44,6 +44,8 @@ type l0CompactionTask struct {
meta CompactionMeta
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
func (t *l0CompactionTask) Process() bool {
switch t.GetState() {
case datapb.CompactionTaskState_pipelining:
@ -275,7 +277,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
for _, segInfo := range sealedSegments {
// TODO should allow parallel executing of l0 compaction
if segInfo.isCompacting {
log.Info("l0 compaction candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID()))
log.Warn("l0CompactionTask candidate segment is compacting", zap.Int64("segmentID", segInfo.GetID()))
return nil, merr.WrapErrCompactionPlanConflict(fmt.Sprintf("segment %d is compacting", segInfo.GetID()))
}
}
@ -292,7 +294,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
})
plan.SegmentBinlogs = append(plan.SegmentBinlogs, sealedSegBinlogs...)
log.Info("Compaction handler refreshed level zero compaction plan",
log.Info("l0CompactionTask refreshed level zero compaction plan",
zap.Any("target position", t.GetPos()),
zap.Any("target segments count", len(sealedSegBinlogs)))
return plan, nil

View File

@ -30,29 +30,38 @@ func (t *mixCompactionTask) processPipelining() bool {
if t.NeedReAssignNodeID() {
return false
}
log := log.With(zap.Int64("triggerID", t.GetTriggerID()), zap.Int64("nodeID", t.GetNodeID()))
var err error
t.plan, err = t.BuildCompactionRequest()
// Segment not found
if err != nil {
err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return err2 == nil
log.Warn("mixCompactionTask failed to build compaction request", zap.Error(err))
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
}
return t.processFailed()
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
err = t.sessions.Compaction(context.TODO(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
log.Warn("mixCompactionTask failed to notify compaction tasks to DataNode", zap.Int64("planID", t.GetPlanID()), zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
return false
}
func (t *mixCompactionTask) processMetaSaved() bool {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
if err == nil {
return t.processCompleted()
if err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)); err != nil {
log.Warn("mixCompactionTask failed to proccessMetaSaved", zap.Error(err))
return false
}
return false
return t.processCompleted()
}
func (t *mixCompactionTask) processExecuting() bool {
@ -62,44 +71,49 @@ func (t *mixCompactionTask) processExecuting() bool {
if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
log.Warn("mixCompactionTask failed to get compaction result", zap.Error(err))
return false
}
switch result.GetState() {
case datapb.CompactionTaskState_executing:
if t.checkTimeout() {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processTimeout()
if err != nil {
log.Warn("mixCompactionTask failed to updateAndSaveTaskMeta", zap.Error(err))
return false
}
return t.processTimeout()
}
return false
case datapb.CompactionTaskState_completed:
t.result = result
if len(result.GetSegments()) == 0 || len(result.GetSegments()) > 1 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
}
return t.processFailed()
}
err2 := t.saveSegmentMeta()
if err2 != nil {
if errors.Is(err2, merr.ErrIllegalCompactionPlan) {
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err3 != nil {
log.Warn("fail to updateAndSaveTaskMeta")
if err := t.saveSegmentMeta(); err != nil {
log.Warn("mixCompactionTask failed to save segment meta", zap.Error(err))
if errors.Is(err, merr.ErrIllegalCompactionPlan) {
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
log.Warn("mixCompactionTask failed to setState failed", zap.Error(err))
return false
}
return true
return t.processFailed()
}
return false
}
segments := []UniqueID{t.newSegment.GetID()}
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err3 == nil {
return t.processMetaSaved()
err = t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err != nil {
log.Warn("mixCompaction failed to setState meta saved", zap.Error(err))
return false
}
return false
return t.processMetaSaved()
case datapb.CompactionTaskState_failed:
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err != nil {
@ -132,6 +146,8 @@ func (t *mixCompactionTask) saveSegmentMeta() error {
return nil
}
// Note: return True means exit this state machine.
// ONLY return True for processCompleted or processFailed
func (t *mixCompactionTask) Process() bool {
switch t.GetState() {
case datapb.CompactionTaskState_pipelining: