mirror of https://github.com/milvus-io/milvus.git
fix: Set an empty segment if compaction deleted all inserts (#36044)
See also: #36038 --------- Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/36103/head
parent
99817953eb
commit
2687747278
|
@ -270,7 +270,12 @@ func (s *MixCompactionTaskSuite) TestSplitMergeEntityExpired() {
|
|||
|
||||
compactionSegments, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, nil)
|
||||
s.NoError(err)
|
||||
s.Equal(0, len(compactionSegments))
|
||||
s.Equal(1, len(compactionSegments))
|
||||
s.EqualValues(0, compactionSegments[0].GetNumOfRows())
|
||||
s.EqualValues(19531, compactionSegments[0].GetSegmentID())
|
||||
s.Empty(compactionSegments[0].GetDeltalogs())
|
||||
s.Empty(compactionSegments[0].GetInsertLogs())
|
||||
s.Empty(compactionSegments[0].GetField2StatslogPaths())
|
||||
}
|
||||
|
||||
func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
||||
|
@ -280,10 +285,11 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
|||
description string
|
||||
deletions map[interface{}]uint64
|
||||
expectedRes int
|
||||
leftNumRows int
|
||||
}{
|
||||
{"no deletion", nil, 1},
|
||||
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1},
|
||||
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 0},
|
||||
{"no deletion", nil, 1, 1},
|
||||
{"mismatch deletion", map[interface{}]uint64{int64(1): deleteTs}, 1, 1},
|
||||
{"deleted pk=4", map[interface{}]uint64{int64(4): deleteTs}, 1, 0},
|
||||
}
|
||||
|
||||
alloc := allocator.NewLocalAllocator(888888, math.MaxInt64)
|
||||
|
@ -304,9 +310,7 @@ func (s *MixCompactionTaskSuite) TestMergeNoExpiration() {
|
|||
res, err := s.task.mergeSplit(s.task.ctx, [][]string{lo.Keys(kvs)}, test.deletions)
|
||||
s.NoError(err)
|
||||
s.EqualValues(test.expectedRes, len(res))
|
||||
if test.expectedRes > 0 {
|
||||
s.EqualValues(1, res[0].GetNumOfRows())
|
||||
}
|
||||
s.EqualValues(test.leftNumRows, res[0].GetNumOfRows())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ type MultiSegmentWriter struct {
|
|||
// segID -> fieldID -> binlogs
|
||||
|
||||
res []*datapb.CompactionSegment
|
||||
// DONOT leave it empty of all segments are deleted, just return a segment with zero meta for datacoord
|
||||
}
|
||||
|
||||
type compactionAlloactor struct {
|
||||
|
@ -195,9 +196,27 @@ func (w *MultiSegmentWriter) Write(v *storage.Value) error {
|
|||
return writer.Write(v)
|
||||
}
|
||||
|
||||
// Could return an empty list if every insert of the segment is deleted
|
||||
func (w *MultiSegmentWriter) appendEmptySegment() error {
|
||||
writer, err := w.getWriter()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
w.res = append(w.res, &datapb.CompactionSegment{
|
||||
SegmentID: writer.GetSegmentID(),
|
||||
NumOfRows: 0,
|
||||
Channel: w.channel,
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
// DONOT return an empty list if every insert of the segment is deleted,
|
||||
// append an empty segment instead
|
||||
func (w *MultiSegmentWriter) Finish() ([]*datapb.CompactionSegment, error) {
|
||||
if w.current == -1 {
|
||||
if err := w.appendEmptySegment(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w.res, nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue