mirror of https://github.com/milvus-io/milvus.git
Increase indexcoord component code coverage (#7558)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/7570/head
parent
416bfeafc9
commit
e1081b6783
|
@ -79,6 +79,8 @@ gtags.conf
|
|||
# go-codecov
|
||||
coverage.txt
|
||||
profile.out
|
||||
cover.out
|
||||
coverage.html
|
||||
|
||||
# virtualenv
|
||||
venv/
|
||||
|
|
|
@ -17,8 +17,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||
|
||||
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/indexnode"
|
||||
|
@ -38,6 +36,13 @@ import (
|
|||
|
||||
func TestIndexCoord(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
inm0 := &indexnode.Mock{}
|
||||
err := inm0.Init()
|
||||
assert.Nil(t, err)
|
||||
err = inm0.Register()
|
||||
assert.Nil(t, err)
|
||||
err = inm0.Start()
|
||||
assert.Nil(t, err)
|
||||
ic, err := NewIndexCoord(ctx)
|
||||
assert.Nil(t, err)
|
||||
Params.Init()
|
||||
|
@ -49,6 +54,9 @@ func TestIndexCoord(t *testing.T) {
|
|||
err = ic.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = inm0.Stop()
|
||||
assert.Nil(t, err)
|
||||
|
||||
in, err := grpcindexnode.NewServer(ctx)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, in)
|
||||
|
@ -158,7 +166,8 @@ func TestIndexCoord(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("GetMetrics when request is illegal", func(t *testing.T) {
|
||||
req := &milvuspb.GetMetricsRequest{}
|
||||
req, err := metricsinfo.ConstructRequestByMetricType("GetIndexNodeMetrics")
|
||||
assert.Nil(t, err)
|
||||
resp, err := ic.GetMetrics(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
|
|
|
@ -13,6 +13,7 @@ package indexcoord
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -104,6 +105,11 @@ func (mt *metaTable) reloadMeta(indexBuildID UniqueID) (*Meta, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(values) == 0 {
|
||||
log.Error("IndexCoord reload Meta", zap.Any("indexBuildID", indexBuildID), zap.Error(errors.New("meta doesn't exist in KV")))
|
||||
return nil, errors.New("meta doesn't exist in KV")
|
||||
}
|
||||
im := &indexpb.IndexMeta{}
|
||||
err = proto.UnmarshalText(values[0], im)
|
||||
if err != nil {
|
||||
|
|
|
@ -0,0 +1,291 @@
|
|||
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software distributed under the License
|
||||
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
|
||||
// or implied. See the License for the specific language governing permissions and limitations under the License.
|
||||
|
||||
package indexcoord
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestMetaTable(t *testing.T) {
|
||||
Params.Init()
|
||||
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
|
||||
assert.Nil(t, err)
|
||||
|
||||
req := &indexpb.BuildIndexRequest{
|
||||
IndexBuildID: 1,
|
||||
IndexName: "test_index",
|
||||
IndexID: 0,
|
||||
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
|
||||
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
|
||||
}
|
||||
indexMeta1 := &indexpb.IndexMeta{
|
||||
IndexBuildID: 1,
|
||||
State: commonpb.IndexState_Finished,
|
||||
Req: req,
|
||||
IndexFilePaths: []string{"IndexFilePath-1-1", "IndexFilePath-1-2"},
|
||||
NodeID: 0,
|
||||
Version: 10,
|
||||
Recycled: false,
|
||||
}
|
||||
value := proto.MarshalTextString(indexMeta1)
|
||||
key := "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
|
||||
err = etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
metaTable, err := NewMetaTable(etcdKV)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, metaTable)
|
||||
|
||||
t.Run("saveIndexMeta", func(t *testing.T) {
|
||||
meta := &Meta{
|
||||
indexMeta: indexMeta1,
|
||||
revision: 10,
|
||||
}
|
||||
err = metaTable.saveIndexMeta(meta)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("reloadMeta", func(t *testing.T) {
|
||||
indexBuildID := UniqueID(3)
|
||||
meta3, err := metaTable.reloadMeta(indexBuildID)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, meta3)
|
||||
})
|
||||
|
||||
t.Run("AddIndex", func(t *testing.T) {
|
||||
req := &indexpb.BuildIndexRequest{
|
||||
IndexBuildID: 1,
|
||||
}
|
||||
err = metaTable.AddIndex(req.IndexBuildID, req)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("BuildIndex", func(t *testing.T) {
|
||||
err = metaTable.BuildIndex(UniqueID(4), 1)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
indexMeta1.NodeID = 2
|
||||
value = proto.MarshalTextString(indexMeta1)
|
||||
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
|
||||
err = etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
err = metaTable.BuildIndex(indexMeta1.IndexBuildID, 1)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("UpdateVersion", func(t *testing.T) {
|
||||
err = metaTable.UpdateVersion(UniqueID(5))
|
||||
assert.NotNil(t, err)
|
||||
|
||||
indexMeta1.Version = indexMeta1.Version + 1
|
||||
value = proto.MarshalTextString(indexMeta1)
|
||||
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
|
||||
err = etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
err = metaTable.UpdateVersion(indexMeta1.IndexBuildID)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("MarkIndexAsDeleted", func(t *testing.T) {
|
||||
indexMeta1.Version = indexMeta1.Version + 1
|
||||
value = proto.MarshalTextString(indexMeta1)
|
||||
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
|
||||
err = etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
err = metaTable.MarkIndexAsDeleted(indexMeta1.Req.IndexID)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
t.Run("GetIndexState", func(t *testing.T) {
|
||||
indexInfos := metaTable.GetIndexStates([]UniqueID{0, 1})
|
||||
assert.Equal(t, 2, len(indexInfos))
|
||||
assert.Equal(t, "index 0 not exists", indexInfos[0].Reason)
|
||||
assert.Equal(t, "index 1 has been deleted", indexInfos[1].Reason)
|
||||
})
|
||||
|
||||
t.Run("GetIndexFilePathInfo", func(t *testing.T) {
|
||||
indexFilePathInfo, err := metaTable.GetIndexFilePathInfo(0)
|
||||
assert.Nil(t, indexFilePathInfo)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
indexFilePathInfo2, err := metaTable.GetIndexFilePathInfo(1)
|
||||
assert.Nil(t, indexFilePathInfo2)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("UpdateRecycleState", func(t *testing.T) {
|
||||
indexMeta1.Version = indexMeta1.Version + 1
|
||||
value = proto.MarshalTextString(indexMeta1)
|
||||
key = "indexes/" + strconv.FormatInt(indexMeta1.IndexBuildID, 10)
|
||||
err = etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = metaTable.UpdateRecycleState(indexMeta1.IndexBuildID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = metaTable.UpdateRecycleState(indexMeta1.IndexBuildID)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = metaTable.UpdateRecycleState(5)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("HasSameReq", func(t *testing.T) {
|
||||
req2 := &indexpb.BuildIndexRequest{
|
||||
IndexBuildID: 6,
|
||||
IndexName: "test_index",
|
||||
IndexID: 2,
|
||||
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
|
||||
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
|
||||
}
|
||||
|
||||
err = metaTable.AddIndex(6, req2)
|
||||
assert.Nil(t, err)
|
||||
|
||||
req3 := &indexpb.BuildIndexRequest{
|
||||
IndexBuildID: 6,
|
||||
IndexName: "test_index",
|
||||
IndexID: 3,
|
||||
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
|
||||
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
|
||||
}
|
||||
|
||||
has, err := metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.IndexID = 2
|
||||
req3.IndexName = "test_index1"
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.IndexName = "test_index"
|
||||
req3.DataPaths = []string{"DataPath-1-1", "DataPath-1-2", "DataPath-1-3"}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.DataPaths = []string{"DataPath-1-1", "DataPath-1-3"}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.DataPaths = []string{"DataPath-1-1", "DataPath-1-2"}
|
||||
req3.TypeParams = []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.TypeParams = []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-3", Value: "TypeParam-1-3"}}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.TypeParams = []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-3"}}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.TypeParams = []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}}
|
||||
req3.IndexParams = []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.IndexParams = []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-3", Value: "IndexParam-1-3"}}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
|
||||
req3.IndexParams = []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-3"}}
|
||||
has, err = metaTable.HasSameReq(req3)
|
||||
assert.Equal(t, false, has)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("LoadMetaFromETCD", func(t *testing.T) {
|
||||
req4 := &indexpb.BuildIndexRequest{
|
||||
IndexBuildID: 7,
|
||||
IndexName: "test_index",
|
||||
IndexID: 4,
|
||||
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
|
||||
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
|
||||
}
|
||||
err = metaTable.AddIndex(7, req4)
|
||||
assert.Nil(t, err)
|
||||
|
||||
ok := metaTable.LoadMetaFromETCD(8, 0)
|
||||
assert.Equal(t, false, ok)
|
||||
|
||||
key = "indexes/" + strconv.FormatInt(req4.IndexBuildID, 10)
|
||||
err = etcdKV.RemoveWithPrefix(key)
|
||||
assert.Nil(t, err)
|
||||
|
||||
ok = metaTable.LoadMetaFromETCD(req4.IndexBuildID, 10)
|
||||
assert.Equal(t, false, ok)
|
||||
})
|
||||
|
||||
t.Run("GetNodeTaskStats", func(t *testing.T) {
|
||||
req5 := &indexpb.BuildIndexRequest{
|
||||
IndexBuildID: 9,
|
||||
IndexName: "test_index",
|
||||
IndexID: 5,
|
||||
DataPaths: []string{"DataPath-1-1", "DataPath-1-2"},
|
||||
TypeParams: []*commonpb.KeyValuePair{{Key: "TypeParam-1-1", Value: "TypeParam-1-1"}, {Key: "TypeParam-1-2", Value: "TypeParam-1-2"}},
|
||||
IndexParams: []*commonpb.KeyValuePair{{Key: "IndexParam-1-1", Value: "IndexParam-1-1"}, {Key: "IndexParam-1-2", Value: "IndexParam-1-2"}},
|
||||
}
|
||||
|
||||
err = metaTable.AddIndex(req5.IndexBuildID, req5)
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = metaTable.BuildIndex(req5.IndexBuildID, 4)
|
||||
assert.Nil(t, err)
|
||||
|
||||
priorities := metaTable.GetNodeTaskStats()
|
||||
assert.Equal(t, 1, priorities[4])
|
||||
})
|
||||
|
||||
err = etcdKV.RemoveWithPrefix("indexes/")
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestMetaTable_Error(t *testing.T) {
|
||||
Params.Init()
|
||||
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("reloadFromKV error", func(t *testing.T) {
|
||||
value := "indexMeta-1"
|
||||
key := "indexes/" + strconv.FormatInt(2, 10)
|
||||
err = etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
meta, err := NewMetaTable(etcdKV)
|
||||
assert.NotNil(t, err)
|
||||
assert.Nil(t, meta)
|
||||
err = etcdKV.RemoveWithPrefix(key)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
err = etcdKV.RemoveWithPrefix("indexes/")
|
||||
assert.Nil(t, err)
|
||||
}
|
|
@ -12,11 +12,58 @@
|
|||
package indexcoord
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/log"
|
||||
"github.com/milvus-io/milvus/internal/indexnode"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGetSystemInfoMetrics(t *testing.T) {
|
||||
log.Info("TestGetSystemInfoMetrics, todo")
|
||||
ctx := context.Background()
|
||||
ic, err := NewIndexCoord(ctx)
|
||||
assert.Nil(t, err)
|
||||
Params.Init()
|
||||
err = ic.Register()
|
||||
assert.Nil(t, err)
|
||||
|
||||
err = ic.Init()
|
||||
assert.Nil(t, err)
|
||||
err = ic.Start()
|
||||
assert.Nil(t, err)
|
||||
|
||||
t.Run("getSystemInfoMetrics", func(t *testing.T) {
|
||||
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
|
||||
assert.Nil(t, err)
|
||||
|
||||
resp, err := getSystemInfoMetrics(ctx, req, ic)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, resp)
|
||||
})
|
||||
|
||||
t.Run("getSystemInfoMetrics error", func(t *testing.T) {
|
||||
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
|
||||
assert.Nil(t, err)
|
||||
|
||||
inm1 := &indexnode.Mock{
|
||||
Failure: true,
|
||||
Err: true,
|
||||
}
|
||||
inm2 := &indexnode.Mock{
|
||||
Failure: true,
|
||||
Err: false,
|
||||
}
|
||||
|
||||
ic.nodeManager.setClient(1, inm1)
|
||||
ic.nodeManager.setClient(2, inm2)
|
||||
|
||||
resp, err := getSystemInfoMetrics(ctx, req, ic)
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, resp)
|
||||
})
|
||||
|
||||
err = ic.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ package indexnode
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strconv"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -35,6 +34,7 @@ import (
|
|||
type Mock struct {
|
||||
Build bool
|
||||
Failure bool
|
||||
Err bool
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
|
@ -46,7 +46,7 @@ type Mock struct {
|
|||
}
|
||||
|
||||
func (inm *Mock) Init() error {
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return errors.New("IndexNode init failed")
|
||||
}
|
||||
inm.ctx, inm.cancel = context.WithCancel(context.Background())
|
||||
|
@ -62,67 +62,67 @@ func (inm *Mock) buildIndexTask() {
|
|||
case <-inm.ctx.Done():
|
||||
return
|
||||
case req := <-inm.buildIndex:
|
||||
if inm.Failure && inm.Build {
|
||||
if inm.Failure {
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
|
||||
_, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
|
||||
_ = proto.UnmarshalText(values[0], &indexMeta)
|
||||
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(4 * time.Second)
|
||||
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
|
||||
proto.MarshalTextString(&indexMeta))
|
||||
continue
|
||||
}
|
||||
if inm.Build {
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
_, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
|
||||
_ = proto.UnmarshalText(values[0], &indexMeta)
|
||||
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
time.Sleep(time.Second)
|
||||
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
|
||||
proto.MarshalTextString(&indexMeta))
|
||||
indexMeta.Version = indexMeta.Version + 1
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0]+1,
|
||||
proto.MarshalTextString(&indexMeta))
|
||||
}
|
||||
indexMeta := indexpb.IndexMeta{}
|
||||
_, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
|
||||
_ = proto.UnmarshalText(values[0], &indexMeta)
|
||||
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
|
||||
indexMeta.State = commonpb.IndexState_Failed
|
||||
time.Sleep(4 * time.Second)
|
||||
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
|
||||
proto.MarshalTextString(&indexMeta))
|
||||
indexMeta.Version = indexMeta.Version + 1
|
||||
indexMeta.State = commonpb.IndexState_Finished
|
||||
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0]+1,
|
||||
proto.MarshalTextString(&indexMeta))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (inm *Mock) Start() error {
|
||||
inm.wg.Add(1)
|
||||
go inm.buildIndexTask()
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return errors.New("IndexNode start failed")
|
||||
}
|
||||
inm.wg.Add(1)
|
||||
go inm.buildIndexTask()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (inm *Mock) Stop() error {
|
||||
if inm.Err {
|
||||
return errors.New("IndexNode stop failed")
|
||||
}
|
||||
inm.cancel()
|
||||
inm.wg.Wait()
|
||||
inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole)
|
||||
if inm.Failure {
|
||||
return errors.New("IndexNode stop failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (inm *Mock) Register() error {
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return errors.New("IndexNode register failed")
|
||||
}
|
||||
Params.Init()
|
||||
inm.etcdKV, _ = etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
|
||||
inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole)
|
||||
session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
|
||||
session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
|
||||
session.Init(typeutil.IndexNodeRole, "localhost:21121", false)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (inm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{
|
||||
StateCode: internalpb.StateCode_Abnormal,
|
||||
|
@ -130,7 +130,7 @@ func (inm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentS
|
|||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
},
|
||||
}, nil
|
||||
}, errors.New("IndexNode GetComponentStates Failed")
|
||||
}
|
||||
return &internalpb.ComponentStates{
|
||||
State: &internalpb.ComponentInfo{
|
||||
|
@ -143,7 +143,7 @@ func (inm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentS
|
|||
}
|
||||
|
||||
func (inm *Mock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -159,7 +159,7 @@ func (inm *Mock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResp
|
|||
}
|
||||
|
||||
func (inm *Mock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return &milvuspb.StringResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -179,7 +179,7 @@ func (inm *Mock) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
inm.buildIndex <- req
|
||||
}
|
||||
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
}, errors.New("IndexNode CreateIndex failed")
|
||||
|
@ -191,7 +191,7 @@ func (inm *Mock) CreateIndex(ctx context.Context, req *indexpb.CreateIndexReques
|
|||
}
|
||||
|
||||
func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
|
||||
if inm.Failure {
|
||||
if inm.Err {
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
|
@ -201,6 +201,16 @@ func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
|
|||
}, errors.New("IndexNode GetMetrics failed")
|
||||
}
|
||||
|
||||
if inm.Failure {
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: metricsinfo.MsgUnimplementedMetric,
|
||||
},
|
||||
Response: "",
|
||||
}, nil
|
||||
}
|
||||
|
||||
return &milvuspb.GetMetricsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
|
@ -210,38 +220,3 @@ func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest
|
|||
ComponentName: "IndexNode",
|
||||
}, nil
|
||||
}
|
||||
|
||||
//func getSystemInfoMetricsByIndexNodeMock(
|
||||
// ctx context.Context,
|
||||
// req *milvuspb.GetMetricsRequest,
|
||||
// in *IndexNodeMock,
|
||||
//) (*milvuspb.GetMetricsResponse, error) {
|
||||
//
|
||||
// id := UniqueID(16384)
|
||||
//
|
||||
// nodeInfos := metricsinfo.IndexNodeInfos{
|
||||
// BaseComponentInfos: metricsinfo.BaseComponentInfos{
|
||||
// Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
|
||||
// },
|
||||
// }
|
||||
// resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
|
||||
// if err != nil {
|
||||
// return &milvuspb.GetMetricsResponse{
|
||||
// Status: &commonpb.Status{
|
||||
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
// Reason: err.Error(),
|
||||
// },
|
||||
// Response: "",
|
||||
// ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
|
||||
// }, nil
|
||||
// }
|
||||
//
|
||||
// return &milvuspb.GetMetricsResponse{
|
||||
// Status: &commonpb.Status{
|
||||
// ErrorCode: commonpb.ErrorCode_Success,
|
||||
// Reason: "",
|
||||
// },
|
||||
// Response: resp,
|
||||
// ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
|
||||
// }, nil
|
||||
//}
|
||||
|
|
|
@ -13,8 +13,11 @@ package indexnode
|
|||
|
||||
import (
|
||||
"context"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/indexpb"
|
||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||
|
@ -24,7 +27,9 @@ import (
|
|||
|
||||
func TestIndexNodeMock(t *testing.T) {
|
||||
Params.Init()
|
||||
inm := Mock{}
|
||||
inm := Mock{
|
||||
Build: true,
|
||||
}
|
||||
err := inm.Register()
|
||||
assert.Nil(t, err)
|
||||
err = inm.Init()
|
||||
|
@ -69,7 +74,108 @@ func TestIndexNodeMock(t *testing.T) {
|
|||
resp, err := inm.GetMetrics(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
|
||||
assert.Equal(t, "IndexNode", resp.ComponentName)
|
||||
})
|
||||
|
||||
err = inm.Stop()
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
|
||||
func TestIndexNodeMockError(t *testing.T) {
|
||||
inm := Mock{
|
||||
Failure: false,
|
||||
Build: false,
|
||||
Err: true,
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
err := inm.Register()
|
||||
assert.NotNil(t, err)
|
||||
|
||||
err = inm.Init()
|
||||
assert.NotNil(t, err)
|
||||
|
||||
err = inm.Start()
|
||||
assert.NotNil(t, err)
|
||||
|
||||
t.Run("GetComponentStates error", func(t *testing.T) {
|
||||
resp, err := inm.GetComponentStates(ctx)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetStatisticsChannel error", func(t *testing.T) {
|
||||
resp, err := inm.GetStatisticsChannel(ctx)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetTimeTickChannel error", func(t *testing.T) {
|
||||
resp, err := inm.GetTimeTickChannel(ctx)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("CreateIndex error", func(t *testing.T) {
|
||||
resp, err := inm.CreateIndex(ctx, &indexpb.CreateIndexRequest{})
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
|
||||
})
|
||||
|
||||
t.Run("GetMetrics error", func(t *testing.T) {
|
||||
req := &milvuspb.GetMetricsRequest{}
|
||||
resp, err := inm.GetMetrics(ctx, req)
|
||||
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
err = inm.Stop()
|
||||
assert.NotNil(t, err)
|
||||
}
|
||||
|
||||
func TestIndexNodeMockFiled(t *testing.T) {
|
||||
inm := Mock{
|
||||
Failure: true,
|
||||
Build: true,
|
||||
Err: false,
|
||||
}
|
||||
|
||||
err := inm.Register()
|
||||
assert.Nil(t, err)
|
||||
err = inm.Init()
|
||||
assert.Nil(t, err)
|
||||
err = inm.Start()
|
||||
assert.Nil(t, err)
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("CreateIndex failed", func(t *testing.T) {
|
||||
req := &indexpb.CreateIndexRequest{
|
||||
IndexBuildID: 0,
|
||||
IndexID: 0,
|
||||
DataPaths: []string{},
|
||||
}
|
||||
key := "/indexes/" + strconv.FormatInt(10, 10)
|
||||
indexMeta := &indexpb.IndexMeta{
|
||||
IndexBuildID: 10,
|
||||
State: commonpb.IndexState_InProgress,
|
||||
Version: 0,
|
||||
}
|
||||
|
||||
value := proto.MarshalTextString(indexMeta)
|
||||
err := inm.etcdKV.Save(key, value)
|
||||
assert.Nil(t, err)
|
||||
resp, err := inm.CreateIndex(ctx, req)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
|
||||
err = inm.etcdKV.RemoveWithPrefix(key)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
t.Run("GetMetrics failed", func(t *testing.T) {
|
||||
req := &milvuspb.GetMetricsRequest{}
|
||||
resp, err := inm.GetMetrics(ctx, req)
|
||||
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
|
||||
})
|
||||
|
||||
err = inm.Stop()
|
||||
|
|
Loading…
Reference in New Issue