fix: Make compactor able to clear empty segments (#32821)

See also: #32553

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/33024/head
XuanYang-cn 2024-05-13 18:21:32 +08:00 committed by GitHub
parent ba625835bc
commit 29b621f759
3 changed files with 74 additions and 41 deletions

@@ -486,22 +486,23 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
dblobs := make(map[UniqueID][]*Blob)
allPath := make([][]string, 0)
for _, s := range t.plan.GetSegmentBinlogs() {
// Get the number of field binlog files from non-empty segment
var binlogNum int
log := log.With(zap.Int64("segmentID", s.GetSegmentID()))
// Get the batch count of field binlog files
var binlogBatch int
for _, b := range s.GetFieldBinlogs() {
if b != nil {
binlogNum = len(b.GetBinlogs())
binlogBatch = len(b.GetBinlogs())
break
}
}
// Unable to deal with all empty segments cases, so return error
if binlogNum == 0 {
log.Warn("compact wrong, all segments' binlogs are empty")
return nil, errIllegalCompactionPlan
if binlogBatch == 0 {
log.Warn("compacting empty segment")
continue
}
for idx := 0; idx < binlogNum; idx++ {
for idx := 0; idx < binlogBatch; idx++ {
var ps []string
for _, f := range s.GetFieldBinlogs() {
ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
@@ -509,7 +510,6 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
allPath = append(allPath, ps)
}
segID := s.GetSegmentID()
paths := make([]string, 0)
for _, d := range s.GetDeltalogs() {
for _, l := range d.GetBinlogs() {
@@ -521,13 +521,25 @@ func (t *compactionTask) compact() (*datapb.CompactionPlanResult, error) {
if len(paths) != 0 {
bs, err := downloadBlobs(ctxTimeout, t.binlogIO, paths)
if err != nil {
log.Warn("compact wrong, fail to download deltalogs", zap.Int64("segment", segID), zap.Strings("path", paths), zap.Error(err))
log.Warn("compact wrong, fail to download deltalogs", zap.Strings("path", paths), zap.Error(err))
return nil, err
}
dblobs[segID] = append(dblobs[segID], bs...)
dblobs[s.GetSegmentID()] = append(dblobs[s.GetSegmentID()], bs...)
}
}
log.Info("compact download deltalogs done", zap.Duration("elapse", t.tr.RecordSpan()))
// Unable to deal with all empty segments cases, so return error
if len(allPath) == 0 {
log.Warn("compact wrong, all segments are empty")
return nil, errIllegalCompactionPlan
}
log.Info("compact download deltalogs elapse", zap.Duration("elapse", t.tr.RecordSpan()))
if err != nil {
log.Warn("compact IO wrong", zap.Error(err))
return nil, err
}
deltaPk2Ts, err := t.mergeDeltalogs(dblobs)
if err != nil {
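
The key behavioral change in compact() above: an empty segment no longer fails the whole plan; it is logged and skipped, and errIllegalCompactionPlan is returned only when every segment in the plan turns out to be empty (len(allPath) == 0). A minimal sketch of that control flow, using simplified placeholder types instead of the real datapb structs:

```go
package main

import (
	"errors"
	"fmt"
)

// errIllegalCompactionPlan stands in for the compactor's sentinel error.
var errIllegalCompactionPlan = errors.New("illegal compaction plan")

// segment is a simplified stand-in for datapb.CompactionSegmentBinlogs.
type segment struct {
	ID      int64
	Batches [][]string // one slice of binlog paths per batch
}

// collectPaths mirrors the new control flow: an empty segment is logged and
// skipped, and the plan is rejected only when no segment contributed binlogs.
func collectPaths(segments []segment) ([][]string, error) {
	allPath := make([][]string, 0)
	for _, s := range segments {
		if len(s.Batches) == 0 {
			// Before this change the compactor returned errIllegalCompactionPlan here.
			fmt.Printf("compacting empty segment %d\n", s.ID)
			continue
		}
		allPath = append(allPath, s.Batches...)
	}
	// Only an all-empty plan is still treated as illegal.
	if len(allPath) == 0 {
		return nil, errIllegalCompactionPlan
	}
	return allPath, nil
}

func main() {
	paths, err := collectPaths([]segment{
		{ID: 100},                                 // empty segment: skipped
		{ID: 101, Batches: [][]string{{"log/1"}}}, // non-empty segment
	})
	fmt.Println(len(paths), err) // 1 <nil>
}
```

Running it prints the skip message for segment 100 and still returns the one batch contributed by segment 101; only a plan where every segment is empty produces the sentinel error.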

@@ -778,6 +778,31 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Deltalogs: nil,
}}
paramtable.Get().Save(Params.CommonCfg.EntityExpirationTTL.Key, "0") // Turn off auto expiration
t.Run("Test compact with all segment empty", func(t *testing.T) {
alloc := allocator.NewMockAllocator(t)
alloc.EXPECT().AllocOne().Call.Return(int64(11111), nil)
ctx, cancel := context.WithCancel(context.TODO())
mockSyncmgr := syncmgr.NewMockSyncManager(t)
mockSyncmgr.EXPECT().Block(mock.Anything).Return()
task := &compactionTask{
ctx: ctx,
cancel: cancel,
Allocator: alloc,
done: make(chan struct{}, 1),
tr: timerecord.NewTimeRecorder("test"),
syncMgr: mockSyncmgr,
plan: &datapb.CompactionPlan{
PlanID: 999,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{{SegmentID: 100}},
TimeoutInSeconds: 10,
Type: datapb.CompactionType_MixCompaction,
},
}
_, err := task.compact()
assert.ErrorIs(t, errIllegalCompactionPlan, err)
})
t.Run("Test compact invalid empty segment binlogs", func(t *testing.T) {
plan := &datapb.CompactionPlan{
@@ -894,16 +919,17 @@ func TestCompactorInterfaceMethods(t *testing.T) {
NumOfRows: 2,
}, bfs)
metaCache.EXPECT().GetSegmentByID(mock.Anything).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) {
switch id {
case c.segID1:
return seg1, true
case c.segID2:
return seg2, true
default:
return nil, false
}
})
bfs = metacache.NewBloomFilterSet()
seg3 := metacache.NewSegmentInfo(&datapb.SegmentInfo{
CollectionID: c.colID,
PartitionID: c.parID,
ID: 99999,
}, bfs)
metaCache.EXPECT().GetSegmentByID(c.segID1).Return(seg1, true)
metaCache.EXPECT().GetSegmentByID(c.segID2).Return(seg2, true)
metaCache.EXPECT().GetSegmentByID(seg3.SegmentID()).Return(seg3, true)
metaCache.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false)
iData1 := genInsertDataWithPKs(c.pks1, c.pkType)
dData1 := &DeleteData{
@@ -953,6 +979,9 @@ func TestCompactorInterfaceMethods(t *testing.T) {
Field2StatslogPaths: lo.Values(sPaths2),
Deltalogs: dPaths2,
},
{
SegmentID: seg3.SegmentID(), // empty segment
},
},
StartTime: 0,
TimeoutInSeconds: 10,
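
The mock setup above replaces a single GetSegmentByID RunAndReturn callback (with an internal switch) by one expectation per concrete segment ID plus a final mock.Anything catch-all: with testify's matcher, a call is routed to the first registered expectation whose arguments match, so the wildcard only fires when no concrete ID matched. A hedged sketch of the same pattern with a hand-rolled testify mock (MockMetaCache and its string return value here are illustrative stand-ins, not the mockery-generated type used in the real test):

```go
package example

import (
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"
)

// MockMetaCache is a hypothetical hand-rolled mock for this sketch only.
type MockMetaCache struct {
	mock.Mock
}

func (m *MockMetaCache) GetSegmentByID(id int64) (string, bool) {
	args := m.Called(id)
	return args.String(0), args.Bool(1)
}

func TestPerIDExpectations(t *testing.T) {
	mc := new(MockMetaCache)
	// Specific IDs get their own expectations...
	mc.On("GetSegmentByID", int64(1)).Return("seg1", true)
	mc.On("GetSegmentByID", int64(2)).Return("seg2", true)
	// ...and mock.Anything acts as the catch-all for every other ID.
	mc.On("GetSegmentByID", mock.Anything).Return("", false)

	seg, ok := mc.GetSegmentByID(1)
	assert.True(t, ok)
	assert.Equal(t, "seg1", seg)

	_, ok = mc.GetSegmentByID(42)
	assert.False(t, ok)
}
```

The per-ID form reads closer to the test's intent than the switch, and adding a new known segment (like seg3 above) is a one-line expectation.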

@@ -91,6 +91,7 @@ func (ddn *ddNode) IsValidInMsg(in []Msg) bool {
// Operate handles input messages, implementing flowgraph.Node
func (ddn *ddNode) Operate(in []Msg) []Msg {
log := log.With(zap.String("channel", ddn.vChannelName))
msMsg, ok := in[0].(*MsgStreamMsg)
if !ok {
log.Warn("type assertion failed for MsgStreamMsg", zap.String("name", reflect.TypeOf(in[0]).Name()))
@@ -109,14 +110,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
endPositions: msMsg.EndPositions(),
dropCollection: false,
}
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID), zap.String("channel", ddn.vChannelName))
log.Warn("MsgStream closed", zap.Any("ddNode node", ddn.Name()), zap.Int64("collection", ddn.collectionID))
return []Msg{&fgMsg}
}
if load := ddn.dropMode.Load(); load != nil && load.(bool) {
log.RatedInfo(1.0, "ddNode in dropMode",
zap.String("vChannelName", ddn.vChannelName),
zap.Int64("collectionID", ddn.collectionID))
log.RatedInfo(1.0, "ddNode in dropMode")
return []Msg{}
}
@@ -147,12 +146,10 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
switch msg.Type() {
case commonpb.MsgType_DropCollection:
if msg.(*msgstream.DropCollectionMsg).GetCollectionID() == ddn.collectionID {
log.Info("Receiving DropCollection msg",
zap.Int64("collectionID", ddn.collectionID),
zap.String("vChannelName", ddn.vChannelName))
log.Info("Receiving DropCollection msg")
ddn.dropMode.Store(true)
log.Info("Stop compaction of vChannel", zap.String("vChannelName", ddn.vChannelName))
log.Info("Stop compaction for dropped channel")
ddn.compactionExecutor.discardByDroppedChannel(ddn.vChannelName)
fgMsg.dropCollection = true
}
@@ -160,10 +157,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
case commonpb.MsgType_DropPartition:
dpMsg := msg.(*msgstream.DropPartitionMsg)
if dpMsg.GetCollectionID() == ddn.collectionID {
log.Info("drop partition msg received",
zap.Int64("collectionID", dpMsg.GetCollectionID()),
zap.Int64("partitionID", dpMsg.GetPartitionID()),
zap.String("vChanneName", ddn.vChannelName))
log.Info("drop partition msg received", zap.Int64("partitionID", dpMsg.GetPartitionID()))
fgMsg.dropPartitions = append(fgMsg.dropPartitions, dpMsg.PartitionID)
}
@@ -180,8 +174,7 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
log.Debug("filter insert messages",
zap.Int64("filter segmentID", imsg.GetSegmentID()),
zap.Uint64("message timestamp", msg.EndTs()),
zap.String("segment's vChannel", imsg.GetShardName()),
zap.String("current vChannel", ddn.vChannelName))
)
continue
}
@@ -200,15 +193,12 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
Add(float64(imsg.GetNumRows()))
log.Debug("DDNode receive insert messages",
zap.Int("numRows", len(imsg.GetRowIDs())),
zap.String("vChannelName", ddn.vChannelName))
zap.Int64("segmentID", imsg.GetSegmentID()),
zap.Int("numRows", len(imsg.GetRowIDs())))
fgMsg.insertMessages = append(fgMsg.insertMessages, imsg)
case commonpb.MsgType_Delete:
dmsg := msg.(*msgstream.DeleteMsg)
log.Debug("DDNode receive delete messages",
zap.Int64("numRows", dmsg.NumRows),
zap.String("vChannelName", ddn.vChannelName))
if dmsg.CollectionID != ddn.collectionID {
log.Warn("filter invalid DeleteMsg, collection mis-match",
@@ -216,6 +206,8 @@ func (ddn *ddNode) Operate(in []Msg) []Msg {
zap.Int64("Expected collID", ddn.collectionID))
continue
}
log.Debug("DDNode receive delete messages", zap.Int64("numRows", dmsg.NumRows))
rateCol.Add(metricsinfo.DeleteConsumeThroughput, float64(proto.Size(&dmsg.DeleteRequest)))
metrics.DataNodeConsumeBytesCount.
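
The ddNode changes are all one refactor: the vchannel is bound once with log.With at the top of Operate, so the individual Info/Debug/Warn calls drop the repeated zap.String("vChannelName", ...) and zap.Int64("collectionID", ...) fields. A minimal sketch of the same contextual-logger pattern with plain zap (the channel name and collection ID below are made up for illustration):

```go
package main

import "go.uber.org/zap"

func main() {
	base, err := zap.NewDevelopment()
	if err != nil {
		panic(err)
	}
	defer base.Sync()

	// Bind the fields that stay constant for the whole Operate call once...
	log := base.With(
		zap.String("channel", "by-dev-rootcoord-dml_0_v0"), // illustrative channel name
		zap.Int64("collectionID", 123),                     // illustrative collection ID
	)

	// ...so the individual messages no longer need to restate them.
	log.Info("Receiving DropCollection msg")
	log.Info("Stop compaction for dropped channel")
	log.Debug("DDNode receive delete messages", zap.Int64("numRows", 3))
}
```

Every message emitted through the derived logger carries both fields without repeating them at each call site, which is exactly what lets the diff shrink the per-call argument lists.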