Add unittest for index component (#7475)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/7508/head
cai.zhang 2021-09-06 17:54:41 +08:00 committed by GitHub
parent fb4e23bc79
commit e6e03fc93f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1231 additions and 139 deletions

View File

@ -0,0 +1,122 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexcoordclient
import (
"context"
"testing"
grpcindexcoord "github.com/milvus-io/milvus/internal/distributed/indexcoord"
"github.com/milvus-io/milvus/internal/indexcoord"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
func TestIndexCoordClient(t *testing.T) {
ctx := context.Background()
indexCoordServer, err := grpcindexcoord.NewServer(ctx)
assert.Nil(t, err)
icm := &indexcoord.Mock{}
err = indexCoordServer.SetClient(icm)
assert.Nil(t, err)
err = indexCoordServer.Run()
assert.Nil(t, err)
icc, err := NewClient(ctx, indexcoord.Params.MetaRootPath, indexcoord.Params.EtcdEndpoints)
assert.Nil(t, err)
assert.NotNil(t, icc)
err = icc.Init()
assert.Nil(t, err)
err = icc.Register()
assert.Nil(t, err)
err = icc.Start()
assert.Nil(t, err)
t.Run("GetComponentStates", func(t *testing.T) {
states, err := icc.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
assert.Equal(t, commonpb.ErrorCode_Success, states.Status.ErrorCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
resp, err := icc.GetTimeTickChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
resp, err := icc.GetStatisticsChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("BuildIndex", func(t *testing.T) {
req := &indexpb.BuildIndexRequest{
IndexBuildID: 0,
IndexID: 0,
}
resp, err := icc.BuildIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("DropIndex", func(t *testing.T) {
req := &indexpb.DropIndexRequest{
IndexID: 0,
}
resp, err := icc.DropIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("GetIndexStates", func(t *testing.T) {
req := &indexpb.GetIndexStatesRequest{
IndexBuildIDs: []int64{0},
}
resp, err := icc.GetIndexStates(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, len(req.IndexBuildIDs), len(resp.States))
assert.Equal(t, commonpb.IndexState_Finished, resp.States[0].State)
})
t.Run("GetIndexFilePaths", func(t *testing.T) {
req := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []int64{0},
}
resp, err := icc.GetIndexFilePaths(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, len(req.IndexBuildIDs), len(resp.FilePaths))
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
resp, err := icc.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
err = indexCoordServer.Stop()
assert.Nil(t, err)
err = icc.Stop()
assert.Nil(t, err)
}

View File

@ -18,6 +18,8 @@ import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
"google.golang.org/grpc"
@ -37,7 +39,7 @@ type UniqueID = typeutil.UniqueID
type Timestamp = typeutil.Timestamp
type Server struct {
indexcoord *indexcoord.IndexCoord
indexcoord types.IndexCoord
grpcServer *grpc.Server
grpcErrChan chan error
@ -82,8 +84,6 @@ func (s *Server) init() error {
log.Error("IndexCoord", zap.Any("init error", err))
return err
}
s.indexcoord.UpdateStateCode(internalpb.StateCode_Initializing)
if err := s.indexcoord.Init(); err != nil {
log.Error("IndexCoord", zap.Any("init error", err))
return err
@ -118,6 +118,11 @@ func (s *Server) Stop() error {
return nil
}
func (s *Server) SetClient(indexCoordClient types.IndexCoord) error {
s.indexcoord = indexCoordClient
return nil
}
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
return s.indexcoord.GetComponentStates(ctx)
}

View File

@ -0,0 +1,110 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/indexcoord"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
func TestIndexCoordinateServer(t *testing.T) {
ctx := context.Background()
indexCoord, err := NewServer(ctx)
assert.Nil(t, err)
assert.NotNil(t, indexCoord)
indexCoordClient := &indexcoord.Mock{}
err = indexCoord.SetClient(indexCoordClient)
assert.Nil(t, err)
err = indexCoord.Run()
assert.Nil(t, err)
t.Run("GetComponentStates", func(t *testing.T) {
req := &internalpb.GetComponentStatesRequest{}
states, err := indexCoord.GetComponentStates(ctx, req)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
req := &internalpb.GetTimeTickChannelRequest{}
resp, err := indexCoord.GetTimeTickChannel(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
req := &internalpb.GetStatisticsChannelRequest{}
resp, err := indexCoord.GetStatisticsChannel(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("BuildIndex", func(t *testing.T) {
req := &indexpb.BuildIndexRequest{
IndexBuildID: 0,
IndexID: 0,
DataPaths: []string{},
}
resp, err := indexCoord.BuildIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetIndexStates", func(t *testing.T) {
req := &indexpb.GetIndexStatesRequest{
IndexBuildIDs: []UniqueID{0},
}
resp, err := indexCoord.GetIndexStates(ctx, req)
assert.Nil(t, err)
assert.Equal(t, len(req.IndexBuildIDs), len(resp.States))
assert.Equal(t, commonpb.IndexState_Finished, resp.States[0].State)
})
t.Run("DropIndex", func(t *testing.T) {
req := &indexpb.DropIndexRequest{
IndexID: 0,
}
resp, err := indexCoord.DropIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("GetIndexFilePaths", func(t *testing.T) {
req := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{0, 1},
}
resp, err := indexCoord.GetIndexFilePaths(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, len(req.IndexBuildIDs), len(resp.FilePaths))
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
resp, err := indexCoord.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, "IndexCoord", resp.ComponentName)
})
err = indexCoord.Stop()
assert.Nil(t, err)
}

View File

@ -0,0 +1,92 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexnodeclient
import (
"context"
"testing"
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
func TestIndexNodeClient(t *testing.T) {
ctx := context.Background()
ins, err := grpcindexnode.NewServer(ctx)
assert.Nil(t, err)
assert.NotNil(t, ins)
inm := &indexnode.Mock{}
err = ins.SetClient(inm)
assert.Nil(t, err)
err = ins.Run()
assert.Nil(t, err)
inc, err := NewClient(ctx, "localhost:21121")
assert.Nil(t, err)
assert.NotNil(t, inc)
err = inc.Init()
assert.Nil(t, err)
err = inc.Start()
assert.Nil(t, err)
t.Run("GetComponentStates", func(t *testing.T) {
states, err := inc.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
assert.Equal(t, commonpb.ErrorCode_Success, states.Status.ErrorCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
resp, err := inc.GetTimeTickChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
resp, err := inc.GetStatisticsChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("CreateIndex", func(t *testing.T) {
req := &indexpb.CreateIndexRequest{
IndexBuildID: 0,
IndexID: 0,
}
resp, err := inc.CreateIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
resp, err := inc.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
err = ins.Stop()
assert.Nil(t, err)
err = inc.Stop()
assert.Nil(t, err)
}

View File

@ -19,6 +19,8 @@ import (
"strconv"
"sync"
"github.com/milvus-io/milvus/internal/types"
"go.uber.org/zap"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
@ -34,7 +36,7 @@ import (
)
type Server struct {
indexnode *indexnode.IndexNode
indexnode types.IndexNode
grpcServer *grpc.Server
grpcErrChan chan error
@ -132,8 +134,6 @@ func (s *Server) init() error {
return err
}
s.indexnode.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
err = s.indexnode.Init()
if err != nil {
log.Error("IndexNode Init failed", zap.Error(err))
@ -168,6 +168,11 @@ func (s *Server) Stop() error {
return nil
}
func (s *Server) SetClient(indexNodeClient types.IndexNode) error {
s.indexnode = indexNodeClient
return nil
}
func (s *Server) GetComponentStates(ctx context.Context, req *internalpb.GetComponentStatesRequest) (*internalpb.ComponentStates, error) {
return s.indexnode.GetComponentStates(ctx)
}

View File

@ -0,0 +1,83 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package grpcindexnode
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
func TestIndexNodeServer(t *testing.T) {
ctx := context.Background()
ins, err := NewServer(ctx)
assert.Nil(t, err)
assert.NotNil(t, ins)
inm := &indexnode.Mock{}
err = ins.SetClient(inm)
assert.Nil(t, err)
err = ins.Run()
assert.Nil(t, err)
t.Run("GetComponentStates", func(t *testing.T) {
req := &internalpb.GetComponentStatesRequest{}
states, err := ins.GetComponentStates(ctx, req)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
req := &internalpb.GetTimeTickChannelRequest{}
resp, err := ins.GetTimeTickChannel(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
req := &internalpb.GetStatisticsChannelRequest{}
resp, err := ins.GetStatisticsChannel(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("CreateIndex", func(t *testing.T) {
req := &indexpb.CreateIndexRequest{
IndexBuildID: 0,
IndexID: 0,
DataPaths: []string{},
}
resp, err := ins.CreateIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
resp, err := ins.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
err = ins.Stop()
assert.Nil(t, err)
}

View File

@ -104,6 +104,7 @@ func (i *IndexCoord) Register() error {
func (i *IndexCoord) Init() error {
log.Debug("IndexCoord", zap.Any("etcd endpoints", Params.EtcdEndpoints))
i.UpdateStateCode(internalpb.StateCode_Initializing)
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
@ -191,9 +192,6 @@ func (i *IndexCoord) Init() error {
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load()))
log.Debug("IndexCoord assign tasks server success", zap.Error(err))
return nil
}
@ -219,7 +217,9 @@ func (i *IndexCoord) Start() error {
for _, cb := range i.startCallbacks {
cb()
}
log.Debug("IndexCoord start")
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexCoord", zap.Any("State", i.stateCode.Load()))
log.Debug("IndexCoord start successfully")
return nil
}
@ -227,6 +227,7 @@ func (i *IndexCoord) Start() error {
func (i *IndexCoord) Stop() error {
i.loopCancel()
i.sched.Close()
i.loopWg.Wait()
for _, cb := range i.closeCallbacks {
cb()
}
@ -699,7 +700,7 @@ func (i *IndexCoord) assignTaskLoop() {
if err != nil {
log.Debug("IndexCoord assignTaskLoop", zap.Any("GetSessions error", err))
}
if len(i.nodeManager.nodeClients) <= 0 {
if len(sessions) <= 0 {
log.Debug("There is no IndexNode available as this time.")
break
}

View File

@ -0,0 +1,213 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexcoord
import (
"context"
"errors"
"strconv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type Mock struct {
etcdKV *etcdkv.EtcdKV
Failure bool
}
func (icm *Mock) Init() error {
if icm.Failure {
return errors.New("IndexCoordinate init failed")
}
return nil
}
func (icm *Mock) Start() error {
if icm.Failure {
return errors.New("IndexCoordinate start failed")
}
return nil
}
func (icm *Mock) Stop() error {
if icm.Failure {
return errors.New("IndexCoordinate stop failed")
}
err := icm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexCoordRole)
return err
}
func (icm *Mock) Register() error {
if icm.Failure {
return errors.New("IndexCoordinate register failed")
}
icm.etcdKV, _ = etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
err := icm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexCoordRole)
session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
session.Init(typeutil.IndexCoordRole, Params.Address, true)
return err
}
func (icm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
if icm.Failure {
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
StateCode: internalpb.StateCode_Abnormal,
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexCoordinate GetComponentStates failed")
}
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
StateCode: internalpb.StateCode_Healthy,
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
func (icm *Mock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
if icm.Failure {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexCoordinate GetStatisticsChannel failed")
}
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: "",
}, nil
}
func (icm *Mock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
if icm.Failure {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexCoordinate GetTimeTickChannel failed")
}
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: "",
}, nil
}
func (icm *Mock) BuildIndex(ctx context.Context, req *indexpb.BuildIndexRequest) (*indexpb.BuildIndexResponse, error) {
if icm.Failure {
return &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
IndexBuildID: 0,
}, errors.New("IndexCoordinate BuildIndex error")
}
return &indexpb.BuildIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexBuildID: 0,
}, nil
}
func (icm *Mock) DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error) {
if icm.Failure {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, errors.New("IndexCoordinate DropIndex failed")
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (icm *Mock) GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error) {
if icm.Failure {
return &indexpb.GetIndexStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexCoordinate GetIndexStates failed")
}
states := make([]*indexpb.IndexInfo, len(req.IndexBuildIDs))
for i := range states {
states[i] = &indexpb.IndexInfo{
IndexBuildID: req.IndexBuildIDs[i],
State: commonpb.IndexState_Finished,
IndexID: 0,
}
}
return &indexpb.GetIndexStatesResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
States: states,
}, nil
}
func (icm *Mock) GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error) {
if icm.Failure {
return &indexpb.GetIndexFilePathsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexCoordinate GetIndexFilePaths failed")
}
filePaths := make([]*indexpb.IndexFilePathInfo, len(req.IndexBuildIDs))
for i := range filePaths {
filePaths[i] = &indexpb.IndexFilePathInfo{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
IndexBuildID: req.IndexBuildIDs[i],
IndexFilePaths: []string{strconv.FormatInt(req.IndexBuildIDs[i], 10)},
}
}
return &indexpb.GetIndexFilePathsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
FilePaths: filePaths,
}, nil
}
func (icm *Mock) GetMetrics(ctx context.Context, request *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if icm.Failure {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexCoordinate GetMetrics failed")
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Response: "",
ComponentName: "IndexCoord",
}, nil
}

View File

@ -0,0 +1,193 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexcoord
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
func TestIndexCoordMock(t *testing.T) {
Params.Init()
icm := Mock{}
err := icm.Register()
assert.Nil(t, err)
err = icm.Init()
assert.Nil(t, err)
err = icm.Start()
assert.Nil(t, err)
ctx := context.Background()
t.Run("Register", func(t *testing.T) {
})
t.Run("GetComponentStates", func(t *testing.T) {
states, err := icm.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
resp, err := icm.GetTimeTickChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
resp, err := icm.GetStatisticsChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("BuildIndex", func(t *testing.T) {
req := &indexpb.BuildIndexRequest{
IndexBuildID: 0,
IndexID: 0,
DataPaths: []string{},
}
resp, err := icm.BuildIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetIndexStates", func(t *testing.T) {
req := &indexpb.GetIndexStatesRequest{
IndexBuildIDs: []UniqueID{0},
}
resp, err := icm.GetIndexStates(ctx, req)
assert.Nil(t, err)
assert.Equal(t, len(req.IndexBuildIDs), len(resp.States))
assert.Equal(t, commonpb.IndexState_Finished, resp.States[0].State)
})
t.Run("DropIndex", func(t *testing.T) {
req := &indexpb.DropIndexRequest{
IndexID: 0,
}
resp, err := icm.DropIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("GetIndexFilePaths", func(t *testing.T) {
req := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{0, 1},
}
resp, err := icm.GetIndexFilePaths(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, len(req.IndexBuildIDs), len(resp.FilePaths))
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
resp, err := icm.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, "IndexCoord", resp.ComponentName)
})
err = icm.Stop()
assert.Nil(t, err)
}
func TestIndexCoordMockError(t *testing.T) {
icm := Mock{
Failure: true,
}
err := icm.Init()
assert.NotNil(t, err)
err = icm.Start()
assert.NotNil(t, err)
ctx := context.Background()
t.Run("Register", func(t *testing.T) {
err = icm.Register()
assert.NotNil(t, err)
})
t.Run("GetComponentStates", func(t *testing.T) {
states, err := icm.GetComponentStates(ctx)
assert.NotNil(t, err)
assert.Equal(t, internalpb.StateCode_Abnormal, states.State.StateCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
resp, err := icm.GetTimeTickChannel(ctx)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
resp, err := icm.GetStatisticsChannel(ctx)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("BuildIndex", func(t *testing.T) {
req := &indexpb.BuildIndexRequest{
IndexBuildID: 0,
IndexID: 0,
DataPaths: []string{},
}
resp, err := icm.BuildIndex(ctx, req)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("GetIndexStates", func(t *testing.T) {
req := &indexpb.GetIndexStatesRequest{
IndexBuildIDs: []UniqueID{0},
}
resp, err := icm.GetIndexStates(ctx, req)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("DropIndex", func(t *testing.T) {
req := &indexpb.DropIndexRequest{
IndexID: 0,
}
resp, err := icm.DropIndex(ctx, req)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.ErrorCode)
})
t.Run("GetIndexFilePaths", func(t *testing.T) {
req := &indexpb.GetIndexFilePathsRequest{
IndexBuildIDs: []UniqueID{0, 1},
}
resp, err := icm.GetIndexFilePaths(ctx, req)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
resp, err := icm.GetMetrics(ctx, req)
assert.NotNil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
err = icm.Stop()
assert.NotNil(t, err)
}

View File

@ -17,122 +17,25 @@ import (
"testing"
"time"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
"github.com/milvus-io/milvus/internal/indexnode"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
)
type indexNodeMock struct {
types.IndexNode
}
func (in *indexNodeMock) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
indexMeta := indexpb.IndexMeta{}
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, err
}
_, values, versions, err := etcdKV.LoadWithPrefix2(req.MetaPath)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, err
}
err = proto.UnmarshalText(values[0], &indexMeta)
if err != nil {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, err
}
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta.State = commonpb.IndexState_Finished
_ = etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
time.Sleep(10 * time.Second)
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func getSystemInfoMetricsByIndexNodeMock(
ctx context.Context,
req *milvuspb.GetMetricsRequest,
in *indexNodeMock,
) (*milvuspb.GetMetricsResponse, error) {
id := UniqueID(16384)
nodeInfos := metricsinfo.IndexNodeInfos{
BaseComponentInfos: metricsinfo.BaseComponentInfos{
Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
},
}
resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
}, nil
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Response: resp,
ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
}, nil
}
func (in *indexNodeMock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: err.Error(),
},
Response: "",
}, nil
}
if metricType == metricsinfo.SystemInfoMetrics {
return getSystemInfoMetricsByIndexNodeMock(ctx, req, in)
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
}, nil
}
func TestIndexCoord(t *testing.T) {
ctx := context.Background()
ic, err := NewIndexCoord(ctx)
@ -140,14 +43,25 @@ func TestIndexCoord(t *testing.T) {
Params.Init()
err = ic.Register()
assert.Nil(t, err)
// TODO: add indexNodeMock to etcd
err = ic.Init()
assert.Nil(t, err)
indexNodeID := UniqueID(100)
ic.nodeManager.setClient(indexNodeID, &indexNodeMock{})
err = ic.Start()
assert.Nil(t, err)
in, err := grpcindexnode.NewServer(ctx)
assert.Nil(t, err)
assert.NotNil(t, in)
inm := &indexnode.Mock{
Build: true,
Failure: false,
}
err = in.SetClient(inm)
assert.Nil(t, err)
err = in.Run()
assert.Nil(t, err)
state, err := ic.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, state.State.StateCode)
@ -183,7 +97,7 @@ func TestIndexCoord(t *testing.T) {
if resp.States[0].State == commonpb.IndexState_Finished {
break
}
time.Sleep(3 * time.Second)
time.Sleep(1 * time.Second)
}
})
@ -221,8 +135,38 @@ func TestIndexCoord(t *testing.T) {
zap.String("resp", resp.Response))
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
resp, err := ic.GetTimeTickChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
resp, err := ic.GetStatisticsChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetMetrics when indexcoord is not healthy", func(t *testing.T) {
ic.UpdateStateCode(internalpb.StateCode_Abnormal)
req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics)
assert.Nil(t, err)
resp, err := ic.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
ic.UpdateStateCode(internalpb.StateCode_Healthy)
})
t.Run("GetMetrics when request is illegal", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{}
resp, err := ic.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_UnexpectedError, resp.Status.ErrorCode)
})
err = in.Stop()
assert.Nil(t, err)
time.Sleep(11 * time.Second)
ic.nodeManager.RemoveNode(indexNodeID)
err = ic.Stop()
assert.Nil(t, err)

View File

@ -32,7 +32,7 @@ type TaskQueue interface {
utEmpty() bool
utFull() bool
addUnissuedTask(t task) error
FrontUnissuedTask() task
//FrontUnissuedTask() task
PopUnissuedTask() task
AddActiveTask(t task)
PopActiveTask(tID UniqueID) task
@ -78,17 +78,17 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error {
return nil
}
func (queue *BaseTaskQueue) FrontUnissuedTask() task {
queue.utLock.Lock()
defer queue.utLock.Unlock()
if queue.unissuedTasks.Len() <= 0 {
log.Warn("sorry, but the unissued task list is empty!")
return nil
}
return queue.unissuedTasks.Front().Value.(task)
}
//func (queue *BaseTaskQueue) FrontUnissuedTask() task {
// queue.utLock.Lock()
// defer queue.utLock.Unlock()
//
// if queue.unissuedTasks.Len() <= 0 {
// log.Warn("sorry, but the unissued task list is empty!")
// return nil
// }
//
// return queue.unissuedTasks.Front().Value.(task)
//}
func (queue *BaseTaskQueue) PopUnissuedTask() task {
queue.utLock.Lock()

View File

@ -87,6 +87,8 @@ func (i *IndexNode) Register() error {
}
func (i *IndexNode) Init() error {
i.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("IndexNode", zap.Any("State", internalpb.StateCode_Initializing))
connectEtcdFn := func() error {
etcdKV, err := etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
i.etcdKV = etcdKV
@ -114,15 +116,14 @@ func (i *IndexNode) Init() error {
}
log.Debug("IndexNode NewMinIOKV success")
i.closer = trace.InitTracing("index_node")
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexNode", zap.Any("State", i.stateCode.Load()))
return nil
}
func (i *IndexNode) Start() error {
i.sched.Start()
i.UpdateStateCode(internalpb.StateCode_Healthy)
log.Debug("IndexNode", zap.Any("State", i.stateCode.Load()))
// Start callbacks
for _, cb := range i.startCallbacks {
cb()
@ -199,14 +200,14 @@ func (i *IndexNode) CreateIndex(ctx context.Context, request *indexpb.CreateInde
}
// AddStartCallback adds a callback in the startServer phase.
func (i *IndexNode) AddStartCallback(callbacks ...func()) {
i.startCallbacks = append(i.startCallbacks, callbacks...)
}
//func (i *IndexNode) AddStartCallback(callbacks ...func()) {
// i.startCallbacks = append(i.startCallbacks, callbacks...)
//}
// AddCloseCallback adds a callback in the Close phase.
func (i *IndexNode) AddCloseCallback(callbacks ...func()) {
i.closeCallbacks = append(i.closeCallbacks, callbacks...)
}
//func (i *IndexNode) AddCloseCallback(callbacks ...func()) {
// i.closeCallbacks = append(i.closeCallbacks, callbacks...)
//}
func (i *IndexNode) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
log.Debug("get IndexNode components states ...")

View File

@ -0,0 +1,247 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexnode
import (
"context"
"errors"
"strconv"
"sync"
"time"
"github.com/milvus-io/milvus/internal/log"
"github.com/golang/protobuf/proto"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
type Mock struct {
Build bool
Failure bool
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
etcdKV *etcdkv.EtcdKV
buildIndex chan *indexpb.CreateIndexRequest
}
func (inm *Mock) Init() error {
if inm.Failure {
return errors.New("IndexNode init failed")
}
inm.ctx, inm.cancel = context.WithCancel(context.Background())
inm.buildIndex = make(chan *indexpb.CreateIndexRequest)
return nil
}
func (inm *Mock) buildIndexTask() {
log.Debug("IndexNodeMock wait for building index")
defer inm.wg.Done()
for {
select {
case <-inm.ctx.Done():
return
case req := <-inm.buildIndex:
if inm.Failure && inm.Build {
indexMeta := indexpb.IndexMeta{}
_, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
_ = proto.UnmarshalText(values[0], &indexMeta)
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta.State = commonpb.IndexState_Failed
time.Sleep(time.Second)
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
}
if inm.Build {
indexMeta := indexpb.IndexMeta{}
_, values, versions, _ := inm.etcdKV.LoadWithPrefix2(req.MetaPath)
_ = proto.UnmarshalText(values[0], &indexMeta)
indexMeta.IndexFilePaths = []string{"IndexFilePath-1", "IndexFilePath-2"}
indexMeta.State = commonpb.IndexState_Failed
time.Sleep(time.Second)
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0],
proto.MarshalTextString(&indexMeta))
indexMeta.Version = indexMeta.Version + 1
indexMeta.State = commonpb.IndexState_Finished
_ = inm.etcdKV.CompareVersionAndSwap(req.MetaPath, versions[0]+1,
proto.MarshalTextString(&indexMeta))
}
}
}
}
func (inm *Mock) Start() error {
inm.wg.Add(1)
go inm.buildIndexTask()
if inm.Failure {
return errors.New("IndexNode start failed")
}
return nil
}
func (inm *Mock) Stop() error {
inm.cancel()
inm.wg.Wait()
inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole)
if inm.Failure {
return errors.New("IndexNode stop failed")
}
return nil
}
func (inm *Mock) Register() error {
if inm.Failure {
return errors.New("IndexNode register failed")
}
inm.etcdKV, _ = etcdkv.NewEtcdKV(Params.EtcdEndpoints, Params.MetaRootPath)
inm.etcdKV.RemoveWithPrefix("session/" + typeutil.IndexNodeRole)
session := sessionutil.NewSession(context.Background(), Params.MetaRootPath, Params.EtcdEndpoints)
session.Init(typeutil.IndexNodeRole, Params.IP+":"+strconv.Itoa(Params.Port), false)
return nil
}
func (inm *Mock) GetComponentStates(ctx context.Context) (*internalpb.ComponentStates, error) {
if inm.Failure {
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
StateCode: internalpb.StateCode_Abnormal,
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, nil
}
return &internalpb.ComponentStates{
State: &internalpb.ComponentInfo{
StateCode: internalpb.StateCode_Healthy,
},
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
}, nil
}
func (inm *Mock) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
if inm.Failure {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexNode GetStatisticsChannel failed")
}
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: "",
}, nil
}
func (inm *Mock) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
if inm.Failure {
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
},
}, errors.New("IndexNode GetTimeTickChannel failed")
}
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
},
Value: "",
}, nil
}
func (inm *Mock) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRequest) (*commonpb.Status, error) {
if inm.Build {
inm.buildIndex <- req
}
if inm.Failure {
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
}, errors.New("IndexNode CreateIndex failed")
}
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
}, nil
}
func (inm *Mock) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
if inm.Failure {
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_UnexpectedError,
Reason: metricsinfo.MsgUnimplementedMetric,
},
Response: "",
}, errors.New("IndexNode GetMetrics failed")
}
return &milvuspb.GetMetricsResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
Reason: "",
},
Response: "",
ComponentName: "IndexNode",
}, nil
}
//func getSystemInfoMetricsByIndexNodeMock(
// ctx context.Context,
// req *milvuspb.GetMetricsRequest,
// in *IndexNodeMock,
//) (*milvuspb.GetMetricsResponse, error) {
//
// id := UniqueID(16384)
//
// nodeInfos := metricsinfo.IndexNodeInfos{
// BaseComponentInfos: metricsinfo.BaseComponentInfos{
// Name: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
// },
// }
// resp, err := metricsinfo.MarshalComponentInfos(nodeInfos)
// if err != nil {
// return &milvuspb.GetMetricsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: err.Error(),
// },
// Response: "",
// ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
// }, nil
// }
//
// return &milvuspb.GetMetricsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_Success,
// Reason: "",
// },
// Response: resp,
// ComponentName: metricsinfo.ConstructComponentName(typeutil.IndexNodeRole, id),
// }, nil
//}

View File

@ -0,0 +1,77 @@
// Copyright (C) 2019-2020 Zilliz. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations under the License.
package indexnode
import (
"context"
"testing"
"github.com/milvus-io/milvus/internal/proto/commonpb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/proto/milvuspb"
"github.com/stretchr/testify/assert"
)
func TestIndexNodeMock(t *testing.T) {
Params.Init()
inm := Mock{}
err := inm.Register()
assert.Nil(t, err)
err = inm.Init()
assert.Nil(t, err)
err = inm.Start()
assert.Nil(t, err)
ctx := context.Background()
t.Run("GetComponentStates", func(t *testing.T) {
states, err := inm.GetComponentStates(ctx)
assert.Nil(t, err)
assert.Equal(t, internalpb.StateCode_Healthy, states.State.StateCode)
})
t.Run("GetTimeTickChannel", func(t *testing.T) {
resp, err := inm.GetTimeTickChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("GetStatisticsChannel", func(t *testing.T) {
resp, err := inm.GetStatisticsChannel(ctx)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
})
t.Run("CreateIndex", func(t *testing.T) {
req := &indexpb.CreateIndexRequest{
IndexBuildID: 0,
IndexID: 0,
DataPaths: []string{},
}
resp, err := inm.CreateIndex(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.ErrorCode)
})
t.Run("GetMetrics", func(t *testing.T) {
req := &milvuspb.GetMetricsRequest{
Request: "",
}
resp, err := inm.GetMetrics(ctx, req)
assert.Nil(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.Status.ErrorCode)
assert.Equal(t, "IndexNode", resp.ComponentName)
})
err = inm.Stop()
assert.Nil(t, err)
}

View File

@ -83,7 +83,6 @@ type IndexCoord interface {
DropIndex(ctx context.Context, req *indexpb.DropIndexRequest) (*commonpb.Status, error)
GetIndexStates(ctx context.Context, req *indexpb.GetIndexStatesRequest) (*indexpb.GetIndexStatesResponse, error)
GetIndexFilePaths(ctx context.Context, req *indexpb.GetIndexFilePathsRequest) (*indexpb.GetIndexFilePathsResponse, error)
GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error)
}