Increase code coverage for indexnode component (#7520)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Co-authored-by: edward.zeng <jie.zeng@zilliz.com>
pull/7657/head
cai.zhang 2021-09-09 17:09:08 +08:00 committed by GitHub
parent 3bd971c6bc
commit f6621dc847
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 290 additions and 7 deletions

View File

@ -18,6 +18,8 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
@ -182,6 +184,8 @@ func TestIndexNode(t *testing.T) {
in.kv.Remove(k)
}
}()
defer in.etcdKV.RemoveWithPrefix(metaPath1)
})
t.Run("CreateIndex BinaryVector", func(t *testing.T) {
var insertCodec storage.InsertCodec
@ -293,6 +297,8 @@ func TestIndexNode(t *testing.T) {
in.kv.Remove(k)
}
}()
defer in.etcdKV.RemoveWithPrefix(metaPath2)
})
t.Run("Create Deleted_Index", func(t *testing.T) {
@ -410,6 +416,8 @@ func TestIndexNode(t *testing.T) {
in.kv.Remove(k)
}
}()
defer in.etcdKV.RemoveWithPrefix(metaPath3)
})
t.Run("GetComponentStates", func(t *testing.T) {
@ -444,3 +452,278 @@ func TestIndexNode(t *testing.T) {
err = in.Stop()
assert.Nil(t, err)
}
func TestCreateIndexFailed(t *testing.T) {
ctx := context.Background()
indexID := UniqueID(1001)
indexBuildID1 := UniqueID(54322)
indexBuildID2 := UniqueID(54323)
floatVectorFieldID := UniqueID(102)
tsFieldID := UniqueID(1)
collectionID := UniqueID(202)
floatVectorFieldName := "float_vector"
metaPath1 := "FloatVector1"
metaPath2 := "FloatVector2"
floatVectorBinlogPath := "float_vector_binlog"
in, err := NewIndexNode(ctx)
assert.Nil(t, err)
Params.Init()
err = in.Register()
assert.Nil(t, err)
err = in.Init()
assert.Nil(t, err)
err = in.Start()
assert.Nil(t, err)
t.Run("CreateIndex error", func(t *testing.T) {
var insertCodec storage.InsertCodec
defer insertCodec.Close()
insertCodec.Schema = &etcdpb.CollectionMeta{
ID: collectionID,
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: floatVectorFieldID,
Name: floatVectorFieldName,
IsPrimaryKey: false,
DataType: schemapb.DataType_FloatVector,
},
},
},
}
data := make(map[UniqueID]storage.FieldData)
tsData := make([]int64, nb)
for i := 0; i < nb; i++ {
tsData[i] = int64(i + 100)
}
data[tsFieldID] = &storage.Int64FieldData{
NumRows: []int64{nb},
Data: tsData,
}
data[floatVectorFieldID] = &storage.FloatVectorFieldData{
NumRows: []int64{nb},
Data: generateFloatVectors(),
Dim: dim,
}
insertData := storage.InsertData{
Data: data,
Infos: []storage.BlobInfo{
{
Length: 10,
},
},
}
binLogs, _, err := insertCodec.Serialize(999, 888, &insertData)
assert.Nil(t, err)
kvs := make(map[string]string, len(binLogs))
paths := make([]string, 0, len(binLogs))
for i, blob := range binLogs {
key := path.Join(floatVectorBinlogPath, strconv.Itoa(i))
paths = append(paths, key)
kvs[key] = string(blob.Value[:])
}
err = in.kv.MultiSave(kvs)
assert.Nil(t, err)
indexMeta := &indexpb.IndexMeta{
IndexBuildID: indexBuildID1,
State: commonpb.IndexState_InProgress,
Version: 1,
}
value := proto.MarshalTextString(indexMeta)
err = in.etcdKV.Save(metaPath1, value)
assert.Nil(t, err)
req := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID1,
IndexName: "FloatVector",
IndexID: indexID,
Version: 1,
MetaPath: metaPath1,
DataPaths: paths,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "8",
},
{
Key: "dim",
Value: "8",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type",
Value: "IVF_SQ8",
},
{
Key: "params",
Value: "{\"nlist\": 128}",
},
{
Key: "metric_type",
Value: "L2",
},
},
}
status, err := in.CreateIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err = in.etcdKV.Load(metaPath1)
assert.Nil(t, err)
indexMetaTmp := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp)
assert.Nil(t, err)
if indexMetaTmp.State != commonpb.IndexState_Failed {
time.Sleep(10 * time.Second)
value, err = in.etcdKV.Load(metaPath1)
assert.Nil(t, err)
indexMetaTmp2 := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp2)
assert.Nil(t, err)
assert.Equal(t, commonpb.IndexState_Failed, indexMetaTmp2.State)
defer in.kv.MultiRemove(indexMetaTmp2.IndexFilePaths)
}
defer in.kv.MultiRemove(indexMetaTmp.IndexFilePaths)
defer func() {
for k := range kvs {
in.kv.Remove(k)
}
}()
indexMeta2 := &indexpb.IndexMeta{
IndexBuildID: indexBuildID2,
State: commonpb.IndexState_InProgress,
Version: 1,
}
value2 := proto.MarshalTextString(indexMeta2)
err = in.etcdKV.Save(metaPath2, value2)
assert.Nil(t, err)
req2 := &indexpb.CreateIndexRequest{
IndexBuildID: indexBuildID2,
IndexName: "FloatVector",
IndexID: indexID,
Version: 1,
MetaPath: metaPath2,
DataPaths: paths,
TypeParams: []*commonpb.KeyValuePair{
{
Key: "dim",
Value: "8",
},
{
Key: "params",
Value: "value",
},
},
IndexParams: []*commonpb.KeyValuePair{
{
Key: "index_type",
Value: "IVF_SQ8",
},
{
Key: "params",
Value: "{\"nlist\": 128}",
},
{
Key: "metric_type",
Value: "L2",
},
},
}
status, err = in.CreateIndex(ctx, req2)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode)
value, err = in.etcdKV.Load(metaPath2)
assert.Nil(t, err)
indexMetaTmp2 := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp2)
assert.Nil(t, err)
if indexMetaTmp.State != commonpb.IndexState_Failed {
time.Sleep(10 * time.Second)
value, err = in.etcdKV.Load(metaPath2)
assert.Nil(t, err)
indexMetaTmp2 := indexpb.IndexMeta{}
err = proto.UnmarshalText(value, &indexMetaTmp2)
assert.Nil(t, err)
assert.Equal(t, commonpb.IndexState_Failed, indexMetaTmp2.State)
defer in.kv.MultiRemove(indexMetaTmp2.IndexFilePaths)
}
defer in.kv.MultiRemove(indexMetaTmp2.IndexFilePaths)
defer func() {
for k := range kvs {
in.kv.Remove(k)
}
}()
})
t.Run("CreateIndex server not healthy", func(t *testing.T) {
in.UpdateStateCode(internalpb.StateCode_Initializing)
status, err := in.CreateIndex(ctx, &indexpb.CreateIndexRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
})
err = in.Stop()
assert.Nil(t, err)
}
func TestIndexNode_Error(t *testing.T) {
ctx := context.Background()
in, err := NewIndexNode(ctx)
assert.Nil(t, err)
Params.Init()
err = in.Register()
assert.Nil(t, err)
err = in.Init()
assert.Nil(t, err)
err = in.Start()
assert.Nil(t, err)
in.UpdateStateCode(internalpb.StateCode_Initializing)
t.Run("CreateIndex", func(t *testing.T) {
status, err := in.CreateIndex(ctx, &indexpb.CreateIndexRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, status.ErrorCode)
})
t.Run("GetMetrics", func(t *testing.T) {
resp, err := in.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
in.UpdateStateCode(internalpb.StateCode_Healthy)
t.Run("Request Illegal", func(t *testing.T) {
resp, err := in.GetMetrics(ctx, &milvuspb.GetMetricsRequest{})
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("MetricsTypeIllegal", func(t *testing.T) {
req, err := metricsinfo.ConstructRequestByMetricType("GetIndexNodeMetrics")
assert.Nil(t, err)
resp, err := in.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
err = in.Stop()
assert.Nil(t, err)
}

View File

@ -199,13 +199,13 @@ func NewTaskScheduler(ctx context.Context,
return s, nil
}
func (sched *TaskScheduler) setParallelism(parallel int) {
if parallel <= 0 {
log.Debug("IndexNode can not set parallelism to less than zero!")
return
}
sched.buildParallel = parallel
}
//func (sched *TaskScheduler) setParallelism(parallel int) {
// if parallel <= 0 {
// log.Debug("IndexNode can not set parallelism to less than zero!")
// return
// }
// sched.buildParallel = parallel
//}
func (sched *TaskScheduler) scheduleIndexBuildTask() []task {
ret := make([]task, 0)