enhance: [cherry-pick] refine clustering compaction basic it (#34794)

issue: #34792
pr: #34793

Signed-off-by: wayblink <anyang.wang@zilliz.com>
pull/34869/head
wayblink 2024-07-21 20:05:41 +08:00 committed by GitHub
parent c0c3c5f528
commit 21973a600d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 79 additions and 1 deletions

View File

@ -29,9 +29,12 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/util/hookutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metric"
"github.com/milvus-io/milvus/tests/integration"
)
@ -107,6 +110,37 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
}
s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName)
indexType := integration.IndexFaissIvfFlat
metricType := metric.L2
vecType := schemapb.DataType_FloatVector
// create index
createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{
CollectionName: collectionName,
FieldName: fVecColumn.FieldName,
IndexName: "_default",
ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType),
})
if createIndexStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("createIndexStatus fail reason", zap.String("reason", createIndexStatus.GetReason()))
}
s.NoError(err)
s.Equal(commonpb.ErrorCode_Success, createIndexStatus.GetErrorCode())
s.WaitForIndexBuilt(ctx, collectionName, fVecColumn.FieldName)
// load
loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
if loadStatus.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("loadStatus fail reason", zap.String("reason", loadStatus.GetReason()))
}
s.Equal(commonpb.ErrorCode_Success, loadStatus.GetErrorCode())
s.WaitForLoad(ctx, collectionName)
compactReq := &milvuspb.ManualCompactionRequest{
CollectionID: showCollectionsResp.CollectionIds[0],
MajorCompaction: true,
@ -125,10 +159,54 @@ func (s *ClusteringCompactionSuite) TestClusteringCompaction() {
return resp.GetState() == commonpb.CompactionState_Completed
}
for !compacted() {
time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second)
}
log.Info("compact done")
// search
expr := fmt.Sprintf("%s > 0", integration.Int64Field)
nq := 10
topk := 10
roundDecimal := -1
params := integration.GetSearchParams(indexType, metricType)
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
fVecColumn.FieldName, vecType, nil, metricType, params, nq, dim, topk, roundDecimal)
searchCheckReport := func() {
timeoutCtx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
defer cancelFunc()
for {
select {
case <-timeoutCtx.Done():
s.Fail("search check timeout")
case report := <-c.Extension.GetReportChan():
reportInfo := report.(map[string]any)
log.Info("search report info", zap.Any("reportInfo", reportInfo))
s.Equal(hookutil.OpTypeSearch, reportInfo[hookutil.OpTypeKey])
s.NotEqualValues(0, reportInfo[hookutil.ResultDataSizeKey])
s.NotEqualValues(0, reportInfo[hookutil.RelatedDataSizeKey])
s.EqualValues(rowNum, reportInfo[hookutil.RelatedCntKey])
return
}
}
}
go searchCheckReport()
searchResult, err := c.Proxy.Search(ctx, searchReq)
err = merr.CheckRPCCall(searchResult, err)
s.NoError(err)
querySegmentInfo, err := c.Proxy.GetQuerySegmentInfo(ctx, &milvuspb.GetQuerySegmentInfoRequest{
DbName: dbName,
CollectionName: collectionName,
})
var queryRows int64 = 0
for _, seg := range querySegmentInfo.Infos {
queryRows += seg.NumRows
}
s.Equal(int64(3000), queryRows)
log.Info("TestClusteringCompaction succeed")
}