milvus/internal/datacoord/compaction_policy.go

114 lines
3.4 KiB
Go

package datacoord
import (
"sort"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
type singleCompactionPolicy interface {
// shouldSingleCompaction generates a compaction plan for single comapction, return nil if no plan can be generated.
generatePlan(segment *SegmentInfo, timetravel *timetravel) *datapb.CompactionPlan
}
type mergeCompactionPolicy interface {
// shouldMergeCompaction generates a compaction plan for merge compaction, return nil if no plan can be generated.
generatePlan(segments []*SegmentInfo, timetravel *timetravel) []*datapb.CompactionPlan
}
type singleCompactionFunc func(segment *SegmentInfo, timetravel *timetravel) *datapb.CompactionPlan
func (f singleCompactionFunc) generatePlan(segment *SegmentInfo, timetravel *timetravel) *datapb.CompactionPlan {
return f(segment, timetravel)
}
func chooseAllBinlogs(segment *SegmentInfo, timetravel *timetravel) *datapb.CompactionPlan {
var deltaLogs []*datapb.DeltaLogInfo
for _, l := range segment.GetDeltalogs() {
if l.TimestampTo < timetravel.time {
deltaLogs = append(deltaLogs, l)
}
}
if len(deltaLogs) == 0 {
return nil
}
return &datapb.CompactionPlan{
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: segment.GetID(),
FieldBinlogs: segment.GetBinlogs(),
Field2StatslogPaths: segment.GetStatslogs(),
Deltalogs: deltaLogs,
},
},
Type: datapb.CompactionType_InnerCompaction,
Timetravel: timetravel.time,
Channel: segment.GetInsertChannel(),
}
}
type mergeCompactionFunc func(segments []*SegmentInfo, timetravel *timetravel) []*datapb.CompactionPlan
func (f mergeCompactionFunc) generatePlan(segments []*SegmentInfo, timetravel *timetravel) []*datapb.CompactionPlan {
return f(segments, timetravel)
}
func greedyMergeCompaction(segments []*SegmentInfo, timetravel *timetravel) []*datapb.CompactionPlan {
if len(segments) == 0 {
return nil
}
sort.Slice(segments, func(i, j int) bool {
return segments[i].NumOfRows < segments[j].NumOfRows
})
return greedyGeneratePlans(segments, timetravel)
}
func greedyGeneratePlans(sortedSegments []*SegmentInfo, timetravel *timetravel) []*datapb.CompactionPlan {
maxRowNumPerSegment := sortedSegments[0].MaxRowNum
plans := make([]*datapb.CompactionPlan, 0)
free := maxRowNumPerSegment
plan := &datapb.CompactionPlan{
Timetravel: timetravel.time,
Type: datapb.CompactionType_MergeCompaction,
Channel: sortedSegments[0].GetInsertChannel(),
}
for _, s := range sortedSegments {
segmentBinlogs := &datapb.CompactionSegmentBinlogs{
SegmentID: s.GetID(),
FieldBinlogs: s.GetBinlogs(),
Field2StatslogPaths: s.GetStatslogs(),
Deltalogs: s.GetDeltalogs(),
}
if s.NumOfRows > free {
// if the plan size is less than or equal to 1, it means that every unchecked segment is larger than half of max segment size
// so there's no need to merge them
if len(plan.SegmentBinlogs) <= 1 {
break
}
plans = append(plans, plan)
plan = &datapb.CompactionPlan{
Timetravel: timetravel.time,
Type: datapb.CompactionType_MergeCompaction,
Channel: sortedSegments[0].GetInsertChannel(),
}
free = maxRowNumPerSegment
}
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs)
free -= s.GetNumOfRows()
}
// if plan contains zero or one segment, dont need to merge
if len(plan.SegmentBinlogs) > 1 {
plans = append(plans, plan)
}
return plans
}