fix: Repair integration test framework (#28308)

#28307

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/28696/head
wayblink 2023-11-24 10:26:23 +08:00 committed by GitHub
parent 2843664a20
commit 5aedbd0af4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 496 additions and 179 deletions

View File

@ -212,7 +212,6 @@ func (s *BulkInsertSuite) TestBulkInsert() {
}
func TestBulkInsert(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(BulkInsertSuite))
}
@ -260,7 +259,6 @@ func GenerateNumpyFile(filePath string, rowCount int, dType schemapb.DataType, t
}
func TestGenerateNumpyFile(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
err := os.MkdirAll(TempFilesPath, os.ModePerm)
require.NoError(t, err)
err = GenerateNumpyFile(TempFilesPath+"embeddings.npy", 100, schemapb.DataType_FloatVector, []*commonpb.KeyValuePair{

View File

@ -40,7 +40,7 @@ type HelloMilvusSuite struct {
}
func (s *HelloMilvusSuite) TestHelloMilvus() {
ctx, cancel := context.WithCancel(s.Cluster.GetContext())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
c := s.Cluster
@ -156,6 +156,5 @@ func (s *HelloMilvusSuite) TestHelloMilvus() {
}
func TestHelloMilvus(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(HelloMilvusSuite))
}

View File

@ -154,6 +154,5 @@ func (s *GetIndexStatisticsSuite) TestGetIndexStatistics() {
}
func TestGetIndexStat(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(GetIndexStatisticsSuite))
}

View File

@ -121,6 +121,5 @@ func (s *InsertSuite) TestInsert() {
}
func TestInsert(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(InsertSuite))
}

View File

@ -1141,6 +1141,5 @@ func (s *JSONExprSuite) TestJsonContains() {
}
func TestJsonExpr(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(JSONExprSuite))
}

View File

