mirror of https://github.com/milvus-io/milvus.git
fix: Clustering compaction ignoring deltalogs (#39132)
See also: #39131 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/38452/head
parent
4355b485e5
commit
b8fca4f5c1
|
@ -473,7 +473,8 @@ func (t *clusteringCompactionTask) mapping(ctx context.Context,
|
|||
for _, segment := range inputSegments {
|
||||
segmentClone := &datapb.CompactionSegmentBinlogs{
|
||||
SegmentID: segment.SegmentID,
|
||||
// only FieldBinlogs needed
|
||||
// only FieldBinlogs and deltalogs needed
|
||||
Deltalogs: segment.Deltalogs,
|
||||
FieldBinlogs: segment.FieldBinlogs,
|
||||
}
|
||||
future := t.mappingPool.Submit(func() (any, error) {
|
||||
|
|
|
@ -170,6 +170,15 @@ func (s *ClusteringCompactionTaskSuite) TestCompactionInit() {
|
|||
}
|
||||
|
||||
func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
|
||||
dblobs, err := getInt64DeltaBlobs(
|
||||
1,
|
||||
[]int64{100},
|
||||
[]uint64{tsoutil.ComposeTSByTime(getMilvusBirthday().Add(time.Second), 0)},
|
||||
)
|
||||
s.Require().NoError(err)
|
||||
s.mockBinlogIO.EXPECT().Download(mock.Anything, []string{"1"}).
|
||||
Return([][]byte{dblobs.GetValue()}, nil).Once()
|
||||
|
||||
schema := genCollectionSchema()
|
||||
var segmentID int64 = 1001
|
||||
segWriter, err := NewSegmentWriter(schema, 1000, compactionBatchSize, segmentID, PartitionID, CollectionID, []int64{})
|
||||
|
@ -193,6 +202,9 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
|
|||
{
|
||||
SegmentID: segmentID,
|
||||
FieldBinlogs: lo.Values(fBinlogs),
|
||||
Deltalogs: []*datapb.FieldBinlog{
|
||||
{Binlogs: []*datapb.Binlog{{LogID: 1, LogPath: "1"}}},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -236,6 +248,12 @@ func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormal() {
|
|||
s.Equal(2, totalBinlogNum/len(schema.GetFields()))
|
||||
s.Equal(1, statsBinlogNum)
|
||||
s.Equal(totalRowNum, statsRowNum)
|
||||
|
||||
s.EqualValues(10239,
|
||||
lo.SumBy(compactionResult.GetSegments(), func(seg *datapb.CompactionSegment) int64 {
|
||||
return seg.GetNumOfRows()
|
||||
}),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *ClusteringCompactionTaskSuite) TestScalarCompactionNormalByMemoryLimit() {
|
||||
|
|
Loading…
Reference in New Issue