mirror of https://github.com/milvus-io/milvus.git
139 lines
4.5 KiB
Go
139 lines
4.5 KiB
Go
// Licensed to the LF AI & Data foundation under one
|
|
// or more contributor license agreements. See the NOTICE file
|
|
// distributed with this work for additional information
|
|
// regarding copyright ownership. The ASF licenses this file
|
|
// to you under the Apache License, Version 2.0 (the
|
|
// "License"); you may not use this file except in compliance
|
|
// with the License. You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datacoord
|
|
|
|
import (
|
|
"sort"
|
|
|
|
"github.com/milvus-io/milvus/internal/proto/datapb"
|
|
)
|
|
|
|
type singleCompactionPolicy interface {
|
|
// generatePlan generates a compaction plan for single compaction, return nil if no plan can be generated.
|
|
generatePlan(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan
|
|
}
|
|
|
|
type mergeCompactionPolicy interface {
|
|
// generatePlan generates a compaction plan for merge compaction, return nil if no plan can be generated.
|
|
generatePlan(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan
|
|
}
|
|
|
|
type singleCompactionFunc func(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan
|
|
|
|
func (f singleCompactionFunc) generatePlan(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan {
|
|
return f(segment, timeTravel)
|
|
}
|
|
|
|
func chooseAllBinlogs(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan {
|
|
var deltaLogs []*datapb.FieldBinlog
|
|
for _, fieldBinlog := range segment.GetDeltalogs() {
|
|
fbl := &datapb.FieldBinlog{
|
|
FieldID: fieldBinlog.GetFieldID(),
|
|
Binlogs: make([]*datapb.Binlog, 0, len(fieldBinlog.GetBinlogs())),
|
|
}
|
|
for _, binlog := range fieldBinlog.GetBinlogs() {
|
|
if binlog.TimestampTo < timeTravel.time {
|
|
fbl.Binlogs = append(fbl.Binlogs, binlog)
|
|
}
|
|
}
|
|
if len(fbl.Binlogs) > 0 {
|
|
deltaLogs = append(deltaLogs, fbl)
|
|
}
|
|
}
|
|
|
|
if len(deltaLogs) == 0 {
|
|
return nil
|
|
}
|
|
|
|
return &datapb.CompactionPlan{
|
|
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
|
{
|
|
SegmentID: segment.GetID(),
|
|
FieldBinlogs: segment.GetBinlogs(),
|
|
Field2StatslogPaths: segment.GetStatslogs(),
|
|
Deltalogs: deltaLogs,
|
|
},
|
|
},
|
|
Type: datapb.CompactionType_InnerCompaction,
|
|
Timetravel: timeTravel.time,
|
|
Channel: segment.GetInsertChannel(),
|
|
}
|
|
}
|
|
|
|
type mergeCompactionFunc func(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan
|
|
|
|
func (f mergeCompactionFunc) generatePlan(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan {
|
|
return f(segments, timeTravel)
|
|
}
|
|
|
|
func greedyMergeCompaction(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan {
|
|
if len(segments) == 0 {
|
|
return nil
|
|
}
|
|
|
|
sort.Slice(segments, func(i, j int) bool {
|
|
return segments[i].NumOfRows < segments[j].NumOfRows
|
|
})
|
|
|
|
return greedyGeneratePlans(segments, timeTravel)
|
|
}
|
|
|
|
func greedyGeneratePlans(sortedSegments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan {
|
|
maxRowNumPerSegment := sortedSegments[0].MaxRowNum
|
|
|
|
plans := make([]*datapb.CompactionPlan, 0)
|
|
free := maxRowNumPerSegment
|
|
plan := &datapb.CompactionPlan{
|
|
Timetravel: timeTravel.time,
|
|
Type: datapb.CompactionType_MergeCompaction,
|
|
Channel: sortedSegments[0].GetInsertChannel(),
|
|
}
|
|
|
|
for _, s := range sortedSegments {
|
|
segmentBinlogs := &datapb.CompactionSegmentBinlogs{
|
|
SegmentID: s.GetID(),
|
|
FieldBinlogs: s.GetBinlogs(),
|
|
Field2StatslogPaths: s.GetStatslogs(),
|
|
Deltalogs: s.GetDeltalogs(),
|
|
}
|
|
|
|
if s.NumOfRows > free {
|
|
// if the plan size is less than or equal to 1, it means that every unchecked segment is larger than half of max segment size
|
|
// so there's no need to merge them
|
|
if len(plan.SegmentBinlogs) <= 1 {
|
|
break
|
|
}
|
|
plans = append(plans, plan)
|
|
plan = &datapb.CompactionPlan{
|
|
Timetravel: timeTravel.time,
|
|
Type: datapb.CompactionType_MergeCompaction,
|
|
Channel: sortedSegments[0].GetInsertChannel(),
|
|
}
|
|
free = maxRowNumPerSegment
|
|
}
|
|
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs)
|
|
free -= s.GetNumOfRows()
|
|
}
|
|
|
|
// if plan contains zero or one segment, dont need to merge
|
|
if len(plan.SegmentBinlogs) > 1 {
|
|
plans = append(plans, plan)
|
|
}
|
|
|
|
return plans
|
|
}
|