enhance: Simplify cross cluster routing integration test (#29873)

This pull request simplifies the integration test for cross-cluster
routing by reusing `integration.MiniClusterSuite`, instead of defining
custom Milvus clients, servers, and etcd client.

issue: https://github.com/milvus-io/milvus/issues/29874

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
pull/29878/head
yihao.dai 2024-01-11 14:00:52 +08:00 committed by GitHub
parent 1f759837c4
commit a4f15f4f43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 27 additions and 179 deletions

View File

@ -17,7 +17,6 @@
package crossclusterrouting
import (
"context"
"fmt"
"math/rand"
"strconv"
@ -26,188 +25,37 @@ import (
"time"
"github.com/stretchr/testify/suite"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord"
grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord/client"
grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode"
grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client"
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client"
grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
grpcproxyclient "github.com/milvus-io/milvus/internal/distributed/proxy/client"
grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
grpcquerycoordclient "github.com/milvus-io/milvus/internal/distributed/querycoord/client"
grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode"
grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client"
grpcrootcoord "github.com/milvus-io/milvus/internal/distributed/rootcoord"
grpcrootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/tests/integration"
)
type CrossClusterRoutingSuite struct {
suite.Suite
ctx context.Context
cancel context.CancelFunc
factory dependency.Factory
ChunkManager storage.ChunkManager
client *clientv3.Client
// clients
rootCoordClient *grpcrootcoordclient.Client
proxyClient *grpcproxyclient.Client
dataCoordClient *grpcdatacoordclient.Client
queryCoordClient *grpcquerycoordclient.Client
dataNodeClient *grpcdatanodeclient.Client
queryNodeClient *grpcquerynodeclient.Client
indexNodeClient *grpcindexnodeclient.Client
// servers
rootCoord *grpcrootcoord.Server
proxy *grpcproxy.Server
dataCoord *grpcdatacoord.Server
queryCoord *grpcquerycoord.Server
dataNode *grpcdatanode.Server
queryNode *grpcquerynode.Server
indexNode *grpcindexnode.Server
integration.MiniClusterSuite
}
func (s *CrossClusterRoutingSuite) SetupSuite() {
s.ctx, s.cancel = context.WithTimeout(context.Background(), time.Second*180)
rand.Seed(time.Now().UnixNano())
s.Require().NoError(s.SetupEmbedEtcd())
paramtable.Init()
paramtable.Get().Save("grpc.client.maxMaxAttempts", "1")
s.factory = dependency.NewDefaultFactory(true)
chunkManager, err := s.factory.NewPersistentStorageChunkManager(s.ctx)
s.NoError(err)
s.ChunkManager = chunkManager
}
func (s *CrossClusterRoutingSuite) TearDownSuite() {
s.TearDownEmbedEtcd()
paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10))
}
func (s *CrossClusterRoutingSuite) SetupTest() {
s.T().Logf("Setup test...")
var err error
// setup etcd client
etcdConfig := &paramtable.Get().EtcdCfg
s.client, err = etcd.GetEtcdClient(
etcdConfig.UseEmbedEtcd.GetAsBool(),
etcdConfig.EtcdUseSSL.GetAsBool(),
etcdConfig.Endpoints.GetAsStrings(),
etcdConfig.EtcdTLSCert.GetValue(),
etcdConfig.EtcdTLSKey.GetValue(),
etcdConfig.EtcdTLSCACert.GetValue(),
etcdConfig.EtcdTLSMinVersion.GetValue())
s.NoError(err)
// setup clients
s.rootCoordClient, err = grpcrootcoordclient.NewClient(s.ctx)
s.NoError(err)
s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx)
s.NoError(err)
s.queryCoordClient, err = grpcquerycoordclient.NewClient(s.ctx)
s.NoError(err)
s.proxyClient, err = grpcproxyclient.NewClient(s.ctx, paramtable.Get().ProxyGrpcClientCfg.GetInternalAddress(), 1)
s.NoError(err)
s.dataNodeClient, err = grpcdatanodeclient.NewClient(s.ctx, paramtable.Get().DataNodeGrpcClientCfg.GetAddress(), 1)
s.NoError(err)
s.queryNodeClient, err = grpcquerynodeclient.NewClient(s.ctx, paramtable.Get().QueryNodeGrpcClientCfg.GetAddress(), 1)
s.NoError(err)
s.indexNodeClient, err = grpcindexnodeclient.NewClient(s.ctx, paramtable.Get().IndexNodeGrpcClientCfg.GetAddress(), 1, false)
s.NoError(err)
// setup servers
s.rootCoord, err = grpcrootcoord.NewServer(s.ctx, s.factory)
s.NoError(err)
err = s.rootCoord.Run()
s.NoError(err)
s.T().Logf("rootCoord server successfully started")
s.dataCoord = grpcdatacoord.NewServer(s.ctx, s.factory)
s.NotNil(s.dataCoord)
err = s.dataCoord.Run()
s.NoError(err)
s.T().Logf("dataCoord server successfully started")
s.queryCoord, err = grpcquerycoord.NewServer(s.ctx, s.factory)
s.NoError(err)
err = s.queryCoord.Run()
s.NoError(err)
s.T().Logf("queryCoord server successfully started")
s.proxy, err = grpcproxy.NewServer(s.ctx, s.factory)
s.NoError(err)
err = s.proxy.Run()
s.NoError(err)
s.T().Logf("proxy server successfully started")
s.dataNode, err = grpcdatanode.NewServer(s.ctx, s.factory)
s.NoError(err)
err = s.dataNode.Run()
s.NoError(err)
s.T().Logf("dataNode server successfully started")
s.queryNode, err = grpcquerynode.NewServer(s.ctx, s.factory)
s.NoError(err)
err = s.queryNode.Run()
s.NoError(err)
s.T().Logf("queryNode server successfully started")
s.indexNode, err = grpcindexnode.NewServer(s.ctx, s.factory)
s.NoError(err)
err = s.indexNode.Run()
s.NoError(err)
s.T().Logf("indexNode server successfully started")
}
func (s *CrossClusterRoutingSuite) TearDownTest() {
err := s.rootCoord.Stop()
s.NoError(err)
err = s.proxy.Stop()
s.NoError(err)
err = s.dataCoord.Stop()
s.NoError(err)
err = s.queryCoord.Stop()
s.NoError(err)
err = s.dataNode.Stop()
s.NoError(err)
err = s.queryNode.Stop()
s.NoError(err)
err = s.indexNode.Stop()
s.NoError(err)
if s.ChunkManager == nil {
chunkManager, err := s.factory.NewPersistentStorageChunkManager(s.ctx)
if err != nil {
log.Warn("fail to create chunk manager to clean test data", zap.Error(err))
} else {
s.ChunkManager = chunkManager
}
}
s.ChunkManager.RemoveWithPrefix(s.ctx, s.ChunkManager.RootPath())
s.cancel()
}
func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
func (s *CrossClusterRoutingSuite) TestCrossClusterRouting() {
const (
waitFor = time.Second * 10
duration = time.Millisecond * 10
@ -216,7 +64,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
go func() {
for {
select {
case <-s.ctx.Done():
case <-time.After(15 * time.Second):
return
default:
err := paramtable.Get().Save(paramtable.Get().CommonCfg.ClusterPrefix.Key, fmt.Sprintf("%d", rand.Int()))
@ -229,7 +77,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test rootCoord
s.Eventually(func() bool {
resp, err := s.rootCoordClient.ShowCollections(s.ctx, &milvuspb.ShowCollectionsRequest{
resp, err := s.Cluster.RootCoordClient.ShowCollections(s.Cluster.GetContext(), &milvuspb.ShowCollectionsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections),
),
@ -244,7 +92,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test dataCoord
s.Eventually(func() bool {
resp, err := s.dataCoordClient.GetRecoveryInfoV2(s.ctx, &datapb.GetRecoveryInfoRequestV2{})
resp, err := s.Cluster.DataCoordClient.GetRecoveryInfoV2(s.Cluster.GetContext(), &datapb.GetRecoveryInfoRequestV2{})
s.Suite.T().Logf("resp: %s, err: %s", resp, err)
if err != nil {
return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
@ -254,7 +102,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test queryCoord
s.Eventually(func() bool {
resp, err := s.queryCoordClient.LoadCollection(s.ctx, &querypb.LoadCollectionRequest{})
resp, err := s.Cluster.QueryCoordClient.LoadCollection(s.Cluster.GetContext(), &querypb.LoadCollectionRequest{})
s.Suite.T().Logf("resp: %s, err: %s", resp, err)
if err != nil {
return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
@ -264,7 +112,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test proxy
s.Eventually(func() bool {
resp, err := s.proxyClient.InvalidateCollectionMetaCache(s.ctx, &proxypb.InvalidateCollMetaCacheRequest{})
resp, err := s.Cluster.ProxyClient.InvalidateCollectionMetaCache(s.Cluster.GetContext(), &proxypb.InvalidateCollMetaCacheRequest{})
s.Suite.T().Logf("resp: %s, err: %s", resp, err)
if err != nil {
return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
@ -274,7 +122,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test dataNode
s.Eventually(func() bool {
resp, err := s.dataNodeClient.FlushSegments(s.ctx, &datapb.FlushSegmentsRequest{})
resp, err := s.Cluster.DataNodeClient.FlushSegments(s.Cluster.GetContext(), &datapb.FlushSegmentsRequest{})
s.Suite.T().Logf("resp: %s, err: %s", resp, err)
if err != nil {
return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
@ -284,7 +132,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test queryNode
s.Eventually(func() bool {
resp, err := s.queryNodeClient.Search(s.ctx, &querypb.SearchRequest{})
resp, err := s.Cluster.QueryNodeClient.Search(s.Cluster.GetContext(), &querypb.SearchRequest{})
s.Suite.T().Logf("resp: %s, err: %s", resp, err)
if err != nil {
return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())
@ -294,7 +142,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() {
// test indexNode
s.Eventually(func() bool {
resp, err := s.indexNodeClient.CreateJob(s.ctx, &indexpb.CreateJobRequest{})
resp, err := s.Cluster.IndexNodeClient.CreateJob(s.Cluster.GetContext(), &indexpb.CreateJobRequest{})
s.Suite.T().Logf("resp: %s, err: %s", resp, err)
if err != nil {
return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error())

View File

@ -118,6 +118,19 @@ func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2,
etcdCli: cluster.EtcdCli,
}
ports, err := GetAvailablePorts(7)
if err != nil {
return nil, err
}
log.Info("minicluster ports", zap.Ints("ports", ports))
params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0]))
params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1]))
params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2]))
params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3]))
params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4]))
params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5]))
params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6]))
// setup clients
cluster.RootCoordClient, err = grpcrootcoordclient.NewClient(ctx)
if err != nil {
@ -190,20 +203,7 @@ func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2,
func (cluster *MiniClusterV2) Start() error {
log.Info("mini cluster start")
ports, err := GetAvailablePorts(7)
if err != nil {
return err
}
log.Info("minicluster ports", zap.Ints("ports", ports))
params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0]))
params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1]))
params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2]))
params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3]))
params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4]))
params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5]))
params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6]))
err = cluster.RootCoord.Run()
err := cluster.RootCoord.Run()
if err != nil {
return err
}