mirror of https://github.com/milvus-io/milvus.git
Load index meta in every retry (#15376)
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>pull/15380/head
parent
515fe962d5
commit
3f48a40ab0
|
@ -201,55 +201,56 @@ func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta) TaskStat
|
|||
// if failed, IndexNode will panic to inform indexcoord.
|
||||
func (it *IndexBuildTask) saveIndexMeta(ctx context.Context) error {
|
||||
defer it.tr.Record("IndexNode IndexBuildTask saveIndexMeta")
|
||||
indexMeta, version, err := it.loadIndexMeta(ctx)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to load index meta, IndexBuildID=%d", indexMeta.IndexBuildID)
|
||||
panic(errMsg)
|
||||
}
|
||||
|
||||
taskState := it.updateTaskState(indexMeta)
|
||||
if taskState == TaskStateAbandon {
|
||||
log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
|
||||
return nil
|
||||
}
|
||||
|
||||
indexMeta.IndexFilePaths = it.savePaths
|
||||
indexMeta.SerializeSize = it.serializedSize
|
||||
|
||||
if taskState == TaskStateFailed {
|
||||
log.Error("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Failed",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.err))
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
indexMeta.FailReason = it.err.Error()
|
||||
} else if taskState == TaskStateRetry {
|
||||
log.Info("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Unissued",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.internalErr))
|
||||
indexMeta.State = commonpb.IndexState_Unissued
|
||||
} else { // TaskStateNormal
|
||||
log.Info("IndexNode IndexBuildTask saveIndexmeta indexMeta.state to IndexState_Unissued",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
}
|
||||
|
||||
var metaValue []byte
|
||||
metaValue, err = proto.Marshal(indexMeta)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta, IndexBuildID=%d, err=%s",
|
||||
indexMeta.IndexBuildID, err.Error())
|
||||
panic(errMsg)
|
||||
}
|
||||
|
||||
strMetaValue := string(metaValue)
|
||||
|
||||
fn := func() error {
|
||||
indexMeta, version, err := it.loadIndexMeta(ctx)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to load index meta, IndexBuildID=%d", indexMeta.IndexBuildID)
|
||||
panic(errMsg)
|
||||
}
|
||||
|
||||
taskState := it.updateTaskState(indexMeta)
|
||||
if taskState == TaskStateAbandon {
|
||||
log.Info("IndexNode IndexBuildTask saveIndexMeta", zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
|
||||
return nil
|
||||
}
|
||||
|
||||
indexMeta.IndexFilePaths = it.savePaths
|
||||
indexMeta.SerializeSize = it.serializedSize
|
||||
|
||||
if taskState == TaskStateFailed {
|
||||
log.Error("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Failed",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.err))
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
indexMeta.FailReason = it.err.Error()
|
||||
} else if taskState == TaskStateRetry {
|
||||
log.Info("IndexNode IndexBuildTask saveIndexMeta set indexMeta.state to IndexState_Unissued",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID), zap.Error(it.internalErr))
|
||||
indexMeta.State = commonpb.IndexState_Unissued
|
||||
} else { // TaskStateNormal
|
||||
log.Info("IndexNode IndexBuildTask saveIndexmeta indexMeta.state to IndexState_Unissued",
|
||||
zap.String("TaskState", taskState.String()),
|
||||
zap.Int64("IndexBuildID", indexMeta.IndexBuildID))
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
}
|
||||
|
||||
var metaValue []byte
|
||||
metaValue, err = proto.Marshal(indexMeta)
|
||||
if err != nil {
|
||||
errMsg := fmt.Sprintf("IndexNode IndexBuildTask saveIndexMeta fail to marshal index meta, IndexBuildID=%d, err=%s",
|
||||
indexMeta.IndexBuildID, err.Error())
|
||||
panic(errMsg)
|
||||
}
|
||||
|
||||
strMetaValue := string(metaValue)
|
||||
|
||||
return it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, version, strMetaValue)
|
||||
}
|
||||
|
||||
err = retry.Do(ctx, fn, retry.Attempts(3))
|
||||
err := retry.Do(ctx, fn, retry.Attempts(3))
|
||||
if err != nil {
|
||||
panic(err.Error())
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue