Make compaction not generate empty segment (#15707)

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2022-02-28 10:17:54 +08:00 committed by GitHub
parent e8edaa02fa
commit 0cfb6a85be
4 changed files with 148 additions and 22 deletions
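The patch applies one rule in both the datacoord meta and the datanode compactor: a compaction result with zero rows is never saved to the meta kv store or registered in the in-memory segment map, and an already-flushed target segment whose rows were all compacted away is removed. Below is a minimal, self-contained sketch of the merge-compaction guard; metaStore, segmentInfo and completeMergeCompaction are illustrative stand-ins for this note, not the actual datacoord types or methods.

package main

import "fmt"

type segmentInfo struct {
	ID        int64
	NumOfRows int64
}

// metaStore stands in for the coordinator meta: a persisted view plus an
// in-memory segment registry.
type metaStore struct {
	kv       map[int64]*segmentInfo
	segments map[int64]*segmentInfo
}

// completeMergeCompaction drops the compacted source segments and keeps the
// merged result only when it actually contains rows, so an empty segment is
// never persisted or registered.
func (m *metaStore) completeMergeCompaction(sources []int64, result *segmentInfo) {
	for _, id := range sources {
		delete(m.segments, id)
	}
	if result.NumOfRows > 0 { // the guard this change introduces
		m.kv[result.ID] = result
		m.segments[result.ID] = result
	}
}

func main() {
	m := &metaStore{kv: map[int64]*segmentInfo{}, segments: map[int64]*segmentInfo{}}
	m.completeMergeCompaction([]int64{1, 2}, &segmentInfo{ID: 3, NumOfRows: 0})
	fmt.Println(len(m.segments), len(m.kv)) // 0 0: nothing is left behind for an empty result
}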


@@ -770,11 +770,14 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
}
data[k] = v
}
- k, v, err := m.marshal(segment)
- if err != nil {
- return err
+ if segment.NumOfRows > 0 {
+ k, v, err := m.marshal(segment)
+ if err != nil {
+ return err
+ }
+ data[k] = v
}
- data[k] = v
if err := m.saveKvTxn(data); err != nil {
return err
@@ -784,7 +787,10 @@ func (m *meta) CompleteMergeCompaction(compactionLogs []*datapb.CompactionSegmen
m.segments.DropSegment(s.GetID())
}
- m.segments.SetSegment(segment.GetID(), segment)
+ // Handle empty segment generated by merge-compaction
+ if segment.NumOfRows > 0 {
+ m.segments.SetSegment(segment.GetID(), segment)
+ }
return nil
}
@@ -793,6 +799,17 @@ func (m *meta) CompleteInnerCompaction(segmentBinlogs *datapb.CompactionSegmentB
defer m.Unlock()
if segment := m.segments.GetSegment(segmentBinlogs.SegmentID); segment != nil {
// The compaction deletes the entire segment
if result.NumOfRows <= 0 {
err := m.removeSegmentInfo(segment)
if err != nil {
return err
}
m.segments.DropSegment(segment.ID)
return nil
}
cloned := segment.Clone()
cloned.Binlogs = m.updateBinlogs(cloned.GetBinlogs(), segmentBinlogs.GetFieldBinlogs(), result.GetInsertLogs())
cloned.Statslogs = m.updateBinlogs(cloned.GetStatslogs(), segmentBinlogs.GetField2StatslogPaths(), result.GetField2StatslogPaths())
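For inner compaction the guard works the other way around: instead of updating the existing segment with the compacted binlogs, an empty result causes the segment info to be removed from storage and dropped from the segment map. A rough sketch of that path, again with illustrative names rather than the real datacoord types:

package main

import "fmt"

type segment struct {
	ID      int64
	Binlogs []string
}

type meta struct {
	persisted map[int64][]string // stand-in for segment info saved in the kv store
	segments  map[int64]*segment // in-memory segment map
}

// completeInnerCompaction replaces an existing segment's binlogs with the
// compacted ones, or removes the segment entirely when the compaction
// deleted every row.
func (m *meta) completeInnerCompaction(segID int64, numOfRows int64, insertLogs []string) {
	seg, ok := m.segments[segID]
	if !ok {
		return
	}
	if numOfRows <= 0 {
		// The compaction deleted the entire segment: drop both the persisted
		// and the in-memory state instead of keeping an empty segment.
		delete(m.persisted, seg.ID)
		delete(m.segments, seg.ID)
		return
	}
	seg.Binlogs = insertLogs
	m.persisted[seg.ID] = insertLogs
}

func main() {
	m := &meta{
		persisted: map[int64][]string{1: {"log1", "log2"}},
		segments:  map[int64]*segment{1: {ID: 1, Binlogs: []string{"log1", "log2"}}},
	}
	m.completeInnerCompaction(1, 0, nil)
	fmt.Println(len(m.segments), len(m.persisted)) // 0 0: the fully compacted segment is gone
}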


@@ -350,7 +350,7 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) {
wantErr bool
}{
{
"test normal merge",
"test normal merge compaction",
fields{
memkv.NewMemoryKV(),
nil,
@@ -389,6 +389,52 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) {
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log5")},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog5")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog5")},
NumOfRows: 1,
},
},
false,
},
{
"test removing all data merge compaction",
fields{
memkv.NewMemoryKV(),
nil,
&SegmentsInfo{map[int64]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
}},
2: {SegmentInfo: &datapb.SegmentInfo{
ID: 2,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
}},
}},
},
args{
[]*datapb.CompactionSegmentBinlogs{
{
SegmentID: 1,
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
},
{
SegmentID: 2,
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3", "log4")},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3", "statlog4")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3", "deltalog4")},
},
},
&datapb.CompactionResult{
SegmentID: 3,
InsertLogs: nil,
Field2StatslogPaths: nil,
Deltalogs: nil,
NumOfRows: 0,
},
},
false,
@@ -408,10 +454,12 @@ func Test_meta_CompleteMergeCompaction(t *testing.T) {
assert.Nil(t, m.GetSegment(l.GetSegmentID()))
}
segment := m.GetSegment(tt.args.result.SegmentID)
- assert.NotNil(t, segment)
- assert.EqualValues(t, tt.args.result.GetInsertLogs(), segment.GetBinlogs())
- assert.EqualValues(t, tt.args.result.GetField2StatslogPaths(), segment.GetStatslogs())
- assert.EqualValues(t, tt.args.result.GetDeltalogs(), segment.GetDeltalogs())
+ assert.Equal(t, segment != nil, tt.args.result.NumOfRows > 0)
+ if segment != nil {
+ assert.EqualValues(t, tt.args.result.GetInsertLogs(), segment.GetBinlogs())
+ assert.EqualValues(t, tt.args.result.GetField2StatslogPaths(), segment.GetStatslogs())
+ assert.EqualValues(t, tt.args.result.GetDeltalogs(), segment.GetDeltalogs())
+ }
}
})
}
@@ -435,7 +483,7 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) {
want *SegmentInfo
}{
{
"test normal merge",
"test normal inner compaction",
fields{
memkv.NewMemoryKV(),
nil,
@@ -462,6 +510,7 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) {
InsertLogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog3")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog3")},
NumOfRows: 1,
},
},
false,
@@ -474,6 +523,40 @@ func Test_meta_CompleteInnerCompaction(t *testing.T) {
},
},
},
{
"test removing all data inner compaction",
fields{
memkv.NewMemoryKV(),
nil,
&SegmentsInfo{
map[int64]*SegmentInfo{
1: {SegmentInfo: &datapb.SegmentInfo{
ID: 1,
Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
}},
},
},
},
args{
&datapb.CompactionSegmentBinlogs{
SegmentID: 1,
FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1", "log2")},
Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statlog1", "statlog2")},
Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(0, "deltalog1", "deltalog2")},
},
&datapb.CompactionResult{
SegmentID: 1,
InsertLogs: nil,
Field2StatslogPaths: nil,
Deltalogs: nil,
NumOfRows: 0,
},
},
false,
nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {


@@ -466,17 +466,18 @@ func (t *compactionTask) compact() error {
}
uploadStart := time.Now()
- cpaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta)
+ segPaths, err := t.upload(ctxTimeout, targetSegID, partID, iDatas, deltaBuf.delData, meta)
if err != nil {
log.Error("compact wrong", zap.Int64("planID", t.plan.GetPlanID()), zap.Error(err))
return err
}
uploadEnd := time.Now()
defer func() {
log.Debug("upload elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(uploadEnd.Sub(uploadStart))))
}()
- for _, fbl := range cpaths.deltaInfo {
+ for _, fbl := range segPaths.deltaInfo {
for _, deltaLogInfo := range fbl.GetBinlogs() {
deltaLogInfo.LogSize = deltaBuf.GetLogSize()
deltaLogInfo.TimestampFrom = deltaBuf.GetTimestampFrom()
@@ -488,9 +489,9 @@ func (t *compactionTask) compact() error {
pack := &datapb.CompactionResult{
PlanID: t.plan.GetPlanID(),
SegmentID: targetSegID,
- InsertLogs: cpaths.inPaths,
- Field2StatslogPaths: cpaths.statsPaths,
- Deltalogs: cpaths.deltaInfo,
+ InsertLogs: segPaths.inPaths,
+ Field2StatslogPaths: segPaths.statsPaths,
+ Deltalogs: segPaths.deltaInfo,
NumOfRows: numRows,
}
@@ -512,7 +513,11 @@ func (t *compactionTask) compact() error {
// Compaction I: update pk range.
// Compaction II: remove the segments and add a new flushed segment with pk range.
if t.hasSegment(targetSegID, true) {
- t.refreshFlushedSegStatistics(targetSegID, numRows)
+ if numRows <= 0 {
+ t.removeSegments(targetSegID)
+ } else {
+ t.refreshFlushedSegStatistics(targetSegID, numRows)
+ }
// no need to shorten the PK range of a segment, deleting dup PKs is valid
} else {
t.mergeFlushedSegments(targetSegID, collID, partID, t.plan.GetPlanID(), segIDs, t.plan.GetChannel(), numRows)
@@ -527,9 +532,9 @@ func (t *compactionTask) compact() error {
log.Info("compaction done",
zap.Int64("planID", t.plan.GetPlanID()),
zap.Int("num of binlog paths", len(cpaths.inPaths)),
zap.Int("num of stats paths", len(cpaths.statsPaths)),
zap.Int("num of delta paths", len(cpaths.deltaInfo)),
zap.Int("num of binlog paths", len(segPaths.inPaths)),
zap.Int("num of stats paths", len(segPaths.statsPaths)),
zap.Int("num of delta paths", len(segPaths.deltaInfo)),
)
log.Info("overall elapse in ms", zap.Int64("planID", t.plan.GetPlanID()), zap.Any("elapse", nano2Milli(time.Since(compactStart))))


@@ -422,7 +422,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
mockbIO := &binlogIO{mockKv, alloc}
replica, err := newReplica(context.TODO(), rc, collID)
require.NoError(t, err)
- replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1})
+ replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1, 2})
iData := genInsertData()
meta := NewMetaFactory().GetCollectionMeta(collID, "test_compact_coll_name")
@@ -473,6 +473,28 @@ func TestCompactorInterfaceMethods(t *testing.T) {
planID := task.getPlanID()
assert.Equal(t, plan.GetPlanID(), planID)
// Compact to delete the entire segment
deleteAllData := &DeleteData{
Pks: []UniqueID{1, 2},
Tss: []Timestamp{20000, 20001},
RowCount: 2,
}
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, deleteAllData, meta)
require.NoError(t, err)
plan.PlanID++
err = task.compact()
assert.NoError(t, err)
// The segment should be removed
assert.False(t, replica.hasSegment(segID, true))
// re-add the segment
replica.addFlushedSegmentWithPKs(segID, collID, partID, "channelname", 2, []UniqueID{1, 2})
// Compact empty segment
err = mockKv.RemoveWithPrefix("/")
require.NoError(t, err)
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
@@ -515,7 +537,6 @@ func TestCompactorInterfaceMethods(t *testing.T) {
mockfm.sleepSeconds = plan.TimeoutInSeconds + int32(1)
err = task.compact()
assert.Error(t, err)
})
t.Run("Test typeII compact valid", func(t *testing.T) {