mirror of https://github.com/milvus-io/milvus.git
Ignore task when IndexNode load empty index meta (#18266)
Signed-off-by: Cai.Zhang <cai.zhang@zilliz.com>
pull/18267/head
parent
68661ddc8d
commit
93d9dfad1a
|
@ -25,22 +25,18 @@ import (
|
|||
"runtime/debug"
|
||||
"strconv"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/metrics"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/schemapb"
|
||||
"github.com/milvus-io/milvus/internal/storage"
|
||||
"github.com/milvus-io/milvus/internal/util/funcutil"
|
||||
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
|
||||
"github.com/milvus-io/milvus/internal/util/retry"
|
||||
"github.com/milvus-io/milvus/internal/util/timerecord"
|
||||
"github.com/milvus-io/milvus/internal/util/trace"
|
||||
|
@ -127,7 +123,7 @@ type IndexBuildTask struct {
|
|||
BaseTask
|
||||
cm storage.ChunkManager
|
||||
index indexcgowrapper.CodecIndex
|
||||
etcdKV *etcdkv.EtcdKV
|
||||
etcdKV kv.MetaKv
|
||||
savePaths []string
|
||||
req *indexpb.CreateIndexRequest
|
||||
nodeID UniqueID
|
||||
|
@ -171,7 +167,7 @@ func (it *IndexBuildTask) OnEnqueue() error {
|
|||
|
||||
// loadIndexMeta load meta from etcd.
|
||||
func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta, int64, error) {
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
indexMeta := &indexpb.IndexMeta{}
|
||||
var source int64
|
||||
fn := func() error {
|
||||
//TODO error handling need to be optimized, return Unrecoverable to avoid retry
|
||||
|
@ -180,9 +176,12 @@ func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta
|
|||
return err
|
||||
}
|
||||
if len(values) == 0 {
|
||||
return fmt.Errorf("IndexNode loadIndexMeta get empty")
|
||||
log.Warn("IndexNode loadIndexMeta get empty, maybe the task has been recycled, set task to abandon",
|
||||
zap.Int64("buildID", it.req.IndexBuildID))
|
||||
it.SetState(TaskStateAbandon)
|
||||
return nil
|
||||
}
|
||||
err = proto.Unmarshal([]byte(values[0]), &indexMeta)
|
||||
err = proto.Unmarshal([]byte(values[0]), indexMeta)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -193,10 +192,13 @@ func (it *IndexBuildTask) loadIndexMeta(ctx context.Context) (*indexpb.IndexMeta
|
|||
if err != nil {
|
||||
return nil, -1, err
|
||||
}
|
||||
return &indexMeta, source, nil
|
||||
return indexMeta, source, nil
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) updateTaskState(indexMeta *indexpb.IndexMeta, err error) TaskState {
|
||||
if it.GetState() == TaskStateAbandon {
|
||||
return it.GetState()
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("IndexNode IndexBuildTask internal err, mark the task as retry", zap.Int64("buildID", it.req.IndexBuildID), zap.Error(err))
|
||||
it.SetState(TaskStateRetry)
|
||||
|
@ -506,23 +508,20 @@ func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob)
|
|||
strconv.Itoa(int(it.partitionID)), strconv.Itoa(int(it.segmentID)), key)
|
||||
}
|
||||
|
||||
it.savePaths = make([]string, blobCnt)
|
||||
savePaths := make([]string, blobCnt)
|
||||
saveIndexFile := func(idx int) error {
|
||||
blob := blobs[idx]
|
||||
savePath := getSavePathByKey(blob.Key)
|
||||
saveIndexFileFn := func() error {
|
||||
v, err := it.etcdKV.Load(it.req.MetaPath)
|
||||
indexMeta, _, err := it.loadIndexMeta(ctx)
|
||||
if err != nil {
|
||||
log.Warn("IndexNode load meta failed", zap.Any("path", it.req.MetaPath), zap.Error(err))
|
||||
log.Warn("IndexNode load meta failed", zap.String("path", it.req.MetaPath), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
err = proto.Unmarshal([]byte(v), &indexMeta)
|
||||
if err != nil {
|
||||
log.Warn("IndexNode Unmarshal indexMeta error ", zap.Error(err))
|
||||
return err
|
||||
if it.GetState() != TaskStateNormal {
|
||||
log.Warn("IndexNode task state is not normal, skip task", zap.Int64("buildID", it.req.IndexBuildID))
|
||||
return nil
|
||||
}
|
||||
//log.Debug("IndexNode Unmarshal indexMeta success ", zap.Any("meta", indexMeta))
|
||||
if indexMeta.IndexVersion > it.req.Version {
|
||||
log.Warn("IndexNode try saveIndexFile failed req.Version is low", zap.Any("req.Version", it.req.Version),
|
||||
zap.Any("indexMeta.Version", indexMeta.IndexVersion))
|
||||
|
@ -535,12 +534,18 @@ func (it *IndexBuildTask) saveIndex(ctx context.Context, blobs []*storage.Blob)
|
|||
log.Warn("IndexNode try saveIndexFile final", zap.Error(err), zap.Any("savePath", savePath))
|
||||
return err
|
||||
}
|
||||
it.savePaths[idx] = savePath
|
||||
savePaths[idx] = savePath
|
||||
return nil
|
||||
}
|
||||
|
||||
// If an error occurs, return the error that the task state will be set to retry.
|
||||
return funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile")
|
||||
if err := funcutil.ProcessFuncParallel(blobCnt, runtime.NumCPU(), saveIndexFile, "saveIndexFile"); err != nil {
|
||||
log.Error("saveIndexFile fail", zap.Int64("buildID", it.req.IndexBuildID))
|
||||
return err
|
||||
}
|
||||
it.savePaths = savePaths
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *IndexBuildTask) releaseMemory() {
|
||||
|
|
|
@ -24,6 +24,8 @@ import (
|
|||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/kv"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
|
@ -125,3 +127,70 @@ func TestIndexBuildTask_Execute(t *testing.T) {
|
|||
|
||||
})
|
||||
}
|
||||
|
||||
type mockETCDKV struct {
|
||||
kv.MetaKv
|
||||
|
||||
loadWithPrefix2 func(key string) ([]string, []string, []int64, error)
|
||||
}
|
||||
|
||||
func (mk *mockETCDKV) LoadWithPrefix2(key string) ([]string, []string, []int64, error) {
|
||||
return mk.loadWithPrefix2(key)
|
||||
}
|
||||
|
||||
func TestIndexBuildTask_loadIndexMeta(t *testing.T) {
|
||||
t.Run("load empty meta", func(t *testing.T) {
|
||||
indexTask := &IndexBuildTask{
|
||||
etcdKV: &mockETCDKV{
|
||||
loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
|
||||
return []string{}, []string{}, []int64{}, nil
|
||||
},
|
||||
},
|
||||
req: &indexpb.CreateIndexRequest{
|
||||
IndexBuildID: 1,
|
||||
DataPaths: []string{"path1", "path2"},
|
||||
},
|
||||
}
|
||||
|
||||
indexMeta, revision, err := indexTask.loadIndexMeta(context.Background())
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, int64(0), revision)
|
||||
assert.Equal(t, TaskStateAbandon, indexTask.GetState())
|
||||
|
||||
indexTask.updateTaskState(indexMeta, nil)
|
||||
assert.Equal(t, TaskStateAbandon, indexTask.GetState())
|
||||
})
|
||||
}
|
||||
|
||||
func TestIndexBuildTask_saveIndex(t *testing.T) {
|
||||
t.Run("save index failed", func(t *testing.T) {
|
||||
indexTask := &IndexBuildTask{
|
||||
etcdKV: &mockETCDKV{
|
||||
loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
|
||||
return []string{}, []string{}, []int64{}, errors.New("error")
|
||||
},
|
||||
},
|
||||
partitionID: 1,
|
||||
segmentID: 1,
|
||||
req: &indexpb.CreateIndexRequest{
|
||||
IndexBuildID: 1,
|
||||
DataPaths: []string{"path1", "path2"},
|
||||
Version: 1,
|
||||
},
|
||||
}
|
||||
|
||||
blobs := []*storage.Blob{
|
||||
{
|
||||
Key: "key1",
|
||||
Value: []byte("value1"),
|
||||
},
|
||||
{
|
||||
Key: "key2",
|
||||
Value: []byte("value2"),
|
||||
},
|
||||
}
|
||||
|
||||
err := indexTask.saveIndex(context.Background(), blobs)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue