mirror of https://github.com/milvus-io/milvus.git
parent
33009d9250
commit
91d8f85df7
|
@ -134,6 +134,7 @@ func (it *IndexBuildTask) OnEnqueue() error {
|
|||
|
||||
func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
|
||||
fn := func() error {
|
||||
//TODO error handling need to be optimized, return Unrecoverable to avoid retry
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
_, values, versions, err := it.etcdKV.LoadWithPrefix2(it.req.MetaPath)
|
||||
if err != nil {
|
||||
|
@ -147,14 +148,15 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
|
|||
log.Debug("IndexNode checkIndexMeta load meta success", zap.Any("path", it.req.MetaPath), zap.Any("pre", pre))
|
||||
err = proto.Unmarshal([]byte(values[0]), &indexMeta)
|
||||
if err != nil {
|
||||
log.Error("IndexNode checkIndexMeta Unmarshal", zap.Error(err))
|
||||
log.Error("IndexNode failed to unmarshal index meta", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("IndexNode checkIndexMeta Unmarshal success", zap.Any("IndexMeta", indexMeta))
|
||||
if indexMeta.Version > it.req.Version || indexMeta.State == commonpb.IndexState_Finished {
|
||||
log.Warn("IndexNode checkIndexMeta Notify build index this version is not the latest version", zap.Any("version", it.req.Version))
|
||||
errMsg := fmt.Errorf("this task has been reassigned with indexBuildID %d", it.req.IndexBuildID)
|
||||
return errMsg
|
||||
log.Info("IndexNode checkIndexMeta version mismatch",
|
||||
zap.Any("req version", it.req.Version),
|
||||
zap.Any("index meta version", indexMeta.Version))
|
||||
return nil
|
||||
}
|
||||
if indexMeta.MarkDeleted {
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
|
@ -189,25 +191,27 @@ func (it *IndexBuildTask) checkIndexMeta(ctx context.Context, pre bool) error {
|
|||
var metaValue []byte
|
||||
metaValue, err = proto.Marshal(&indexMeta)
|
||||
if err != nil {
|
||||
log.Debug("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State),
|
||||
log.Warn("IndexNode", zap.Int64("indexBuildID", indexMeta.IndexBuildID), zap.Any("IndexState", indexMeta.State),
|
||||
zap.Any("proto.Marshal failed:", err))
|
||||
return err
|
||||
}
|
||||
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0],
|
||||
string(metaValue))
|
||||
log.Debug("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err))
|
||||
err = it.etcdKV.CompareVersionAndSwap(it.req.MetaPath, versions[0], string(metaValue))
|
||||
if err != nil {
|
||||
log.Warn("IndexNode checkIndexMeta CompareVersionAndSwap", zap.Error(err))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
err := retry.Do(ctx, fn, retry.Attempts(3))
|
||||
log.Debug("IndexNode checkIndexMeta final", zap.Error(err))
|
||||
if err != nil {
|
||||
log.Error("IndexNode failed to checkIndexMeta", zap.Error(err))
|
||||
}
|
||||
return err
|
||||
|
||||
}
|
||||
|
||||
// PreExecute does some checks before building the index, for example, whether the index has been deleted.
|
||||
func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
|
||||
log.Debug("IndexNode IndexBuildTask preExecute...")
|
||||
log.Debug("IndexNode IndexBuildTask preExecute...", zap.Int64("buildId", it.req.IndexBuildID))
|
||||
sp, ctx := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-PreExecute")
|
||||
defer sp.Finish()
|
||||
return it.checkIndexMeta(ctx, true)
|
||||
|
@ -216,16 +220,15 @@ func (it *IndexBuildTask) PreExecute(ctx context.Context) error {
|
|||
// PostExecute does some checks after building the index, for example, whether the index has been deleted or
|
||||
// whether the index task is up to date.
|
||||
func (it *IndexBuildTask) PostExecute(ctx context.Context) error {
|
||||
log.Debug("IndexNode IndexBuildTask PostExecute...")
|
||||
log.Debug("IndexNode IndexBuildTask PostExecute...", zap.Int64("buildId", it.req.IndexBuildID))
|
||||
sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-PostExecute")
|
||||
defer sp.Finish()
|
||||
|
||||
return it.checkIndexMeta(ctx, false)
|
||||
}
|
||||
|
||||
// Execute actually performs the task of building an index.
|
||||
func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
||||
log.Debug("IndexNode IndexBuildTask Execute ...")
|
||||
log.Debug("IndexNode IndexBuildTask Execute ...", zap.Int64("buildId", it.req.IndexBuildID))
|
||||
sp, _ := trace.StartSpanFromContextWithOperationName(ctx, "CreateIndex-Execute")
|
||||
defer sp.Finish()
|
||||
tr := timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildTask %d", it.req.IndexBuildID))
|
||||
|
@ -273,13 +276,17 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
|
||||
it.index, err = NewCIndex(typeParams, indexParams)
|
||||
if err != nil {
|
||||
log.Error("IndexNode IndexBuildTask Execute NewCIndex failed", zap.Error(err))
|
||||
log.Error("IndexNode IndexBuildTask Execute NewCIndex failed",
|
||||
zap.Int64("buildId", it.req.IndexBuildID),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
err = it.index.Delete()
|
||||
if err != nil {
|
||||
log.Warn("IndexNode IndexBuildTask Execute CIndexDelete Failed", zap.Error(err))
|
||||
log.Warn("IndexNode IndexBuildTask Execute CIndexDelete Failed",
|
||||
zap.Int64("buildId", it.req.IndexBuildID),
|
||||
zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -331,7 +338,7 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
// In this case, it.internalErr is no longer nil and err does not need to be returned, otherwise it.err will also be assigned.
|
||||
return nil
|
||||
}
|
||||
log.Debug("IndexNode load data success")
|
||||
log.Debug("IndexNode load data success", zap.Int64("buildId", it.req.IndexBuildID))
|
||||
tr.Record("loadKey done")
|
||||
|
||||
storageBlobs := getStorageBlobs(blobs)
|
||||
|
@ -417,26 +424,26 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
saveIndexFileFn := func() error {
|
||||
v, err := it.etcdKV.Load(it.req.MetaPath)
|
||||
if err != nil {
|
||||
log.Error("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err))
|
||||
log.Warn("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
err = proto.Unmarshal([]byte(v), &indexMeta)
|
||||
if err != nil {
|
||||
log.Error("IndexNode Unmarshal indexMeta error ", zap.Error(err))
|
||||
log.Warn("IndexNode Unmarshal indexMeta error ", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
//log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta))
|
||||
if indexMeta.Version > it.req.Version {
|
||||
log.Warn("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version),
|
||||
zap.Any("indexMeta.Version", indexMeta.Version))
|
||||
return errors.New("This task has been reassigned ")
|
||||
return errors.New("This task has been reassigned, check indexMeta.version and request ")
|
||||
}
|
||||
return saveBlob(savePath, value)
|
||||
}
|
||||
err := retry.Do(ctx, saveIndexFileFn, retry.Attempts(5))
|
||||
log.Debug("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath))
|
||||
if err != nil {
|
||||
log.Warn("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath))
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -453,7 +460,8 @@ func (it *IndexBuildTask) Execute(ctx context.Context) error {
|
|||
}
|
||||
tr.Record("save index file done")
|
||||
}
|
||||
log.Debug("IndexNode CreateIndex finished")
|
||||
log.Info("IndexNode CreateIndex successfully ", zap.Int64("collect", collectionID),
|
||||
zap.Int64("partition", partitionID), zap.Int64("segment", partitionID))
|
||||
tr.Elapse("all done")
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue