fix: Skip unstable compaction test it (#31116)

See also: #31106

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/30824/head
XuanYang-cn 2024-03-08 10:23:00 +08:00 committed by GitHub
parent a65a9ce8a5
commit 7e17f24d45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 39 additions and 32 deletions

View File

@ -22,10 +22,21 @@ import (
"github.com/milvus-io/milvus/tests/integration"
)
// This is an unstable it, need to be fixed later
// func TestCompactionSuite(t *testing.T) {
// suite.Run(t, new(CompactionSuite))
// }
type CompactionSuite struct {
integration.MiniClusterSuite
dim int
}
// issue: https://github.com/milvus-io/milvus/issues/30137
func (s *DataNodeSuite) TestClearCompactionTask() {
func (s *CompactionSuite) TestClearCompactionTask() {
s.dim = 128
collName := "test_yx"
collName := "test_compaction"
// generate 1 segment
pks := s.generateSegment(collName, 1)
@ -38,7 +49,7 @@ func (s *DataNodeSuite) TestClearCompactionTask() {
s.deleteAndFlush(pks, collName)
}
func (s *DataNodeSuite) deleteAndFlush(pks []int64, collection string) {
func (s *CompactionSuite) deleteAndFlush(pks []int64, collection string) {
ctx := context.Background()
expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ","))
@ -75,40 +86,23 @@ func (s *DataNodeSuite) deleteAndFlush(pks []int64, collection string) {
log.Info("=========================Data flush done=========================")
}
func (s *DataNodeSuite) compactAndReboot(collection string) {
func (s *CompactionSuite) compactAndReboot(collection string) {
ctx := context.Background()
// create index
// create index and wait for index done
createIndexStatus, err := s.Cluster.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collection,
FieldName: integration.FloatVecField,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(s.dim, integration.IndexHNSW, metric.IP),
ExtraParams: integration.ConstructIndexParam(s.dim, integration.IndexFaissIDMap, metric.IP),
})
s.Require().NoError(err)
s.Require().True(merr.Ok(createIndexStatus))
for stay, timeout := true, time.After(time.Second*10); stay; {
select {
case <-timeout:
stay = false
default:
describeIndexResp, err := s.Cluster.Proxy.DescribeIndex(ctx, &milvuspb.DescribeIndexRequest{
CollectionName: collection,
FieldName: integration.FloatVecField,
IndexName: "_default",
})
s.Require().NoError(err)
for _, d := range describeIndexResp.GetIndexDescriptions() {
if d.GetFieldName() == integration.FloatVecField && d.GetState() == commonpb.IndexState_Finished {
log.Info("build index finished", zap.Any("index_desc", d))
stay = false
}
}
time.Sleep(1 * time.Second)
}
}
ctxTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
s.WaitForIndexBuilt(ctxTimeout, collection, integration.FloatVecField)
// get collectionID
coll, err := s.Cluster.Proxy.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
CollectionName: collection,
})
@ -122,8 +116,9 @@ func (s *DataNodeSuite) compactAndReboot(collection string) {
})
s.Require().NoError(err)
s.Require().True(merr.Ok(coll.GetStatus()))
s.NotEqualValues(-1, compactionResp.GetCompactionID())
s.EqualValues(1, compactionResp.GetCompactionPlanCount())
// make sure compaction is triggerred successfully
s.Require().NotEqualValues(-1, compactionResp.GetCompactionID())
s.Require().EqualValues(1, compactionResp.GetCompactionPlanCount())
compactID := compactionResp.GetCompactionID()
stateResp, err := s.Cluster.Proxy.GetCompactionState(ctx, &milvuspb.GetCompactionStateRequest{
@ -133,8 +128,20 @@ func (s *DataNodeSuite) compactAndReboot(collection string) {
s.Require().NoError(err)
s.Require().True(merr.Ok(stateResp.GetStatus()))
// sleep to ensure compaction tasks are submitted to DN
time.Sleep(3 * time.Second)
compactionSubmitted := func() bool {
resp, err := s.Cluster.DataNode.GetCompactionState(ctx, &datapb.CompactionStateRequest{})
s.Require().NoError(err)
s.Require().True(merr.Ok(resp.GetStatus()))
return len(resp.GetResults()) > 0
}
for !compactionSubmitted() {
select {
case <-time.After(1 * time.Minute):
s.FailNow("failed to wait compaction submitted after 1 minite")
case <-time.After(500 * time.Millisecond):
}
}
planResp, err := s.Cluster.Proxy.GetCompactionStateWithPlans(ctx, &milvuspb.GetCompactionPlansRequest{
CompactionID: compactID,
@ -163,7 +170,7 @@ func (s *DataNodeSuite) compactAndReboot(collection string) {
}
}
func (s *DataNodeSuite) generateSegment(collection string, segmentCount int) []int64 {
func (s *CompactionSuite) generateSegment(collection string, segmentCount int) []int64 {
c := s.Cluster
schema := integration.ConstructSchema(collection, s.dim, true)