@ -36,7 +36,7 @@ import (
// MetaWatcher to observe meta data of milvus cluster
type MetaWatcher interface {
ShowSessions() ([]*sessionutil.Session, error)
ShowSessions() ([]*sessionutil.SessionRaw, error)
ShowSegments() ([]*datapb.SegmentInfo, error)
ShowReplicas() ([]*querypb.Replica, error)
}
@ -47,7 +47,7 @@ type EtcdMetaWatcher struct {
etcdCli *clientv3.Client
}
func (watcher *EtcdMetaWatcher) ShowSessions() ([]*sessionutil.Session, error) {
func (watcher *EtcdMetaWatcher) ShowSessions() ([]*sessionutil.SessionRaw, error) {
metaPath := watcher.rootPath + "/meta/session"
return listSessionsByPrefix(watcher.etcdCli, metaPath)
}
@ -67,7 +67,7 @@ func (watcher *EtcdMetaWatcher) ShowReplicas() ([]*querypb.Replica, error) {
//=================== Below largely copied from birdwatcher ========================
// listSessions returns all session
func listSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*sessionutil.Session, error) {
func listSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*sessionutil.SessionRaw, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
defer cancel()
resp, err := cli.Get(ctx, prefix, clientv3.WithPrefix())
@ -75,9 +75,9 @@ func listSessionsByPrefix(cli *clientv3.Client, prefix string) ([]*sessionutil.S
return nil, err
}
sessions := make([]*sessionutil.Session, 0, len(resp.Kvs))
sessions := make([]*sessionutil.SessionRaw, 0, len(resp.Kvs))
for _, kv := range resp.Kvs {
session := &sessionutil.Session{}
session := &sessionutil.SessionRaw{}
err := json.Unmarshal(kv.Value, session)
if err != nil {
continue

View File

@ -42,10 +42,10 @@ type MetaWatcherSuite struct {
func (s *MetaWatcherSuite) TestShowSessions() {
sessions, err := s.Cluster.MetaWatcher.ShowSessions()
s.NoError(err)
s.NotEmpty(sessions)
for _, session := range sessions {
log.Info("ShowSessions result", zap.String("session", session.String()))
log.Info("ShowSessions result", zap.Any("session", session))
}
log.Info("TestShowSessions succeed")
}
func (s *MetaWatcherSuite) TestShowSegments() {
@ -128,12 +128,27 @@ func (s *MetaWatcherSuite) TestShowSegments() {
s.NoError(err)
s.Equal(insertResult.GetStatus().GetErrorCode(), commonpb.ErrorCode_Success)
// flush
flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{
DbName: dbName,
CollectionNames: []string{collectionName},
})
s.NoError(err)
segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
ids := segmentIDs.GetData()
s.Require().NotEmpty(segmentIDs)
s.Require().True(has)
flushTs, has := flushResp.GetCollFlushTs()[collectionName]
s.True(has)
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
segments, err := c.MetaWatcher.ShowSegments()
s.NoError(err)
s.NotEmpty(segments)
for _, segment := range segments {
log.Info("ShowSegments result", zap.String("segment", segment.String()))
}
log.Info("TestShowSegments succeed")
}
func (s *MetaWatcherSuite) TestShowReplicas() {
@ -306,6 +321,5 @@ func (s *MetaWatcherSuite) TestShowReplicas() {
}
func TestMetaWatcher(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(MetaWatcherSuite))
}

View File

@ -498,14 +498,14 @@ func GetMetaRootPath(rootPath string) string {
func DefaultParams() map[string]string {
testPath := fmt.Sprintf("integration-test-%d", time.Now().Unix())
return map[string]string{
EtcdRootPath: testPath,
MinioRootPath: testPath,
params.EtcdCfg.RootPath.Key: testPath,
params.MinioCfg.RootPath.Key: testPath,
//"runtime.role": typeutil.StandaloneRole,
params.IntegrationTestCfg.IntegrationMode.Key: "true",
params.LocalStorageCfg.Path.Key: path.Join("/tmp", testPath),
params.CommonCfg.StorageType.Key: "local",
params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs
params.CommonCfg.GracefulStopTimeout.Key: "10",
//params.IntegrationTestCfg.IntegrationMode.Key: "true",
params.LocalStorageCfg.Path.Key: path.Join("/tmp", testPath),
params.CommonCfg.StorageType.Key: "local",
params.DataNodeCfg.MemoryForceSyncEnable.Key: "false", // local execution will print too many logs
params.CommonCfg.GracefulStopTimeout.Key: "10",
}
}

View File

@ -17,14 +17,9 @@
package integration
import (
"context"
"testing"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/datanode"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/querynodev2"
)
type MiniClusterMethodsSuite struct {
@ -35,151 +30,151 @@ func (s *MiniClusterMethodsSuite) TestStartAndStop() {
// Do nothing
}
func (s *MiniClusterMethodsSuite) TestRemoveDataNode() {
c := s.Cluster
ctx, cancel := context.WithCancel(c.GetContext())
defer cancel()
datanode := datanode.NewDataNode(ctx, c.factory)
datanode.SetEtcdClient(c.EtcdCli)
// datanode := c.CreateDefaultDataNode()
err := c.AddDataNode(datanode)
s.NoError(err)
s.Equal(2, c.clusterConfig.DataNodeNum)
s.Equal(2, len(c.DataNodes))
err = c.RemoveDataNode(datanode)
s.NoError(err)
s.Equal(1, c.clusterConfig.DataNodeNum)
s.Equal(1, len(c.DataNodes))
// add default node and remove randomly
err = c.AddDataNode(nil)
s.NoError(err)
s.Equal(2, c.clusterConfig.DataNodeNum)
s.Equal(2, len(c.DataNodes))
err = c.RemoveDataNode(nil)
s.NoError(err)
s.Equal(1, c.clusterConfig.DataNodeNum)
s.Equal(1, len(c.DataNodes))
}
func (s *MiniClusterMethodsSuite) TestRemoveQueryNode() {
c := s.Cluster
ctx, cancel := context.WithCancel(c.GetContext())
defer cancel()
queryNode := querynodev2.NewQueryNode(ctx, c.factory)
queryNode.SetEtcdClient(c.EtcdCli)
// queryNode := c.CreateDefaultQueryNode()
err := c.AddQueryNode(queryNode)
s.NoError(err)
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(2, len(c.QueryNodes))
err = c.RemoveQueryNode(queryNode)
s.NoError(err)
s.Equal(1, c.clusterConfig.QueryNodeNum)
s.Equal(1, len(c.QueryNodes))
// add default node and remove randomly
err = c.AddQueryNode(nil)
s.NoError(err)
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(2, len(c.QueryNodes))
err = c.RemoveQueryNode(nil)
s.NoError(err)
s.Equal(1, c.clusterConfig.QueryNodeNum)
s.Equal(1, len(c.QueryNodes))
}
func (s *MiniClusterMethodsSuite) TestRemoveIndexNode() {
c := s.Cluster
ctx, cancel := context.WithCancel(c.GetContext())
defer cancel()
indexNode := indexnode.NewIndexNode(ctx, c.factory)
indexNode.SetEtcdClient(c.EtcdCli)
// indexNode := c.CreateDefaultIndexNode()
err := c.AddIndexNode(indexNode)
s.NoError(err)
s.Equal(2, c.clusterConfig.IndexNodeNum)
s.Equal(2, len(c.IndexNodes))
err = c.RemoveIndexNode(indexNode)
s.NoError(err)
s.Equal(1, c.clusterConfig.IndexNodeNum)
s.Equal(1, len(c.IndexNodes))
// add default node and remove randomly
err = c.AddIndexNode(nil)
s.NoError(err)
s.Equal(2, c.clusterConfig.IndexNodeNum)
s.Equal(2, len(c.IndexNodes))
err = c.RemoveIndexNode(nil)
s.NoError(err)
s.Equal(1, c.clusterConfig.IndexNodeNum)
s.Equal(1, len(c.IndexNodes))
}
func (s *MiniClusterMethodsSuite) TestUpdateClusterSize() {
c := s.Cluster
err := c.UpdateClusterSize(ClusterConfig{
QueryNodeNum: -1,
DataNodeNum: -1,
IndexNodeNum: -1,
})
s.Error(err)
err = c.UpdateClusterSize(ClusterConfig{
QueryNodeNum: 2,
DataNodeNum: 2,
IndexNodeNum: 2,
})
s.NoError(err)
s.Equal(2, c.clusterConfig.DataNodeNum)
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(2, c.clusterConfig.IndexNodeNum)
s.Equal(2, len(c.DataNodes))
s.Equal(2, len(c.QueryNodes))
s.Equal(2, len(c.IndexNodes))
err = c.UpdateClusterSize(ClusterConfig{
DataNodeNum: 3,
QueryNodeNum: 2,
IndexNodeNum: 1,
})
s.NoError(err)
s.Equal(3, c.clusterConfig.DataNodeNum)
s.Equal(2, c.clusterConfig.QueryNodeNum)
s.Equal(1, c.clusterConfig.IndexNodeNum)
s.Equal(3, len(c.DataNodes))
s.Equal(2, len(c.QueryNodes))
s.Equal(1, len(c.IndexNodes))
}
//func (s *MiniClusterMethodsSuite) TestRemoveDataNode() {
// c := s.Cluster
// ctx, cancel := context.WithCancel(c.GetContext())
// defer cancel()
//
// datanode := datanode.NewDataNode(ctx, c.factory)
// datanode.SetEtcdClient(c.EtcdCli)
// // datanode := c.CreateDefaultDataNode()
//
// err := c.AddDataNode(datanode)
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.DataNodeNum)
// s.Equal(2, len(c.DataNodes))
//
// err = c.RemoveDataNode(datanode)
// s.NoError(err)
//
// s.Equal(1, c.clusterConfig.DataNodeNum)
// s.Equal(1, len(c.DataNodes))
//
// // add default node and remove randomly
// err = c.AddDataNode(nil)
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.DataNodeNum)
// s.Equal(2, len(c.DataNodes))
//
// err = c.RemoveDataNode(nil)
// s.NoError(err)
//
// s.Equal(1, c.clusterConfig.DataNodeNum)
// s.Equal(1, len(c.DataNodes))
//}
//
//func (s *MiniClusterMethodsSuite) TestRemoveQueryNode() {
// c := s.Cluster
// ctx, cancel := context.WithCancel(c.GetContext())
// defer cancel()
//
// queryNode := querynodev2.NewQueryNode(ctx, c.factory)
// queryNode.SetEtcdClient(c.EtcdCli)
// // queryNode := c.CreateDefaultQueryNode()
//
// err := c.AddQueryNode(queryNode)
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.QueryNodeNum)
// s.Equal(2, len(c.QueryNodes))
//
// err = c.RemoveQueryNode(queryNode)
// s.NoError(err)
//
// s.Equal(1, c.clusterConfig.QueryNodeNum)
// s.Equal(1, len(c.QueryNodes))
//
// // add default node and remove randomly
// err = c.AddQueryNode(nil)
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.QueryNodeNum)
// s.Equal(2, len(c.QueryNodes))
//
// err = c.RemoveQueryNode(nil)
// s.NoError(err)
//
// s.Equal(1, c.clusterConfig.QueryNodeNum)
// s.Equal(1, len(c.QueryNodes))
//}
//
//func (s *MiniClusterMethodsSuite) TestRemoveIndexNode() {
// c := s.Cluster
// ctx, cancel := context.WithCancel(c.GetContext())
// defer cancel()
//
// indexNode := indexnode.NewIndexNode(ctx, c.factory)
// indexNode.SetEtcdClient(c.EtcdCli)
// // indexNode := c.CreateDefaultIndexNode()
//
// err := c.AddIndexNode(indexNode)
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.IndexNodeNum)
// s.Equal(2, len(c.IndexNodes))
//
// err = c.RemoveIndexNode(indexNode)
// s.NoError(err)
//
// s.Equal(1, c.clusterConfig.IndexNodeNum)
// s.Equal(1, len(c.IndexNodes))
//
// // add default node and remove randomly
// err = c.AddIndexNode(nil)
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.IndexNodeNum)
// s.Equal(2, len(c.IndexNodes))
//
// err = c.RemoveIndexNode(nil)
// s.NoError(err)
//
// s.Equal(1, c.clusterConfig.IndexNodeNum)
// s.Equal(1, len(c.IndexNodes))
//}
//
//func (s *MiniClusterMethodsSuite) TestUpdateClusterSize() {
// c := s.Cluster
//
// err := c.UpdateClusterSize(ClusterConfig{
// QueryNodeNum: -1,
// DataNodeNum: -1,
// IndexNodeNum: -1,
// })
// s.Error(err)
//
// err = c.UpdateClusterSize(ClusterConfig{
// QueryNodeNum: 2,
// DataNodeNum: 2,
// IndexNodeNum: 2,
// })
// s.NoError(err)
//
// s.Equal(2, c.clusterConfig.DataNodeNum)
// s.Equal(2, c.clusterConfig.QueryNodeNum)
// s.Equal(2, c.clusterConfig.IndexNodeNum)
//
// s.Equal(2, len(c.DataNodes))
// s.Equal(2, len(c.QueryNodes))
// s.Equal(2, len(c.IndexNodes))
//
// err = c.UpdateClusterSize(ClusterConfig{
// DataNodeNum: 3,
// QueryNodeNum: 2,
// IndexNodeNum: 1,
// })
// s.NoError(err)
//
// s.Equal(3, c.clusterConfig.DataNodeNum)
// s.Equal(2, c.clusterConfig.QueryNodeNum)
// s.Equal(1, c.clusterConfig.IndexNodeNum)
//
// s.Equal(3, len(c.DataNodes))
// s.Equal(2, len(c.QueryNodes))
// s.Equal(1, len(c.IndexNodes))
//}
func TestMiniCluster(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")

View File

@ -0,0 +1,316 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package integration
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/cockroachdb/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord"
grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode"
grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
grpcproxyclient "github.com/milvus-io/milvus/internal/distributed/proxy/client"
grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
grpcquerycoordclient "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode"
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
grpcrootcoord "github.com/milvus-io/milvus/internal/distributed/rootcoord"
grpcrootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type MiniClusterV2 struct {
ctx context.Context
mu sync.RWMutex
params map[string]string
clusterConfig ClusterConfig
factory dependency.Factory
ChunkManager storage.ChunkManager
EtcdCli *clientv3.Client
Proxy *grpcproxy.Server
DataCoord *grpcdatacoord.Server
RootCoord *grpcrootcoord.Server
QueryCoord *grpcquerycoord.Server
DataCoordClient *grpcdatacoordclient.Client
RootCoordClient *grpcrootcoordclient.Client
QueryCoordClient *grpcquerycoordclient.Client
ProxyClient *grpcproxyclient.Client
DataNodeClient *grpcdatanodeclient.Client
QueryNodeClient *grpcquerynodeclient.Client
IndexNodeClient *grpcindexnodeclient.Client
DataNode *grpcdatanode.Server
QueryNode *grpcquerynode.Server
IndexNode *grpcindexnode.Server
MetaWatcher MetaWatcher
}
type OptionV2 func(cluster *MiniClusterV2)
func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2, error) {
cluster := &MiniClusterV2{
ctx: ctx,
}
paramtable.Init()
cluster.params = DefaultParams()
cluster.clusterConfig = DefaultClusterConfig()
for _, opt := range opts {
opt(cluster)
}
for k, v := range cluster.params {
params.Save(k, v)
}
// setup etcd client
etcdConfig := &paramtable.Get().EtcdCfg
etcdCli, err := etcd.GetEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
if err != nil {
return nil, err
}
cluster.EtcdCli = etcdCli
cluster.MetaWatcher = &EtcdMetaWatcher{
rootPath: etcdConfig.RootPath.GetValue(),
etcdCli: cluster.EtcdCli,
}
// setup clients
cluster.RootCoordClient, err = grpcrootcoordclient.NewClient(ctx)
if err != nil {
return nil, err
}
cluster.DataCoordClient, err = grpcdatacoordclient.NewClient(ctx)
if err != nil {
return nil, err
}
cluster.QueryCoordClient, err = grpcquerycoordclient.NewClient(ctx)
if err != nil {
return nil, err
}
cluster.ProxyClient, err = grpcproxyclient.NewClient(ctx, paramtable.Get().ProxyGrpcClientCfg.GetInternalAddress(), 0)
if err != nil {
return nil, err
}
cluster.DataNodeClient, err = grpcdatanodeclient.NewClient(ctx, paramtable.Get().DataNodeGrpcClientCfg.GetAddress(), 0)
if err != nil {
return nil, err
}
cluster.QueryNodeClient, err = grpcquerynodeclient.NewClient(ctx, paramtable.Get().QueryNodeGrpcClientCfg.GetAddress(), 0)
if err != nil {
return nil, err
}
cluster.IndexNodeClient, err = grpcindexnodeclient.NewClient(ctx, paramtable.Get().IndexNodeGrpcClientCfg.GetAddress(), 0, false)
if err != nil {
return nil, err
}
// setup servers
cluster.factory = dependency.NewDefaultFactory(true)
chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
if err != nil {
return nil, err
}
cluster.ChunkManager = chunkManager
cluster.RootCoord, err = grpcrootcoord.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
cluster.DataCoord = grpcdatacoord.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
cluster.QueryCoord, err = grpcquerycoord.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
cluster.Proxy, err = grpcproxy.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
cluster.DataNode, err = grpcdatanode.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
cluster.QueryNode, err = grpcquerynode.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
cluster.IndexNode, err = grpcindexnode.NewServer(ctx, cluster.factory)
if err != nil {
return nil, err
}
return cluster, nil
}
func (cluster *MiniClusterV2) Start() error {
log.Info("mini cluster start")
ports, err := GetAvailablePorts(7)
if err != nil {
return err
}
log.Info("minicluster ports", zap.Ints("ports", ports))
params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0]))
params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1]))
params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2]))
params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3]))
params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4]))
params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5]))
params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6]))
err = cluster.RootCoord.Run()
if err != nil {
return err
}
err = cluster.DataCoord.Run()
if err != nil {
return err
}
err = cluster.QueryCoord.Run()
if err != nil {
return err
}
err = cluster.DataNode.Run()
if err != nil {
return err
}
err = cluster.QueryNode.Run()
if err != nil {
return err
}
err = cluster.IndexNode.Run()
if err != nil {
return err
}
err = cluster.Proxy.Run()
if err != nil {
return err
}
ctx2, cancel := context.WithTimeout(context.Background(), time.Second*120)
defer cancel()
healthy := false
for !healthy {
checkHealthResp, _ := cluster.Proxy.CheckHealth(ctx2, &milvuspb.CheckHealthRequest{})
healthy = checkHealthResp.IsHealthy
time.Sleep(time.Second * 1)
}
if !healthy {
return errors.New("minicluster is not healthy after 120s")
}
log.Info("minicluster started")
return nil
}
func (cluster *MiniClusterV2) Stop() error {
log.Info("mini cluster stop")
cluster.RootCoord.Stop()
log.Info("mini cluster rootCoord stopped")
cluster.DataCoord.Stop()
log.Info("mini cluster dataCoord stopped")
cluster.QueryCoord.Stop()
log.Info("mini cluster queryCoord stopped")
cluster.Proxy.Stop()
log.Info("mini cluster proxy stopped")
cluster.DataNode.Stop()
log.Info("mini cluster dataNode stopped")
cluster.QueryNode.Stop()
log.Info("mini cluster queryNode stopped")
cluster.IndexNode.Stop()
log.Info("mini cluster indexNode stopped")
cluster.EtcdCli.KV.Delete(cluster.ctx, params.EtcdCfg.RootPath.GetValue(), clientv3.WithPrefix())
defer cluster.EtcdCli.Close()
if cluster.ChunkManager == nil {
chunkManager, err := cluster.factory.NewPersistentStorageChunkManager(cluster.ctx)
if err != nil {
log.Warn("fail to create chunk manager to clean test data", zap.Error(err))
} else {
cluster.ChunkManager = chunkManager
}
}
cluster.ChunkManager.RemoveWithPrefix(cluster.ctx, cluster.ChunkManager.RootPath())
return nil
}
func (cluster *MiniClusterV2) GetContext() context.Context {
return cluster.ctx
}
func GetAvailablePorts(n int) ([]int, error) {
ports := make([]int, n)
for i := range ports {
port, err := GetAvailablePort()
if err != nil {
return nil, err
}
ports[i] = port
}
return ports, nil
}
func GetAvailablePort() (int, error) {
address, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", "0.0.0.0"))
if err != nil {
return 0, err
}
listener, err := net.ListenTCP("tcp", address)
if err != nil {
return 0, err
}
defer listener.Close()
return listener.Addr().(*net.TCPAddr).Port, nil
}

