From 7e17f24d4516b32b86ccef9c891880e63e110002 Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 8 Mar 2024 10:23:00 +0800 Subject: [PATCH] fix: Skip unstable compaction test it (#31116) See also: #31106 Signed-off-by: yangxuan --- tests/integration/datanode/compaction_test.go | 71 ++++++++++--------- 1 file changed, 39 insertions(+), 32 deletions(-) diff --git a/tests/integration/datanode/compaction_test.go b/tests/integration/datanode/compaction_test.go index e905987ba6..051b189a38 100644 --- a/tests/integration/datanode/compaction_test.go +++ b/tests/integration/datanode/compaction_test.go @@ -22,10 +22,21 @@ import ( "github.com/milvus-io/milvus/tests/integration" ) +// This is an unstable it, need to be fixed later +// func TestCompactionSuite(t *testing.T) { +// suite.Run(t, new(CompactionSuite)) +// } + +type CompactionSuite struct { + integration.MiniClusterSuite + + dim int +} + // issue: https://github.com/milvus-io/milvus/issues/30137 -func (s *DataNodeSuite) TestClearCompactionTask() { +func (s *CompactionSuite) TestClearCompactionTask() { s.dim = 128 - collName := "test_yx" + collName := "test_compaction" // generate 1 segment pks := s.generateSegment(collName, 1) @@ -38,7 +49,7 @@ func (s *DataNodeSuite) TestClearCompactionTask() { s.deleteAndFlush(pks, collName) } -func (s *DataNodeSuite) deleteAndFlush(pks []int64, collection string) { +func (s *CompactionSuite) deleteAndFlush(pks []int64, collection string) { ctx := context.Background() expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) @@ -75,40 +86,23 @@ func (s *DataNodeSuite) deleteAndFlush(pks []int64, collection string) { log.Info("=========================Data flush done=========================") } -func (s *DataNodeSuite) compactAndReboot(collection string) { +func (s *CompactionSuite) compactAndReboot(collection string) { ctx := context.Background() - // create index + // create index and wait for index done createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ CollectionName: collection, FieldName: integration.FloatVecField, IndexName: "_default", - ExtraParams: integration.ConstructIndexParam(s.dim, integration.IndexHNSW, metric.IP), + ExtraParams: integration.ConstructIndexParam(s.dim, integration.IndexFaissIDMap, metric.IP), }) s.Require().NoError(err) s.Require().True(merr.Ok(createIndexStatus)) - for stay, timeout := true, time.After(time.Second*10); stay; { - select { - case <-timeout: - stay = false - default: - describeIndexResp, err := s.Cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{ - CollectionName: collection, - FieldName: integration.FloatVecField, - IndexName: "_default", - }) - s.Require().NoError(err) - - for _, d := range describeIndexResp.GetIndexDescriptions() { - if d.GetFieldName() == integration.FloatVecField && d.GetState() == commonpb.IndexState_Finished { - log.Info("build index finished", zap.Any("index_desc", d)) - stay = false - } - } - time.Sleep(1 * time.Second) - } - } + ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + s.WaitForIndexBuilt(ctxTimeout, collection, integration.FloatVecField) + // get collectionID coll, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{ CollectionName: collection, }) @@ -122,8 +116,9 @@ func (s *DataNodeSuite) compactAndReboot(collection string) { }) s.Require().NoError(err) s.Require().True(merr.Ok(coll.GetStatus())) - s.NotEqualValues(-1, compactionResp.GetCompactionID()) - s.EqualValues(1, compactionResp.GetCompactionPlanCount()) + // make sure compaction is triggerred successfully + s.Require().NotEqualValues(-1, compactionResp.GetCompactionID()) + s.Require().EqualValues(1, compactionResp.GetCompactionPlanCount()) compactID := compactionResp.GetCompactionID() stateResp, err := s.Cluster.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{ @@ -133,8 +128,20 @@ func (s *DataNodeSuite) compactAndReboot(collection string) { s.Require().NoError(err) s.Require().True(merr.Ok(stateResp.GetStatus())) - // sleep to ensure compaction tasks are submitted to DN - time.Sleep(3 * time.Second) + compactionSubmitted := func() bool { + resp, err := s.Cluster.DataNode.GetCompactionState(ctx, &datapb.CompactionStateRequest{}) + s.Require().NoError(err) + s.Require().True(merr.Ok(resp.GetStatus())) + return len(resp.GetResults()) > 0 + } + + for !compactionSubmitted() { + select { + case <-time.After(1 * time.Minute): + s.FailNow("failed to wait compaction submitted after 1 minite") + case <-time.After(500 * time.Millisecond): + } + } planResp, err := s.Cluster.Proxy.GetCompactionStateWithPlans(ctx, &milvuspb.GetCompactionPlansRequest{ CompactionID: compactID, @@ -163,7 +170,7 @@ func (s *DataNodeSuite) compactAndReboot(collection string) { } } -func (s *DataNodeSuite) generateSegment(collection string, segmentCount int) []int64 { +func (s *CompactionSuite) generateSegment(collection string, segmentCount int) []int64 { c := s.Cluster schema := integration.ConstructSchema(collection, s.dim, true)