mirror of https://github.com/milvus-io/milvus.git
Fix DataNode panic when compact empty segment (#15580)
remove the TODO in compactor See also: #15573 Signed-off-by: yangxuan <xuan.yang@zilliz.com>pull/15707/head
parent
142848fcc3
commit
d011073269
|
@ -379,11 +379,22 @@ func (t *compactionTask) compact() error {
|
|||
g, gCtx := errgroup.WithContext(ctxTimeout)
|
||||
for _, s := range t.plan.GetSegmentBinlogs() {
|
||||
|
||||
// TODO may panic
|
||||
fieldNum := len(s.GetFieldBinlogs()[0].GetBinlogs())
|
||||
// Get the number of field binlog files from non-empty segment
|
||||
var binlogNum int
|
||||
for _, b := range s.GetFieldBinlogs() {
|
||||
if b != nil {
|
||||
binlogNum = len(b.GetBinlogs())
|
||||
break
|
||||
}
|
||||
}
|
||||
// Unable to deal with all empty segments cases, so return error
|
||||
if binlogNum == 0 {
|
||||
log.Error("compact wrong, all segments' binlogs are empty", zap.Int64("planID", t.plan.GetPlanID()))
|
||||
return errIllegalCompactionPlan
|
||||
}
|
||||
|
||||
for idx := 0; idx < fieldNum; idx++ {
|
||||
ps := make([]string, 0, fieldNum)
|
||||
for idx := 0; idx < binlogNum; idx++ {
|
||||
var ps []string
|
||||
for _, f := range s.GetFieldBinlogs() {
|
||||
ps = append(ps, f.GetBinlogs()[idx].GetLogPath())
|
||||
}
|
||||
|
|
|
@ -435,17 +435,17 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
|||
cpaths, err := mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 11, len(cpaths.inPaths))
|
||||
segBinlogs := []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: segID,
|
||||
FieldBinlogs: cpaths.inPaths,
|
||||
Field2StatslogPaths: cpaths.statsPaths,
|
||||
Deltalogs: cpaths.deltaInfo,
|
||||
}}
|
||||
|
||||
plan := &datapb.CompactionPlan{
|
||||
PlanID: 10080,
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: segID,
|
||||
FieldBinlogs: cpaths.inPaths,
|
||||
Field2StatslogPaths: cpaths.statsPaths,
|
||||
Deltalogs: cpaths.deltaInfo,
|
||||
},
|
||||
},
|
||||
PlanID: 10080,
|
||||
SegmentBinlogs: segBinlogs,
|
||||
StartTime: 0,
|
||||
TimeoutInSeconds: 1,
|
||||
Type: datapb.CompactionType_InnerCompaction,
|
||||
|
@ -473,6 +473,21 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
|||
planID := task.getPlanID()
|
||||
assert.Equal(t, plan.GetPlanID(), planID)
|
||||
|
||||
err = mockKv.RemoveWithPrefix("/")
|
||||
require.NoError(t, err)
|
||||
cpaths, err = mockbIO.upload(context.TODO(), segID, partID, []*InsertData{iData}, dData, meta)
|
||||
require.NoError(t, err)
|
||||
plan.PlanID = 999876
|
||||
segmentBinlogsWithEmptySegment := []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: segID,
|
||||
},
|
||||
}
|
||||
plan.SegmentBinlogs = segmentBinlogsWithEmptySegment
|
||||
err = task.compact()
|
||||
assert.Error(t, err)
|
||||
|
||||
plan.SegmentBinlogs = segBinlogs
|
||||
// New test, remove all the binlogs in memkv
|
||||
// Deltas in timetravel range
|
||||
err = mockKv.RemoveWithPrefix("/")
|
||||
|
@ -500,6 +515,7 @@ func TestCompactorInterfaceMethods(t *testing.T) {
|
|||
mockfm.sleepSeconds = plan.TimeoutInSeconds + int32(1)
|
||||
err = task.compact()
|
||||
assert.Error(t, err)
|
||||
|
||||
})
|
||||
|
||||
t.Run("Test typeII compact valid", func(t *testing.T) {
|
||||
|
|
Loading…
Reference in New Issue