From f6621dc8471f4ad846ea3cef319fd228003d5bd3 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Thu, 9 Sep 2021 17:09:08 +0800 Subject: [PATCH] Increase code coverage for indexnode component (#7520) Signed-off-by: cai.zhang Co-authored-by: edward.zeng --- internal/indexnode/indexnode_test.go | 283 +++++++++++++++++++++++++++ internal/indexnode/task_scheduler.go | 14 +- 2 files changed, 290 insertions(+), 7 deletions(-) diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index e27b1c91ef..dcd705a71f 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -18,6 +18,8 @@ import ( "testing" "time" + "github.com/milvus-io/milvus/internal/proto/milvuspb" + "github.com/milvus-io/milvus/internal/log" "go.uber.org/zap" @@ -182,6 +184,8 @@ func TestIndexNode(t *testing.T) { in.kv.Remove(k) } }() + + defer in.etcdKV.RemoveWithPrefix(metaPath1) }) t.Run("CreateIndex BinaryVector", func(t *testing.T) { var insertCodec storage.InsertCodec @@ -293,6 +297,8 @@ func TestIndexNode(t *testing.T) { in.kv.Remove(k) } }() + + defer in.etcdKV.RemoveWithPrefix(metaPath2) }) t.Run("Create Deleted_Index", func(t *testing.T) { @@ -410,6 +416,8 @@ func TestIndexNode(t *testing.T) { in.kv.Remove(k) } }() + + defer in.etcdKV.RemoveWithPrefix(metaPath3) }) t.Run("GetComponentStates", func(t *testing.T) { @@ -444,3 +452,278 @@ func TestIndexNode(t *testing.T) { err = in.Stop() assert.Nil(t, err) } + +func TestCreateIndexFailed(t *testing.T) { + ctx := context.Background() + + indexID := UniqueID(1001) + indexBuildID1 := UniqueID(54322) + indexBuildID2 := UniqueID(54323) + floatVectorFieldID := UniqueID(102) + tsFieldID := UniqueID(1) + collectionID := UniqueID(202) + floatVectorFieldName := "float_vector" + metaPath1 := "FloatVector1" + metaPath2 := "FloatVector2" + floatVectorBinlogPath := "float_vector_binlog" + + in, err := NewIndexNode(ctx) + assert.Nil(t, err) + Params.Init() + + err = in.Register() + assert.Nil(t, err) + err = in.Init() + assert.Nil(t, err) + + err = in.Start() + assert.Nil(t, err) + + t.Run("CreateIndex error", func(t *testing.T) { + var insertCodec storage.InsertCodec + defer insertCodec.Close() + + insertCodec.Schema = &etcdpb.CollectionMeta{ + ID: collectionID, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: floatVectorFieldID, + Name: floatVectorFieldName, + IsPrimaryKey: false, + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + } + data := make(map[UniqueID]storage.FieldData) + tsData := make([]int64, nb) + for i := 0; i < nb; i++ { + tsData[i] = int64(i + 100) + } + data[tsFieldID] = &storage.Int64FieldData{ + NumRows: []int64{nb}, + Data: tsData, + } + data[floatVectorFieldID] = &storage.FloatVectorFieldData{ + NumRows: []int64{nb}, + Data: generateFloatVectors(), + Dim: dim, + } + insertData := storage.InsertData{ + Data: data, + Infos: []storage.BlobInfo{ + { + Length: 10, + }, + }, + } + binLogs, _, err := insertCodec.Serialize(999, 888, &insertData) + assert.Nil(t, err) + kvs := make(map[string]string, len(binLogs)) + paths := make([]string, 0, len(binLogs)) + for i, blob := range binLogs { + key := path.Join(floatVectorBinlogPath, strconv.Itoa(i)) + paths = append(paths, key) + kvs[key] = string(blob.Value[:]) + } + err = in.kv.MultiSave(kvs) + assert.Nil(t, err) + + indexMeta := &indexpb.IndexMeta{ + IndexBuildID: indexBuildID1, + State: commonpb.IndexState_InProgress, + Version: 1, + } + + value := proto.MarshalTextString(indexMeta) + err = in.etcdKV.Save(metaPath1, value) + assert.Nil(t, err) + req := &indexpb.CreateIndexRequest{ + IndexBuildID: indexBuildID1, + IndexName: "FloatVector", + IndexID: indexID, + Version: 1, + MetaPath: metaPath1, + DataPaths: paths, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "8", + }, + { + Key: "dim", + Value: "8", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", + Value: "IVF_SQ8", + }, + { + Key: "params", + Value: "{\"nlist\": 128}", + }, + { + Key: "metric_type", + Value: "L2", + }, + }, + } + + status, err := in.CreateIndex(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + + value, err = in.etcdKV.Load(metaPath1) + assert.Nil(t, err) + indexMetaTmp := indexpb.IndexMeta{} + err = proto.UnmarshalText(value, &indexMetaTmp) + assert.Nil(t, err) + if indexMetaTmp.State != commonpb.IndexState_Failed { + time.Sleep(10 * time.Second) + value, err = in.etcdKV.Load(metaPath1) + assert.Nil(t, err) + indexMetaTmp2 := indexpb.IndexMeta{} + err = proto.UnmarshalText(value, &indexMetaTmp2) + assert.Nil(t, err) + assert.Equal(t, commonpb.IndexState_Failed, indexMetaTmp2.State) + defer in.kv.MultiRemove(indexMetaTmp2.IndexFilePaths) + } + defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths) + defer func() { + for k := range kvs { + in.kv.Remove(k) + } + }() + + indexMeta2 := &indexpb.IndexMeta{ + IndexBuildID: indexBuildID2, + State: commonpb.IndexState_InProgress, + Version: 1, + } + + value2 := proto.MarshalTextString(indexMeta2) + err = in.etcdKV.Save(metaPath2, value2) + assert.Nil(t, err) + + req2 := &indexpb.CreateIndexRequest{ + IndexBuildID: indexBuildID2, + IndexName: "FloatVector", + IndexID: indexID, + Version: 1, + MetaPath: metaPath2, + DataPaths: paths, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", + Value: "8", + }, + { + Key: "params", + Value: "value", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", + Value: "IVF_SQ8", + }, + { + Key: "params", + Value: "{\"nlist\": 128}", + }, + { + Key: "metric_type", + Value: "L2", + }, + }, + } + + status, err = in.CreateIndex(ctx, req2) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) + + value, err = in.etcdKV.Load(metaPath2) + assert.Nil(t, err) + indexMetaTmp2 := indexpb.IndexMeta{} + err = proto.UnmarshalText(value, &indexMetaTmp2) + assert.Nil(t, err) + if indexMetaTmp.State != commonpb.IndexState_Failed { + time.Sleep(10 * time.Second) + value, err = in.etcdKV.Load(metaPath2) + assert.Nil(t, err) + indexMetaTmp2 := indexpb.IndexMeta{} + err = proto.UnmarshalText(value, &indexMetaTmp2) + assert.Nil(t, err) + assert.Equal(t, commonpb.IndexState_Failed, indexMetaTmp2.State) + defer in.kv.MultiRemove(indexMetaTmp2.IndexFilePaths) + } + defer in.kv.MultiRemove(indexMetaTmp2.IndexFilePaths) + defer func() { + for k := range kvs { + in.kv.Remove(k) + } + }() + }) + + t.Run("CreateIndex server not healthy", func(t *testing.T) { + in.UpdateStateCode(internalpb.StateCode_Initializing) + status, err := in.CreateIndex(ctx, &indexpb.CreateIndexRequest{}) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) + }) + + err = in.Stop() + assert.Nil(t, err) +} + +func TestIndexNode_Error(t *testing.T) { + ctx := context.Background() + + in, err := NewIndexNode(ctx) + assert.Nil(t, err) + Params.Init() + + err = in.Register() + assert.Nil(t, err) + err = in.Init() + assert.Nil(t, err) + + err = in.Start() + assert.Nil(t, err) + + in.UpdateStateCode(internalpb.StateCode_Initializing) + + t.Run("CreateIndex", func(t *testing.T) { + status, err := in.CreateIndex(ctx, &indexpb.CreateIndexRequest{}) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode) + }) + + t.Run("GetMetrics", func(t *testing.T) { + resp, err := in.GetMetrics(ctx, &milvuspb.GetMetricsRequest{}) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + in.UpdateStateCode(internalpb.StateCode_Healthy) + + t.Run("Request Illegal", func(t *testing.T) { + resp, err := in.GetMetrics(ctx, &milvuspb.GetMetricsRequest{}) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + t.Run("MetricsTypeIllegal", func(t *testing.T) { + req, err := metricsinfo.ConstructRequestByMetricType("GetIndexNodeMetrics") + assert.Nil(t, err) + resp, err := in.GetMetrics(ctx, req) + assert.Nil(t, err) + assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode) + }) + + err = in.Stop() + assert.Nil(t, err) +} diff --git a/internal/indexnode/task_scheduler.go b/internal/indexnode/task_scheduler.go index e6fa4b114e..4cca9877d6 100644 --- a/internal/indexnode/task_scheduler.go +++ b/internal/indexnode/task_scheduler.go @@ -199,13 +199,13 @@ func NewTaskScheduler(ctx context.Context, return s, nil } -func (sched *TaskScheduler) setParallelism(parallel int) { - if parallel <= 0 { - log.Debug("IndexNode can not set parallelism to less than zero!") - return - } - sched.buildParallel = parallel -} +//func (sched *TaskScheduler) setParallelism(parallel int) { +// if parallel <= 0 { +// log.Debug("IndexNode can not set parallelism to less than zero!") +// return +// } +// sched.buildParallel = parallel +//} func (sched *TaskScheduler) scheduleIndexBuildTask() []task { ret := make([]task, 0)