fix: Compaction trigger chooses the same segment twice (#32800)

DataNode would get stuck when the compactor tried to lock the
same segmentID twice.

See also: #32765
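
For context, the duplicate pick comes from the inline squeeze loop removed below: its inner `for i, b := range buckets` shadows the outer index `i`, so the wrong entry is spliced out of `remainingSmallSegs` and an already-picked segment can be planned again. A minimal sketch of that index-shadowing pitfall, using hypothetical stand-in types rather than the real datacoord structures:

```go
package main

import "fmt"

// segment is an illustrative stand-in for the real SegmentInfo type.
type segment struct{ id int64 }

func main() {
	remaining := []segment{{id: 1}, {id: 2}, {id: 3}}
	buckets := [][]segment{{{id: 10}}, {{id: 11}}}

	// Buggy pattern (mirrors the removed inline loop): the inner range over
	// buckets shadows the outer index i, so the element removed from
	// `remaining` is chosen by the *bucket* index, not by the index of the
	// segment that was just squeezed in.
	for i := len(remaining) - 1; i >= 0; i-- {
		s := remaining[i]
		for i, b := range buckets { // i is shadowed here
			buckets[i] = append(b, s)
			remaining = append(remaining[:i], remaining[i+1:]...)
			break
		}
	}

	// Segment 3 ends up in buckets[0] three times, while segments 1 and 2
	// are dropped without ever being squeezed in; a segmentID duplicated in
	// one plan is what makes the DataNode compactor block while trying to
	// lock it a second time.
	fmt.Println(buckets[0]) // [{10} {3} {3} {3}]
	fmt.Println(remaining)  // []
}
```

The new `squeezeSmallSegmentsToBuckets` helper avoids this by using a separate `bidx` for the bucket index and the outer `i` for the removal from the small-segment list.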

---------

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/32830/head
XuanYang-cn 2024-05-07 19:01:31 +08:00 committed by GitHub
parent bcdbd1966e
commit 6843d6d376
2 changed files with 63 additions and 30 deletions


@@ -537,17 +537,17 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
 			log.Warn("failed to execute compaction plan",
 				zap.Int64("collection", signal.collectionID),
 				zap.Int64("planID", plan.PlanID),
-				zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())),
+				zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())),
 				zap.Error(err))
 			continue
 		}
 		log.Info("time cost of generating compaction",
-			zap.Int64("plan ID", plan.PlanID),
+			zap.Int64("planID", plan.PlanID),
 			zap.Int64("time cost", time.Since(start).Milliseconds()),
 			zap.Int64("collectionID", signal.collectionID),
 			zap.String("channel", channel),
 			zap.Int64("partitionID", partitionID),
-			zap.Int64s("segment IDs", fetchSegIDs(plan.GetSegmentBinlogs())))
+			zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())))
 	}
 }
@@ -626,6 +626,12 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
 			}
 		}
 		// since this is priority compaction, we will execute even if there is only segment
+		log.Info("pick priority candidate for compaction",
+			zap.Int64("prioritized segmentID", segment.GetID()),
+			zap.Int64s("picked segmentIDs", lo.Map(bucket, func(s *SegmentInfo, _ int) int64 { return s.GetID() })),
+			zap.Int64("target size", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })),
+			zap.Int64("target count", lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.GetNumOfRows() })),
+		)
 		buckets = append(buckets, bucket)
 	}
@@ -646,13 +652,8 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
 		smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge.GetAsInt()-1)
 		bucket = append(bucket, result...)
-		var targetSize int64
-		var targetRow int64
-		for _, s := range bucket {
-			targetSize += s.getSegmentSize()
-			targetRow += s.GetNumOfRows()
-		}
-		// only merge if candidate number is large than MinSegmentToMerge or if target row is large enough
+		// only merge if candidate number is large than MinSegmentToMerge or if target size is large enough
+		targetSize := lo.SumBy(bucket, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
 		if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge.GetAsInt() ||
 			len(bucket) > 1 && t.isCompactableSegment(targetSize, expectedSize) {
 			buckets = append(buckets, bucket)
@@ -660,24 +661,9 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
 			remainingSmallSegs = append(remainingSmallSegs, bucket...)
 		}
 	}
-	// Try adding remaining segments to existing plans.
-	for i := len(remainingSmallSegs) - 1; i >= 0; i-- {
-		s := remainingSmallSegs[i]
-		if !isExpandableSmallSegment(s, expectedSize) {
-			continue
-		}
-		// Try squeeze this segment into existing plans. This could cause segment size to exceed maxSize.
-		for i, b := range buckets {
-			totalSize := lo.SumBy(b, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
-			if totalSize+s.getSegmentSize() > int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) {
-				continue
-			}
-			buckets[i] = append(buckets[i], s)
-			remainingSmallSegs = append(remainingSmallSegs[:i], remainingSmallSegs[i+1:]...)
-			break
-		}
-	}
+	remainingSmallSegs = t.squeezeSmallSegmentsToBuckets(remainingSmallSegs, buckets, expectedSize)
 	// If there are still remaining small segments, try adding them to non-planned segments.
 	for _, npSeg := range nonPlannedSegments {
 		bucket := []*SegmentInfo{npSeg}
@@ -890,3 +876,26 @@ func fetchSegIDs(segBinLogs []*datapb.CompactionSegmentBinlogs) []int64 {
 	}
 	return segIDs
 }
+
+// buckets will be updated inplace
+func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
+	for i := len(small) - 1; i >= 0; i-- {
+		s := small[i]
+		if !isExpandableSmallSegment(s, expectedSize) {
+			continue
+		}
+		// Try squeeze this segment into existing plans. This could cause segment size to exceed maxSize.
+		for bidx, b := range buckets {
+			totalSize := lo.SumBy(b, func(s *SegmentInfo) int64 { return s.getSegmentSize() })
+			if totalSize+s.getSegmentSize() > int64(Params.DataCoordCfg.SegmentExpansionRate.GetAsFloat()*float64(expectedSize)) {
+				continue
+			}
+			buckets[bidx] = append(buckets[bidx], s)
+			small = append(small[:i], small[i+1:]...)
+			break
+		}
+	}
+
+	return small
+}


@@ -19,7 +19,7 @@ package datacoord
 import (
 	"context"
 	"sort"
-	"sync/atomic"
+	satomic "sync/atomic"
 	"testing"
 	"time"
@ -27,6 +27,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -35,6 +37,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/model"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -2074,7 +2077,7 @@ func Test_triggerSingleCompaction(t *testing.T) {
 		err := got.triggerSingleCompaction(2, 2, 2, "b", false)
 		assert.NoError(t, err)
 	}
-	var i atomic.Value
+	var i satomic.Value
 	i.Store(0)
 	check := func() {
 		for {
@@ -2095,7 +2098,7 @@ func Test_triggerSingleCompaction(t *testing.T) {
 		err := got.triggerSingleCompaction(3, 3, 3, "c", true)
 		assert.NoError(t, err)
 	}
-	var j atomic.Value
+	var j satomic.Value
 	j.Store(0)
 	go func() {
 		timeoutCtx, cancelFunc := context.WithTimeout(context.Background(), time.Second)
@@ -2600,6 +2603,27 @@ func (s *CompactionTriggerSuite) TestIsChannelCheckpointHealthy() {
 	})
 }
+
+func (s *CompactionTriggerSuite) TestSqueezeSmallSegments() {
+	expectedSize := int64(70000)
+	smallsegments := []*SegmentInfo{
+		{SegmentInfo: &datapb.SegmentInfo{ID: 3}, size: *atomic.NewInt64(69999)},
+		{SegmentInfo: &datapb.SegmentInfo{ID: 1}, size: *atomic.NewInt64(100)},
+	}
+
+	largeSegment := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 2}, size: *atomic.NewInt64(expectedSize)}
+	buckets := [][]*SegmentInfo{{largeSegment}}
+	s.Require().Equal(1, len(buckets))
+	s.Require().Equal(1, len(buckets[0]))
+
+	remaining := s.tr.squeezeSmallSegmentsToBuckets(smallsegments, buckets, expectedSize)
+	s.Equal(1, len(remaining))
+	s.EqualValues(3, remaining[0].ID)
+
+	s.Equal(1, len(buckets))
+	s.Equal(2, len(buckets[0]))
+	log.Info("buckets", zap.Any("buckets", buckets))
+}

 func TestCompactionTriggerSuite(t *testing.T) {
 	suite.Run(t, new(CompactionTriggerSuite))
 }