View File

@ -10,14 +10,14 @@ import (
type QueryNodeV2Suite struct {
suite.Suite
c *MiniCluster
c *MiniClusterV2
}
func (s *QueryNodeV2Suite) SetupSuite() {
ctx := context.Background()
var err error
s.c, err = StartMiniCluster(ctx)
s.c, err = StartMiniClusterV2(ctx)
s.Require().NoError(err)
time.Sleep(time.Second)

View File

@ -337,6 +337,5 @@ func (s *RangeSearchSuite) TestRangeSearchL2() {
}
func TestRangeSearch(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(RangeSearchSuite))
}

View File

@ -61,7 +61,7 @@ type MiniClusterSuite struct {
suite.Suite
EmbedEtcdSuite
Cluster *MiniCluster
Cluster *MiniClusterV2
cancelFunc context.CancelFunc
}
@ -86,7 +86,7 @@ func (s *MiniClusterSuite) SetupTest() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*180)
s.cancelFunc = cancel
c, err := StartMiniCluster(ctx, func(c *MiniCluster) {
c, err := StartMiniClusterV2(ctx, func(c *MiniClusterV2) {
// change config etcd endpoints
c.params[params.EtcdCfg.Endpoints.Key] = val
})

View File

@ -159,6 +159,5 @@ func (s *UpsertSuite) TestUpsert() {
}
func TestUpsert(t *testing.T) {
t.Skip("Skip integration test, need to refactor integration test framework")
suite.Run(t, new(UpsertSuite))
}

View File

@ -84,7 +84,7 @@ func (s *MiniClusterSuite) waitForIndexBuiltInternal(ctx context.Context, dbName
}
}
func waitingForIndexBuilt(ctx context.Context, cluster *MiniCluster, t *testing.T, collection, field string) {
func waitingForIndexBuilt(ctx context.Context, cluster *MiniClusterV2, t *testing.T, collection, field string) {
getIndexBuilt := func() bool {
resp, err := cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
CollectionName: collection,