From a4f15f4f43334e5c9432f15798cc1e8386085694 Mon Sep 17 00:00:00 2001 From: "yihao.dai" Date: Thu, 11 Jan 2024 14:00:52 +0800 Subject: [PATCH] enhance: Simplify cross cluster routing integration test (#29873) This pull request simplifies the integration test for cross-cluster routing by reusing `integration.MiniClusterSuite`, instead of defining custom Milvus clients, servers, and etcd client. issue: https://github.com/milvus-io/milvus/issues/29874 Signed-off-by: bigsheeper --- .../cross_cluster_routing_test.go | 178 ++---------------- tests/integration/minicluster_v2.go | 28 +-- 2 files changed, 27 insertions(+), 179 deletions(-) diff --git a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go index 2bab51ade2..15940216f3 100644 --- a/tests/integration/crossclusterrouting/cross_cluster_routing_test.go +++ b/tests/integration/crossclusterrouting/cross_cluster_routing_test.go @@ -17,7 +17,6 @@ package crossclusterrouting import ( - "context" "fmt" "math/rand" "strconv" @@ -26,188 +25,37 @@ import ( "time" "github.com/stretchr/testify/suite" - clientv3 "go.etcd.io/etcd/client/v3" - "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - grpcdatacoord "github.com/milvus-io/milvus/internal/distributed/datacoord" - grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord/client" - grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode" - grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" - grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode" - grpcindexnodeclient "github.com/milvus-io/milvus/internal/distributed/indexnode/client" - grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy" - grpcproxyclient "github.com/milvus-io/milvus/internal/distributed/proxy/client" - grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord" - grpcquerycoordclient "github.com/milvus-io/milvus/internal/distributed/querycoord/client" - grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode" - grpcquerynodeclient "github.com/milvus-io/milvus/internal/distributed/querynode/client" - grpcrootcoord "github.com/milvus-io/milvus/internal/distributed/rootcoord" - grpcrootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/internal/proto/proxypb" "github.com/milvus-io/milvus/internal/proto/querypb" - "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/dependency" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/commonpbutil" - "github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" ) type CrossClusterRoutingSuite struct { - suite.Suite - - ctx context.Context - cancel context.CancelFunc - - factory dependency.Factory - ChunkManager storage.ChunkManager - client *clientv3.Client - - // clients - rootCoordClient *grpcrootcoordclient.Client - proxyClient *grpcproxyclient.Client - dataCoordClient *grpcdatacoordclient.Client - queryCoordClient *grpcquerycoordclient.Client - dataNodeClient *grpcdatanodeclient.Client - queryNodeClient *grpcquerynodeclient.Client - indexNodeClient *grpcindexnodeclient.Client - - // servers - rootCoord *grpcrootcoord.Server - proxy *grpcproxy.Server - dataCoord *grpcdatacoord.Server - queryCoord *grpcquerycoord.Server - dataNode *grpcdatanode.Server - queryNode *grpcquerynode.Server - indexNode *grpcindexnode.Server + integration.MiniClusterSuite } func (s *CrossClusterRoutingSuite) SetupSuite() { - s.ctx, s.cancel = context.WithTimeout(context.Background(), time.Second*180) rand.Seed(time.Now().UnixNano()) + s.Require().NoError(s.SetupEmbedEtcd()) paramtable.Init() - paramtable.Get().Save("grpc.client.maxMaxAttempts", "1") - s.factory = dependency.NewDefaultFactory(true) - chunkManager, err := s.factory.NewPersistentStorageChunkManager(s.ctx) - s.NoError(err) - s.ChunkManager = chunkManager } func (s *CrossClusterRoutingSuite) TearDownSuite() { + s.TearDownEmbedEtcd() paramtable.Get().Save("grpc.client.maxMaxAttempts", strconv.FormatInt(paramtable.DefaultMaxAttempts, 10)) } -func (s *CrossClusterRoutingSuite) SetupTest() { - s.T().Logf("Setup test...") - var err error - - // setup etcd client - etcdConfig := ¶mtable.Get().EtcdCfg - s.client, err = etcd.GetEtcdClient( - etcdConfig.UseEmbedEtcd.GetAsBool(), - etcdConfig.EtcdUseSSL.GetAsBool(), - etcdConfig.Endpoints.GetAsStrings(), - etcdConfig.EtcdTLSCert.GetValue(), - etcdConfig.EtcdTLSKey.GetValue(), - etcdConfig.EtcdTLSCACert.GetValue(), - etcdConfig.EtcdTLSMinVersion.GetValue()) - s.NoError(err) - - // setup clients - s.rootCoordClient, err = grpcrootcoordclient.NewClient(s.ctx) - s.NoError(err) - s.dataCoordClient, err = grpcdatacoordclient.NewClient(s.ctx) - s.NoError(err) - s.queryCoordClient, err = grpcquerycoordclient.NewClient(s.ctx) - s.NoError(err) - s.proxyClient, err = grpcproxyclient.NewClient(s.ctx, paramtable.Get().ProxyGrpcClientCfg.GetInternalAddress(), 1) - s.NoError(err) - s.dataNodeClient, err = grpcdatanodeclient.NewClient(s.ctx, paramtable.Get().DataNodeGrpcClientCfg.GetAddress(), 1) - s.NoError(err) - s.queryNodeClient, err = grpcquerynodeclient.NewClient(s.ctx, paramtable.Get().QueryNodeGrpcClientCfg.GetAddress(), 1) - s.NoError(err) - s.indexNodeClient, err = grpcindexnodeclient.NewClient(s.ctx, paramtable.Get().IndexNodeGrpcClientCfg.GetAddress(), 1, false) - s.NoError(err) - - // setup servers - s.rootCoord, err = grpcrootcoord.NewServer(s.ctx, s.factory) - s.NoError(err) - err = s.rootCoord.Run() - s.NoError(err) - s.T().Logf("rootCoord server successfully started") - - s.dataCoord = grpcdatacoord.NewServer(s.ctx, s.factory) - s.NotNil(s.dataCoord) - err = s.dataCoord.Run() - s.NoError(err) - s.T().Logf("dataCoord server successfully started") - - s.queryCoord, err = grpcquerycoord.NewServer(s.ctx, s.factory) - s.NoError(err) - err = s.queryCoord.Run() - s.NoError(err) - s.T().Logf("queryCoord server successfully started") - - s.proxy, err = grpcproxy.NewServer(s.ctx, s.factory) - s.NoError(err) - err = s.proxy.Run() - s.NoError(err) - s.T().Logf("proxy server successfully started") - - s.dataNode, err = grpcdatanode.NewServer(s.ctx, s.factory) - s.NoError(err) - err = s.dataNode.Run() - s.NoError(err) - s.T().Logf("dataNode server successfully started") - - s.queryNode, err = grpcquerynode.NewServer(s.ctx, s.factory) - s.NoError(err) - err = s.queryNode.Run() - s.NoError(err) - s.T().Logf("queryNode server successfully started") - - s.indexNode, err = grpcindexnode.NewServer(s.ctx, s.factory) - s.NoError(err) - err = s.indexNode.Run() - s.NoError(err) - s.T().Logf("indexNode server successfully started") -} - -func (s *CrossClusterRoutingSuite) TearDownTest() { - err := s.rootCoord.Stop() - s.NoError(err) - err = s.proxy.Stop() - s.NoError(err) - err = s.dataCoord.Stop() - s.NoError(err) - err = s.queryCoord.Stop() - s.NoError(err) - err = s.dataNode.Stop() - s.NoError(err) - err = s.queryNode.Stop() - s.NoError(err) - err = s.indexNode.Stop() - s.NoError(err) - if s.ChunkManager == nil { - chunkManager, err := s.factory.NewPersistentStorageChunkManager(s.ctx) - if err != nil { - log.Warn("fail to create chunk manager to clean test data", zap.Error(err)) - } else { - s.ChunkManager = chunkManager - } - } - s.ChunkManager.RemoveWithPrefix(s.ctx, s.ChunkManager.RootPath()) - s.cancel() -} - -func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { +func (s *CrossClusterRoutingSuite) TestCrossClusterRouting() { const ( waitFor = time.Second * 10 duration = time.Millisecond * 10 @@ -216,7 +64,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { go func() { for { select { - case <-s.ctx.Done(): + case <-time.After(15 * time.Second): return default: err := paramtable.Get().Save(paramtable.Get().CommonCfg.ClusterPrefix.Key, fmt.Sprintf("%d", rand.Int())) @@ -229,7 +77,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test rootCoord s.Eventually(func() bool { - resp, err := s.rootCoordClient.ShowCollections(s.ctx, &milvuspb.ShowCollectionsRequest{ + resp, err := s.Cluster.RootCoordClient.ShowCollections(s.Cluster.GetContext(), &milvuspb.ShowCollectionsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_ShowCollections), ), @@ -244,7 +92,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test dataCoord s.Eventually(func() bool { - resp, err := s.dataCoordClient.GetRecoveryInfoV2(s.ctx, &datapb.GetRecoveryInfoRequestV2{}) + resp, err := s.Cluster.DataCoordClient.GetRecoveryInfoV2(s.Cluster.GetContext(), &datapb.GetRecoveryInfoRequestV2{}) s.Suite.T().Logf("resp: %s, err: %s", resp, err) if err != nil { return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) @@ -254,7 +102,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test queryCoord s.Eventually(func() bool { - resp, err := s.queryCoordClient.LoadCollection(s.ctx, &querypb.LoadCollectionRequest{}) + resp, err := s.Cluster.QueryCoordClient.LoadCollection(s.Cluster.GetContext(), &querypb.LoadCollectionRequest{}) s.Suite.T().Logf("resp: %s, err: %s", resp, err) if err != nil { return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) @@ -264,7 +112,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test proxy s.Eventually(func() bool { - resp, err := s.proxyClient.InvalidateCollectionMetaCache(s.ctx, &proxypb.InvalidateCollMetaCacheRequest{}) + resp, err := s.Cluster.ProxyClient.InvalidateCollectionMetaCache(s.Cluster.GetContext(), &proxypb.InvalidateCollMetaCacheRequest{}) s.Suite.T().Logf("resp: %s, err: %s", resp, err) if err != nil { return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) @@ -274,7 +122,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test dataNode s.Eventually(func() bool { - resp, err := s.dataNodeClient.FlushSegments(s.ctx, &datapb.FlushSegmentsRequest{}) + resp, err := s.Cluster.DataNodeClient.FlushSegments(s.Cluster.GetContext(), &datapb.FlushSegmentsRequest{}) s.Suite.T().Logf("resp: %s, err: %s", resp, err) if err != nil { return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) @@ -284,7 +132,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test queryNode s.Eventually(func() bool { - resp, err := s.queryNodeClient.Search(s.ctx, &querypb.SearchRequest{}) + resp, err := s.Cluster.QueryNodeClient.Search(s.Cluster.GetContext(), &querypb.SearchRequest{}) s.Suite.T().Logf("resp: %s, err: %s", resp, err) if err != nil { return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) @@ -294,7 +142,7 @@ func (s *CrossClusterRoutingSuite) TestCrossClusterRoutingSuite() { // test indexNode s.Eventually(func() bool { - resp, err := s.indexNodeClient.CreateJob(s.ctx, &indexpb.CreateJobRequest{}) + resp, err := s.Cluster.IndexNodeClient.CreateJob(s.Cluster.GetContext(), &indexpb.CreateJobRequest{}) s.Suite.T().Logf("resp: %s, err: %s", resp, err) if err != nil { return strings.Contains(err.Error(), merr.ErrServiceUnavailable.Error()) diff --git a/tests/integration/minicluster_v2.go b/tests/integration/minicluster_v2.go index d0b9108cb7..29a37faf61 100644 --- a/tests/integration/minicluster_v2.go +++ b/tests/integration/minicluster_v2.go @@ -118,6 +118,19 @@ func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2, etcdCli: cluster.EtcdCli, } + ports, err := GetAvailablePorts(7) + if err != nil { + return nil, err + } + log.Info("minicluster ports", zap.Ints("ports", ports)) + params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0])) + params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1])) + params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2])) + params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3])) + params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4])) + params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5])) + params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6])) + // setup clients cluster.RootCoordClient, err = grpcrootcoordclient.NewClient(ctx) if err != nil { @@ -190,20 +203,7 @@ func StartMiniClusterV2(ctx context.Context, opts ...OptionV2) (*MiniClusterV2, func (cluster *MiniClusterV2) Start() error { log.Info("mini cluster start") - ports, err := GetAvailablePorts(7) - if err != nil { - return err - } - log.Info("minicluster ports", zap.Ints("ports", ports)) - params.Save(params.RootCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[0])) - params.Save(params.DataCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[1])) - params.Save(params.QueryCoordGrpcServerCfg.Port.Key, fmt.Sprint(ports[2])) - params.Save(params.DataNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[3])) - params.Save(params.QueryNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[4])) - params.Save(params.IndexNodeGrpcServerCfg.Port.Key, fmt.Sprint(ports[5])) - params.Save(params.ProxyGrpcServerCfg.Port.Key, fmt.Sprint(ports[6])) - - err = cluster.RootCoord.Run() + err := cluster.RootCoord.Run() if err != nil { return err }