From 1cd15d9322f8b3f0437b01467a42253321171cf4 Mon Sep 17 00:00:00 2001
From: yiwangdr <80064917+yiwangdr@users.noreply.github.com>
Date: Mon, 8 Apr 2024 20:39:17 -0700
Subject: [PATCH] test: support segment release in integration test (#31190)

issue: #29507

Note that api_testonly.go files are guarded by the `test` build tag, so
production builds don't compile them and these test-only APIs can't be
misused.

Signed-off-by: yiwangdr
---
 .golangci.yml                                 |   2 +
 .../distributed/querycoord/api_testonly.go    |  32 ++
 .../distributed/querynode/api_testonly.go     |  24 ++
 internal/querycoordv2/api_testonly.go         |  38 ++
 scripts/run_intergration_test.sh              |   4 +-
 .../partialsearch/partial_search_test.go      | 347 ++++++++++++++++++
 6 files changed, 445 insertions(+), 2 deletions(-)
 create mode 100644 internal/distributed/querycoord/api_testonly.go
 create mode 100644 internal/distributed/querynode/api_testonly.go
 create mode 100644 internal/querycoordv2/api_testonly.go
 create mode 100644 tests/integration/partialsearch/partial_search_test.go

diff --git a/.golangci.yml b/.golangci.yml
index 9ef9fc5260..09779daf25 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -8,6 +8,8 @@ run:
     - scripts
     - internal/core
     - cmake_build
+  skip-files:
+    - partial_search_test.go
 
 linters:
   disable-all: true
diff --git a/internal/distributed/querycoord/api_testonly.go b/internal/distributed/querycoord/api_testonly.go
new file mode 100644
index 0000000000..bb1543fc7f
--- /dev/null
+++ b/internal/distributed/querycoord/api_testonly.go
@@ -0,0 +1,32 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build test
+// +build test
+
+package grpcquerycoord
+
+import (
+	"github.com/milvus-io/milvus/internal/querycoordv2"
+)
+
+func (s *Server) StopCheckerForTestOnly() {
+	s.queryCoord.(*querycoordv2.Server).StopCheckerForTestOnly()
+}
+
+func (s *Server) StartCheckerForTestOnly() {
+	s.queryCoord.(*querycoordv2.Server).StartCheckerForTestOnly()
+}
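The build-tag guard above is the core safety mechanism of this patch: a file carrying the `test` constraint is only compiled when that tag is passed explicitly, so a production `go build` can never link the checker-stopping hooks. A minimal, self-contained sketch of the mechanism (hypothetical file and helper names, not part of this patch):

```go
//go:build test
// +build test

// debughooks_testonly.go (hypothetical): because of the build
// constraint above, this file is compiled only by commands that pass
// the tag, e.g. `go test -tags test ./...`; a plain `go build ./...`
// never sees it, so the helper below cannot leak into production
// binaries.
package server

import "fmt"

// DumpStateForTestOnly is a hypothetical test-only hook.
func DumpStateForTestOnly() {
	fmt.Println("compiled with -tags test")
}
```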
diff --git a/internal/distributed/querynode/api_testonly.go b/internal/distributed/querynode/api_testonly.go
new file mode 100644
index 0000000000..21d5f92344
--- /dev/null
+++ b/internal/distributed/querynode/api_testonly.go
@@ -0,0 +1,24 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build test
+// +build test
+
+package grpcquerynode
+
+func (s *Server) GetServerIDForTestOnly() int64 {
+	return s.serverID.Load()
+}
diff --git a/internal/querycoordv2/api_testonly.go b/internal/querycoordv2/api_testonly.go
new file mode 100644
index 0000000000..d8673700fd
--- /dev/null
+++ b/internal/querycoordv2/api_testonly.go
@@ -0,0 +1,38 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//go:build test
+// +build test
+
+package querycoordv2
+
+import (
+	"github.com/milvus-io/milvus/pkg/log"
+)
+
+func (s *Server) StopCheckerForTestOnly() {
+	if s.checkerController != nil {
+		log.Info("stop checker controller for integration test...")
+		s.checkerController.Stop()
+	}
+}
+
+func (s *Server) StartCheckerForTestOnly() {
+	if s.checkerController != nil {
+		log.Info("start checker controller for integration test...")
+		s.checkerController.Start()
+	}
+}
diff --git a/scripts/run_intergration_test.sh b/scripts/run_intergration_test.sh
index ca0a491315..999387e43c 100755
--- a/scripts/run_intergration_test.sh
+++ b/scripts/run_intergration_test.sh
@@ -39,9 +39,9 @@ for d in $(go list ./tests/integration/...); do
   if [[ $d == *"coordrecovery"* ]]; then
     echo "running coordrecovery"
     # simplified command to speed up coord init test since it is large.
-    $TEST_CMD -tags dynamic -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m
+    $TEST_CMD -tags dynamic,test -v -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=20m -timeout=30m
   else
-    $TEST_CMD -race -tags dynamic -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m
+    $TEST_CMD -race -tags dynamic,test -v -coverpkg=./... -coverprofile=profile.out -covermode=atomic "$d" -caseTimeout=15m -timeout=30m
   fi
   if [ -f profile.out ]; then
     grep -v kafka profile.out | grep -v planparserv2/generated | grep -v mocks | sed '1d' >> ${FILE_COVERAGE_INFO}
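StopCheckerForTestOnly and StartCheckerForTestOnly exist because QueryCoord's checker controller continuously repairs the cluster: left running, it would notice a manually released segment is missing and simply re-load it before a test could assert on the degraded state. The test below brackets its manual release between Stop and Start; a small sketch of that pattern (the `withCheckersPaused` wrapper is hypothetical, the two API calls and types are from this patch):

```go
package partialsearch

import (
	grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
)

// withCheckersPaused is a hypothetical convenience wrapper: it pauses
// QueryCoord's background checkers around fn and restarts them
// afterwards, so a manually released segment stays released while fn
// makes its assertions.
func withCheckersPaused(coord *grpcquerycoord.Server, fn func()) {
	coord.StopCheckerForTestOnly()
	defer coord.StartCheckerForTestOnly()
	fn()
}
```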
diff --git a/tests/integration/partialsearch/partial_search_test.go b/tests/integration/partialsearch/partial_search_test.go
new file mode 100644
index 0000000000..790802e630
--- /dev/null
+++ b/tests/integration/partialsearch/partial_search_test.go
@@ -0,0 +1,347 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package partialsearch
+
+import (
+	"context"
+	"fmt"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/proto/querypb"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
+	"github.com/milvus-io/milvus/pkg/util/funcutil"
+	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/metric"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+	"github.com/milvus-io/milvus/tests/integration"
+)
+
+type PartialSearchSuite struct {
+	integration.MiniClusterSuite
+
+	dim               int
+	numCollections    int
+	rowsPerCollection int
+	waitTimeInSec     time.Duration
+	prefix            string
+}
+
+func (s *PartialSearchSuite) setupParam() {
+	s.dim = 128
+	s.numCollections = 1
+	s.rowsPerCollection = 100
+	s.waitTimeInSec = time.Second * 10
+}
+
+func (s *PartialSearchSuite) loadCollection(collectionName string, dim int, wg *sync.WaitGroup) {
+	c := s.Cluster
+	dbName := ""
+	schema := integration.ConstructSchema(collectionName, dim, true)
+	marshaledSchema, err := proto.Marshal(schema)
+	s.NoError(err)
+
+	createCollectionStatus, err := c.Proxy.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		Schema:         marshaledSchema,
+		ShardsNum:      common.DefaultShardsNum,
+	})
+	s.NoError(err)
+
+	err = merr.Error(createCollectionStatus)
+	s.NoError(err)
+
+	showCollectionsResp, err := c.Proxy.ShowCollections(context.TODO(), &milvuspb.ShowCollectionsRequest{})
+	s.NoError(err)
+	s.True(merr.Ok(showCollectionsResp.GetStatus()))
+
+	batchSize := 500000
+	for start := 0; start < s.rowsPerCollection; start += batchSize {
+		rowNum := batchSize
+		if start+batchSize > s.rowsPerCollection {
+			rowNum = s.rowsPerCollection - start
+		}
+		fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim)
+		hashKeys := integration.GenerateHashKeys(rowNum)
+		insertResult, err := c.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+			FieldsData:     []*schemapb.FieldData{fVecColumn},
+			HashKeys:       hashKeys,
+			NumRows:        uint32(rowNum),
+		})
+		s.NoError(err)
+		s.True(merr.Ok(insertResult.GetStatus()))
+	}
+	log.Info("=========================Data insertion finished=========================")
+
+	// flush
+	flushResp, err := c.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{
+		DbName:          dbName,
+		CollectionNames: []string{collectionName},
+	})
+	s.NoError(err)
+	segmentIDs, has := flushResp.GetCollSegIDs()[collectionName]
+	s.Require().True(has)
+	s.Require().NotEmpty(segmentIDs)
+	ids := segmentIDs.GetData()
+	flushTs, has := flushResp.GetCollFlushTs()[collectionName]
+	s.True(has)
+
+	segments, err := c.MetaWatcher.ShowSegments()
+	s.NoError(err)
+	s.NotEmpty(segments)
+	s.WaitForFlush(context.TODO(), ids, flushTs, dbName, collectionName)
+	log.Info("=========================Data flush finished=========================")
+
+	// create index
+	createIndexStatus, err := c.Proxy.CreateIndex(context.TODO(), &milvuspb.CreateIndexRequest{
+		CollectionName: collectionName,
+		FieldName:      integration.FloatVecField,
+		IndexName:      "_default",
+		ExtraParams:    integration.ConstructIndexParam(dim, integration.IndexFaissIvfFlat, metric.IP),
+	})
+	s.NoError(err)
+	err = merr.Error(createIndexStatus)
+	s.NoError(err)
+	s.WaitForIndexBuilt(context.TODO(), collectionName, integration.FloatVecField)
+	log.Info("=========================Index created=========================")
+
+	// load
+	loadStatus, err := c.Proxy.LoadCollection(context.TODO(), &milvuspb.LoadCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	err = merr.Error(loadStatus)
+	s.NoError(err)
+	s.WaitForLoad(context.TODO(), collectionName)
+	log.Info("=========================Collection loaded=========================")
+	wg.Done()
+}
+
+func (s *PartialSearchSuite) checkCollectionLoaded(collectionName string) bool {
+	loadProgress, err := s.Cluster.Proxy.GetLoadingProgress(context.TODO(), &milvuspb.GetLoadingProgressRequest{
+		DbName:         "",
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	return loadProgress.GetProgress() == int64(100)
+}
+
+func (s *PartialSearchSuite) checkCollectionsLoaded(startCollectionID, endCollectionID int) bool {
+	notLoaded := 0
+	loaded := 0
+	for idx := startCollectionID; idx < endCollectionID; idx++ {
+		collectionName := s.prefix + "_" + strconv.Itoa(idx)
+		if s.checkCollectionLoaded(collectionName) {
+			loaded++
+		} else {
+			notLoaded++
+		}
+	}
+	log.Info(fmt.Sprintf("loading status: %d/%d", loaded, endCollectionID-startCollectionID))
+	return notLoaded == 0
+}
+
+func (s *PartialSearchSuite) checkAllCollectionsLoaded() bool {
+	return s.checkCollectionsLoaded(0, s.numCollections)
+}
+
+func (s *PartialSearchSuite) search(collectionName string, dim int) {
+	c := s.Cluster
+	// Query
+	queryReq := &milvuspb.QueryRequest{
+		Base:               nil,
+		CollectionName:     collectionName,
+		PartitionNames:     nil,
+		Expr:               "",
+		OutputFields:       []string{"count(*)"},
+		TravelTimestamp:    0,
+		GuaranteeTimestamp: 0,
+	}
+	queryResult, err := c.Proxy.Query(context.TODO(), queryReq)
+	s.NoError(err)
+	s.Equal(commonpb.ErrorCode_Success, queryResult.Status.ErrorCode)
+	s.Equal(1, len(queryResult.FieldsData))
+	numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0]
+	s.Equal(int64(s.rowsPerCollection), numEntities)
+
+	// Search
+	expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+	nq := 10
+	topk := 10
+	roundDecimal := -1
+	radius := 10
+
+	params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.IP)
+	params["radius"] = radius
+	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+		integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.IP, params, nq, dim, topk, roundDecimal)
+
+	searchResult, err := c.Proxy.Search(context.TODO(), searchReq)
+	s.NoError(err)
+	err = merr.Error(searchResult.GetStatus())
+	s.NoError(err)
+}
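A pattern worth calling out in the helpers above: Milvus RPCs report server-side failures in-band through a `*commonpb.Status` (or a status embedded in the response) rather than through the transport error alone, which is why every call site checks both `err` and `merr.Error(...)`/`merr.Ok(...)`. A compact sketch of that two-level check (the `statusErr` helper is hypothetical):

```go
package partialsearch

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// statusErr is a hypothetical helper condensing the suite's two-level
// error convention: first the transport error, then the in-band
// status, which merr.Error converts into a plain Go error (nil on a
// success status).
func statusErr(status *commonpb.Status, rpcErr error) error {
	if rpcErr != nil {
		return rpcErr // transport-level failure
	}
	return merr.Error(status) // server-side failure, if any
}
```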
+
+func (s *PartialSearchSuite) FailOnSearch(collectionName string) {
+	c := s.Cluster
+	expr := fmt.Sprintf("%s > 0", integration.Int64Field)
+	nq := 10
+	topk := 10
+	roundDecimal := -1
+	radius := 10
+
+	params := integration.GetSearchParams(integration.IndexFaissIvfFlat, metric.IP)
+	params["radius"] = radius
+	searchReq := integration.ConstructSearchRequest("", collectionName, expr,
+		integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.IP, params, nq, s.dim, topk, roundDecimal)
+
+	searchResult, err := c.Proxy.Search(context.TODO(), searchReq)
+	s.NoError(err)
+	err = merr.Error(searchResult.GetStatus())
+	s.Require().Error(err)
+}
+
+func (s *PartialSearchSuite) setupData() {
+	// Inject data and load all collections.
+	log.Info("=========================Start to inject data=========================")
+	s.prefix = "TestPartialSearchUtil" + funcutil.GenRandomStr()
+	searchName := s.prefix + "_0"
+	wg := sync.WaitGroup{}
+	for idx := 0; idx < s.numCollections; idx++ {
+		wg.Add(1)
+		go s.loadCollection(s.prefix+"_"+strconv.Itoa(idx), s.dim, &wg)
+	}
+	wg.Wait()
+	log.Info("=========================Data injection finished=========================")
+	s.True(s.checkAllCollectionsLoaded())
+	log.Info(fmt.Sprintf("=========================start to search %s=========================", searchName))
+	s.search(searchName, s.dim)
+	log.Info("=========================Search finished=========================")
+	time.Sleep(s.waitTimeInSec)
+	s.True(s.checkAllCollectionsLoaded())
+	log.Info(fmt.Sprintf("=========================start to search2 %s=========================", searchName))
+	s.search(searchName, s.dim)
+	log.Info("=========================Search2 finished=========================")
+	s.checkAllCollectionsReady()
+}
+
+func (s *PartialSearchSuite) checkCollectionsReady(startCollectionID, endCollectionID int) {
+	for i := startCollectionID; i < endCollectionID; i++ {
+		collectionName := s.prefix + "_" + strconv.Itoa(i)
+		s.search(collectionName, s.dim)
+		queryReq := &milvuspb.QueryRequest{
+			CollectionName: collectionName,
+			Expr:           "",
+			OutputFields:   []string{"count(*)"},
+		}
+		resp, err := s.Cluster.Proxy.Query(context.TODO(), queryReq)
+		s.NoError(err)
+		s.True(merr.Ok(resp.GetStatus()))
+	}
+}
+
+func (s *PartialSearchSuite) checkAllCollectionsReady() {
+	s.checkCollectionsReady(0, s.numCollections)
+}
+
+func (s *PartialSearchSuite) releaseSegmentsReq(collectionID, nodeID, segmentID typeutil.UniqueID, shard string) *querypb.ReleaseSegmentsRequest {
+	req := &querypb.ReleaseSegmentsRequest{
+		Base: commonpbutil.NewMsgBase(
+			commonpbutil.WithMsgType(commonpb.MsgType_ReleaseSegments),
+			commonpbutil.WithMsgID(1<<30),
+			commonpbutil.WithTargetID(nodeID),
+		),
+
+		NodeID:       nodeID,
+		CollectionID: collectionID,
+		SegmentIDs:   []int64{segmentID},
+		Scope:        querypb.DataScope_Historical,
+		Shard:        shard,
+		NeedTransfer: false,
+	}
+	return req
+}
+
+func (s *PartialSearchSuite) describeCollection(name string) (int64, []string) {
+	resp, err := s.Cluster.Proxy.DescribeCollection(context.TODO(), &milvuspb.DescribeCollectionRequest{
+		DbName:         "default",
+		CollectionName: name,
+	})
+	s.NoError(err)
+	log.Info(fmt.Sprintf("describe collection: %v", resp))
+	return resp.CollectionID, resp.VirtualChannelNames
+}
+
+func (s *PartialSearchSuite) getSegmentIDs(collectionName string) []int64 {
+	resp, err := s.Cluster.Proxy.GetPersistentSegmentInfo(context.TODO(), &milvuspb.GetPersistentSegmentInfoRequest{
+		DbName:         "default",
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	var res []int64
+	for _, seg := range resp.Infos {
+		res = append(res, seg.SegmentID)
+	}
+	return res
+}
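releaseSegmentsReq builds a request that goes straight to a QueryNode, sidestepping QueryCoord's normal balancing path: `Scope: DataScope_Historical` restricts the release to sealed segments, and `NeedTransfer: false` asks the node to drop the segment outright rather than hand it off (my reading of the flag). Since the node validates `Base.TargetID` against its own server ID, the caller first needs `GetServerIDForTestOnly` from this patch. A usage sketch, mirroring the test below:

```go
// Usage sketch (names from this patch): drop one sealed segment on a
// specific QueryNode and verify the node accepted the request.
nodeID := c.QueryNode.GetServerIDForTestOnly()
req := s.releaseSegmentsReq(collectionID, nodeID, segmentID, shard)
status, err := c.QueryNode.ReleaseSegments(context.TODO(), req)
s.NoError(err)
s.NoError(merr.Error(status))
```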
+
+func (s *PartialSearchSuite) TestPartialSearch() {
+	s.setupParam()
+	s.setupData()
+
+	startCollectionID := 0
+	endCollectionID := s.numCollections
+	// Search should work in the beginning.
+	s.checkCollectionsReady(startCollectionID, endCollectionID)
+
+	// Test case with one segment released.
+	// Partial search does not work yet, so the search is expected to fail.
+	c := s.Cluster
+	q1 := c.QueryNode
+	c.QueryCoord.StopCheckerForTestOnly()
+	collectionName := s.prefix + "_0"
+	nodeID := q1.GetServerIDForTestOnly()
+	collectionID, channels := s.describeCollection(collectionName)
+	segs := s.getSegmentIDs(collectionName)
+	s.Require().Positive(len(segs))
+	s.Require().Positive(len(channels))
+	segmentID := segs[0]
+	shard := channels[0]
+	req := s.releaseSegmentsReq(collectionID, nodeID, segmentID, shard)
+	status, err := q1.ReleaseSegments(context.TODO(), req)
+	s.NoError(err)
+	s.NoError(merr.Error(status))
+	s.FailOnSearch(collectionName)
+	c.QueryCoord.StartCheckerForTestOnly()
+}
+
+func TestPartialSearchUtil(t *testing.T) {
+	suite.Run(t, new(PartialSearchSuite))
+}
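One wiring note to close the loop: partial_search_test.go itself calls `StopCheckerForTestOnly` and `GetServerIDForTestOnly`, which only exist when the `test` tag is set. That is why the lint config gains a `skip-files` entry for it (lint runs without the tag could not resolve those symbols) and why the integration script now passes `-tags dynamic,test`; running the suite by hand needs the same tags, e.g. `go test -race -tags dynamic,test -v ./tests/integration/partialsearch/...`. An alternative guard, sketched below, would be to put the build constraint on the test file itself — not what this patch does, but it makes the dependency explicit:

```go
//go:build test
// +build test

// Hypothetical alternative: constraining the test file itself would
// let the toolchain (and the linter) skip it automatically whenever
// the "test" tag is absent, instead of listing it in skip-files.
package partialsearch
```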