From d01107326957822f5a6494810b57ff4d22c3b10f Mon Sep 17 00:00:00 2001 From: XuanYang-cn Date: Fri, 18 Feb 2022 18:47:51 +0800 Subject: [PATCH] Fix DataNode panic when compact empty segment (#15580) remove the TODO in compactor See also: #15573 Signed-off-by: yangxuan --- internal/datanode/compactor.go | 19 ++++++++++++---- internal/datanode/compactor_test.go | 34 +++++++++++++++++++++-------- 2 files changed, 40 insertions(+), 13 deletions(-) diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index 93b6ce163e..4348093b36 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -379,11 +379,22 @@ func (t *compactionTask) compact() error { g, gCtx := errgroup.WithContext(ctxTimeout) for _, s := range t.plan.GetSegmentBinlogs() { - // TODO may panic - fieldNum := len(s.GetFieldBinlogs()[0].GetBinlogs()) + // Get the number of field binlog files from non-empty segment + var binlogNum int + for _, b := range s.GetFieldBinlogs() { + if b != nil { + binlogNum = len(b.GetBinlogs()) + break + } + } + // Unable to deal with all empty segments cases, so return error + if binlogNum == 0 { + log.Error("compact wrong, all segments' binlogs are empty", zap.Int64("planID", t.plan.GetPlanID())) + return errIllegalCompactionPlan + } - for idx := 0; idx < fieldNum; idx++ { - ps := make([]string, 0, fieldNum) + for idx := 0; idx < binlogNum; idx++ { + var ps []string for _, f := range s.GetFieldBinlogs() { ps = append(ps, f.GetBinlogs()[idx].GetLogPath()) } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index 36bef344fd..f1617c0993 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -435,17 +435,17 @@ func TestCompactorInterfaceMethods(t *testing.T) { cpaths, err := mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta) require.NoError(t, err) require.Equal(t, 11, len(cpaths.inPaths)) + segBinlogs := []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: segID, + FieldBinlogs: cpaths.inPaths, + Field2StatslogPaths: cpaths.statsPaths, + Deltalogs: cpaths.deltaInfo, + }} plan := &datapb.CompactionPlan{ - PlanID: 10080, - SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ - { - SegmentID: segID, - FieldBinlogs: cpaths.inPaths, - Field2StatslogPaths: cpaths.statsPaths, - Deltalogs: cpaths.deltaInfo, - }, - }, + PlanID: 10080, + SegmentBinlogs: segBinlogs, StartTime: 0, TimeoutInSeconds: 1, Type: datapb.CompactionType_InnerCompaction, @@ -473,6 +473,21 @@ func TestCompactorInterfaceMethods(t *testing.T) { planID := task.getPlanID() assert.Equal(t, plan.GetPlanID(), planID) + err = mockKv.RemoveWithPrefix("/") + require.NoError(t, err) + cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta) + require.NoError(t, err) + plan.PlanID = 999876 + segmentBinlogsWithEmptySegment := []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: segID, + }, + } + plan.SegmentBinlogs = segmentBinlogsWithEmptySegment + err = task.compact() + assert.Error(t, err) + + plan.SegmentBinlogs = segBinlogs // New test, remove all the binlogs in memkv // Deltas in timetravel range err = mockKv.RemoveWithPrefix("/") @@ -500,6 +515,7 @@ func TestCompactorInterfaceMethods(t *testing.T) { mockfm.sleepSeconds = plan.TimeoutInSeconds + int32(1) err = task.compact() assert.Error(t, err) + }) t.Run("Test typeII compact valid", func(t *testing.T) {