mirror of https://github.com/milvus-io/milvus.git
Retry index building task in case of internal error such as minio failure (#9552)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/9642/head
parent
4908b1c461
commit
7801357dd3
|
@ -58,10 +58,11 @@ type task interface {
|
|||
|
||||
// BaseTask is an basic instance of task.
|
||||
type BaseTask struct {
|
||||
done chan error
|
||||
ctx context.Context
|
||||
id UniqueID
|
||||
err error
|
||||
done chan error
|
||||
ctx context.Context
|
||||
id UniqueID
|
||||
err error
|
||||
internalErr error
|
||||
}
|
||||
|
||||
// SetError sets an error to task.
|
||||
|
@ -152,7 +153,8 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
|
|||
log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta))
|
||||
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
|
||||
log.Warn("IndexNode checkIndexMeta Notify build index this version is not the latest version", zap.Any("version", it.req.Version))
|
||||
return nil
|
||||
errMsg := fmt.Errorf("this task has been reassigned with indexBuildID %d", it.req.IndexBuildID)
|
||||
return errMsg
|
||||
}
|
||||
if indexMeta.MarkDeleted {
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
|
@ -173,11 +175,16 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
|
|||
}
|
||||
indexMeta.IndexFilePaths = it.savePaths
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
// Under normal circumstances, it.err and it.internalErr will not be non-nil at the same time, but for the sake of insurance, the else judgment is added.
|
||||
if it.err != nil {
|
||||
log.Error("IndexNode CreateIndex Failed", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", err))
|
||||
log.Error("IndexNode CreateIndex Failed and can not be retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.err))
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
indexMeta.FailReason = it.err.Error()
|
||||
} else if it.internalErr != nil {
|
||||
log.Error("IndexNode CreateIndex Failed, but it can retried", zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Any("err", it.internalErr))
|
||||
indexMeta.State = commonpb.IndexState_Unissued
|
||||
}
|
||||
|
||||
log.Debug("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State))
|
||||
var metaValue []byte
|
||||
metaValue, err = proto.Marshal(&indexMeta)
|
||||
|
@ -319,7 +326,10 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
err = funcutil.ProcessFuncParallel(len(toLoadDataPaths), runtime.NumCPU(), loadKey, "loadKey")
|
||||
if err != nil {
|
||||
return err
|
||||
log.Warn("loadKey from minio failed", zap.Error(err))
|
||||
it.internalErr = err
|
||||
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
|
||||
return nil
|
||||
}
|
||||
log.Debug("IndexNode load data success")
|
||||
tr.Record("loadKey done")
|
||||
|
@ -436,7 +446,10 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
err = funcutil.ProcessFuncParallel(len(serializedIndexBlobs), runtime.NumCPU(), saveIndexFile, "saveIndexFile")
|
||||
if err != nil {
|
||||
return err
|
||||
log.Warn("saveIndexFile to minio failed", zap.Error(err))
|
||||
it.internalErr = err
|
||||
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
|
||||
return nil
|
||||
}
|
||||
tr.Record("save index file done")
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue