Refine clustering_compaction_task retry mechanism (#34194)

#32939

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/34276/head
wayblink 2024-06-30 20:22:09 +08:00 committed by GitHub
parent 3030e4625e
commit 73ffc1b424
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 61 additions and 16 deletions

View File

@ -94,8 +94,7 @@ func (t *clusteringCompactionTask) Process() bool {
Observe(float64(elapse)) Observe(float64(elapse))
} }
} }
// todo debug log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
log.Info("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
} }
@ -186,7 +185,7 @@ func (t *clusteringCompactionTask) processPipelining() error {
err := t.meta.UpdateSegmentsInfo(operators...) err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil { if err != nil {
log.Warn("fail to set segment level to L2", zap.Error(err)) log.Warn("fail to set segment level to L2", zap.Error(err))
return err return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err)
} }
if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) { if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
@ -211,7 +210,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
if err != nil || result == nil { if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) { if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err)) log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// todo reassign node ID // setNodeID(NullNodeID) to trigger reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil return nil
} }
@ -223,9 +222,9 @@ func (t *clusteringCompactionTask) processExecuting() error {
t.result = result t.result = result
result := t.result result := t.result
if len(result.GetSegments()) == 0 { if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results") log.Warn("illegal compaction results, this should not happen")
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
return err return merr.WrapErrCompactionResult("compaction result is empty")
} }
resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 { resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 {
@ -247,6 +246,8 @@ func (t *clusteringCompactionTask) processExecuting() error {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout)) err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil { if err == nil {
return t.processFailedOrTimeout() return t.processFailedOrTimeout()
} else {
return err
} }
} }
return nil return nil
@ -294,11 +295,11 @@ func (t *clusteringCompactionTask) completeTask() error {
if err != nil { if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err) return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
} }
var operators []UpdateOperator var operators []UpdateOperator
for _, segID := range t.GetResultSegments() { for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID())) operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
} }
err = t.meta.UpdateSegmentsInfo(operators...) err = t.meta.UpdateSegmentsInfo(operators...)
if err != nil { if err != nil {
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err) return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err)
@ -306,7 +307,7 @@ func (t *clusteringCompactionTask) completeTask() error {
err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID()) err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())
if err != nil { if err != nil {
return err return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
} }
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed)) return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
} }
@ -315,13 +316,13 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID()) analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID())
if analyzeTask == nil { if analyzeTask == nil {
log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID())) log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID()))
return errors.New("analyzeTask not found") return merr.WrapErrAnalyzeTaskNotFound(t.GetAnalyzeTaskID()) // retryable
} }
log.Info("check analyze task state", zap.Int64("id", t.GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String())) log.Info("check analyze task state", zap.Int64("id", t.GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
switch analyzeTask.State { switch analyzeTask.State {
case indexpb.JobState_JobStateFinished: case indexpb.JobState_JobStateFinished:
if analyzeTask.GetCentroidsFile() == "" { if analyzeTask.GetCentroidsFile() == "" {
// fake finished vector clustering is not supported in opensource // not retryable, fake finished vector clustering is not supported in opensource
return merr.WrapErrClusteringCompactionNotSupportVector() return merr.WrapErrClusteringCompactionNotSupportVector()
} else { } else {
t.AnalyzeVersion = analyzeTask.GetVersion() t.AnalyzeVersion = analyzeTask.GetVersion()
@ -354,7 +355,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err := t.meta.UpdateSegmentsInfo(operators...) err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil { if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err)) log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return err return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
} }
t.resetSegmentCompacting() t.resetSegmentCompacting()
@ -369,6 +370,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo) err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil { if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err)) log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("CleanPartitionStatsInfo", err)
} }
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
@ -404,14 +406,32 @@ func (t *clusteringCompactionTask) doAnalyze() error {
} }
func (t *clusteringCompactionTask) doCompact() error { func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
if t.NeedReAssignNodeID() { if t.NeedReAssignNodeID() {
return errors.New("not assign nodeID") return errors.New("not assign nodeID")
} }
var err error // check whether the compaction plan is already submitted considering
// datacoord may crash between call sessions.Compaction and updateTaskState to executing
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// setNodeID(NullNodeID) to trigger reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil
}
return merr.WrapErrGetCompactionPlanResultFail(err)
}
if result != nil {
log.Info("compaction already submitted")
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
return nil
}
t.plan, err = t.BuildCompactionRequest() t.plan, err = t.BuildCompactionRequest()
if err != nil { if err != nil {
err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error())) log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
return err2 return merr.WrapErrBuildCompactionRequestFail(err) // retryable
} }
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil { if err != nil {
@ -460,7 +480,8 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
task := t.ShadowClone(opts...) task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task) err := t.saveTaskMeta(task)
if err != nil { if err != nil {
return err log.Warn("Failed to saveTaskMeta", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
} }
t.CompactionTask = task t.CompactionTask = task
return nil return nil

View File

@ -186,6 +186,10 @@ var (
ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2308, true) ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2308, true)
ErrClusteringCompactionGetCollectionFail = newMilvusError("fail to get collection in compaction", 2309, true) ErrClusteringCompactionGetCollectionFail = newMilvusError("fail to get collection in compaction", 2309, true)
ErrCompactionResultNotFound = newMilvusError("compaction result not found", 2310, false) ErrCompactionResultNotFound = newMilvusError("compaction result not found", 2310, false)
ErrAnalyzeTaskNotFound = newMilvusError("analyze task not found", 2311, true)
ErrBuildCompactionRequestFail = newMilvusError("fail to build CompactionRequest", 2312, true)
ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true)
ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false)
// General // General
ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false) ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)

View File

@ -1120,3 +1120,23 @@ func WrapErrClusteringCompactionSubmitTaskFail(taskType string, err error) error
func WrapErrClusteringCompactionMetaError(operation string, err error) error { func WrapErrClusteringCompactionMetaError(operation string, err error) error {
return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation)) return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
} }
func WrapErrAnalyzeTaskNotFound(id int64) error {
return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id))
}
func WrapErrBuildCompactionRequestFail(err error) error {
return wrapFieldsWithDesc(ErrBuildCompactionRequestFail, err.Error())
}
func WrapErrGetCompactionPlanResultFail(err error) error {
return wrapFieldsWithDesc(ErrGetCompactionPlanResultFail, err.Error())
}
func WrapErrCompactionResult(msg ...string) error {
err := error(ErrCompactionResult)
if len(msg) > 0 {
err = errors.Wrap(err, strings.Join(msg, "->"))
}
return err
}