mirror of https://github.com/milvus-io/milvus.git
Refine compaction selection poliy (#17486)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>pull/17562/head
parent
ea5041aec2
commit
54aeb077e9
|
@ -30,6 +30,7 @@ import (
|
|||
)
|
||||
|
||||
// TODO this num should be determined by resources of datanode, for now, we set to a fixed value for simple
|
||||
// TODO we should split compaction into different priorities, small compaction helps to merge segment, large compaction helps to handle delta and expiration of large segments
|
||||
const (
|
||||
maxParallelCompactionTaskNum = 100
|
||||
compactionTimeout = 10 * time.Second
|
||||
|
|
|
@ -1,138 +0,0 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"sort"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
type singleCompactionPolicy interface {
|
||||
// generatePlan generates a compaction plan for single compaction, return nil if no plan can be generated.
|
||||
generatePlan(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan
|
||||
}
|
||||
|
||||
type mergeCompactionPolicy interface {
|
||||
// generatePlan generates a compaction plan for merge compaction, return nil if no plan can be generated.
|
||||
generatePlan(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan
|
||||
}
|
||||
|
||||
type singleCompactionFunc func(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan
|
||||
|
||||
func (f singleCompactionFunc) generatePlan(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan {
|
||||
return f(segment, timeTravel)
|
||||
}
|
||||
|
||||
func chooseAllBinlogs(segment *SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan {
|
||||
var deltaLogs []*datapb.FieldBinlog
|
||||
for _, fieldBinlog := range segment.GetDeltalogs() {
|
||||
fbl := &datapb.FieldBinlog{
|
||||
FieldID: fieldBinlog.GetFieldID(),
|
||||
Binlogs: make([]*datapb.Binlog, 0, len(fieldBinlog.GetBinlogs())),
|
||||
}
|
||||
for _, binlog := range fieldBinlog.GetBinlogs() {
|
||||
if binlog.TimestampTo < timeTravel.time {
|
||||
fbl.Binlogs = append(fbl.Binlogs, binlog)
|
||||
}
|
||||
}
|
||||
if len(fbl.Binlogs) > 0 {
|
||||
deltaLogs = append(deltaLogs, fbl)
|
||||
}
|
||||
}
|
||||
|
||||
if len(deltaLogs) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &datapb.CompactionPlan{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{
|
||||
SegmentID: segment.GetID(),
|
||||
FieldBinlogs: segment.GetBinlogs(),
|
||||
Field2StatslogPaths: segment.GetStatslogs(),
|
||||
Deltalogs: deltaLogs,
|
||||
},
|
||||
},
|
||||
Type: datapb.CompactionType_InnerCompaction,
|
||||
Timetravel: timeTravel.time,
|
||||
Channel: segment.GetInsertChannel(),
|
||||
}
|
||||
}
|
||||
|
||||
type mergeCompactionFunc func(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan
|
||||
|
||||
func (f mergeCompactionFunc) generatePlan(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan {
|
||||
return f(segments, timeTravel)
|
||||
}
|
||||
|
||||
func greedyMergeCompaction(segments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan {
|
||||
if len(segments) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
sort.Slice(segments, func(i, j int) bool {
|
||||
return segments[i].NumOfRows < segments[j].NumOfRows
|
||||
})
|
||||
|
||||
return greedyGeneratePlans(segments, timeTravel)
|
||||
}
|
||||
|
||||
func greedyGeneratePlans(sortedSegments []*SegmentInfo, timeTravel *timetravel) []*datapb.CompactionPlan {
|
||||
maxRowNumPerSegment := sortedSegments[0].MaxRowNum
|
||||
|
||||
plans := make([]*datapb.CompactionPlan, 0)
|
||||
free := maxRowNumPerSegment
|
||||
plan := &datapb.CompactionPlan{
|
||||
Timetravel: timeTravel.time,
|
||||
Type: datapb.CompactionType_MergeCompaction,
|
||||
Channel: sortedSegments[0].GetInsertChannel(),
|
||||
}
|
||||
|
||||
for _, s := range sortedSegments {
|
||||
segmentBinlogs := &datapb.CompactionSegmentBinlogs{
|
||||
SegmentID: s.GetID(),
|
||||
FieldBinlogs: s.GetBinlogs(),
|
||||
Field2StatslogPaths: s.GetStatslogs(),
|
||||
Deltalogs: s.GetDeltalogs(),
|
||||
}
|
||||
|
||||
if s.NumOfRows > free {
|
||||
// if the plan size is less than or equal to 1, it means that every unchecked segment is larger than half of max segment size
|
||||
// so there's no need to merge them
|
||||
if len(plan.SegmentBinlogs) <= 1 {
|
||||
break
|
||||
}
|
||||
plans = append(plans, plan)
|
||||
plan = &datapb.CompactionPlan{
|
||||
Timetravel: timeTravel.time,
|
||||
Type: datapb.CompactionType_MergeCompaction,
|
||||
Channel: sortedSegments[0].GetInsertChannel(),
|
||||
}
|
||||
free = maxRowNumPerSegment
|
||||
}
|
||||
plan.SegmentBinlogs = append(plan.SegmentBinlogs, segmentBinlogs)
|
||||
free -= s.GetNumOfRows()
|
||||
}
|
||||
|
||||
// if plan contains zero or one segment, dont need to merge
|
||||
if len(plan.SegmentBinlogs) > 1 {
|
||||
plans = append(plans, plan)
|
||||
}
|
||||
|
||||
return plans
|
||||
}
|
|
@ -1,168 +0,0 @@
|
|||
// Licensed to the LF AI & Data foundation under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package datacoord
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
|
||||
l := &datapb.FieldBinlog{
|
||||
FieldID: id,
|
||||
Binlogs: make([]*datapb.Binlog, 0, len(paths)),
|
||||
}
|
||||
for _, path := range paths {
|
||||
l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path})
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
||||
func Test_greedyMergeCompaction(t *testing.T) {
|
||||
type args struct {
|
||||
segments []*SegmentInfo
|
||||
timetravel *timetravel
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want []*datapb.CompactionPlan
|
||||
}{
|
||||
{
|
||||
"test normal merge",
|
||||
args{
|
||||
[]*SegmentInfo{
|
||||
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 1, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 1, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}}},
|
||||
},
|
||||
&timetravel{1000},
|
||||
},
|
||||
[]*datapb.CompactionPlan{
|
||||
{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}},
|
||||
{SegmentID: 2, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}},
|
||||
},
|
||||
Type: datapb.CompactionType_MergeCompaction,
|
||||
Timetravel: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"test unmergable segments",
|
||||
args{
|
||||
[]*SegmentInfo{
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 1, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []*datapb.Binlog{{LogPath: "log1"}}}}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 99, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []*datapb.Binlog{{LogPath: "log2"}}}}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 99, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{{FieldID: 1, Binlogs: []*datapb.Binlog{{LogPath: "log3"}}}}}},
|
||||
},
|
||||
&timetravel{1000},
|
||||
},
|
||||
[]*datapb.CompactionPlan{
|
||||
{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}},
|
||||
{SegmentID: 2, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log2")}},
|
||||
},
|
||||
Type: datapb.CompactionType_MergeCompaction,
|
||||
Timetravel: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"test multi plans",
|
||||
args{
|
||||
[]*SegmentInfo{
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 50, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 50, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log2")}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 50, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 50, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log4")}}},
|
||||
},
|
||||
&timetravel{1000},
|
||||
},
|
||||
[]*datapb.CompactionPlan{
|
||||
{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{SegmentID: 1, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}},
|
||||
{SegmentID: 2, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log2")}},
|
||||
},
|
||||
Type: datapb.CompactionType_MergeCompaction,
|
||||
Timetravel: 1000,
|
||||
},
|
||||
{
|
||||
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
|
||||
{SegmentID: 3, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}},
|
||||
{SegmentID: 4, FieldBinlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log4")}},
|
||||
},
|
||||
Type: datapb.CompactionType_MergeCompaction,
|
||||
Timetravel: 1000,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"test empty plan",
|
||||
args{
|
||||
[]*SegmentInfo{
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 50, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log1")}}},
|
||||
{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 51, MaxRowNum: 100, Binlogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "log3")}}},
|
||||
},
|
||||
&timetravel{1000},
|
||||
},
|
||||
[]*datapb.CompactionPlan{},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := greedyMergeCompaction(tt.args.segments, tt.args.timetravel)
|
||||
assert.EqualValues(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_chooseAllBinlogs(t *testing.T) {
|
||||
type args struct {
|
||||
segment *SegmentInfo
|
||||
timetravel *timetravel
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want *datapb.CompactionPlan
|
||||
}{
|
||||
{
|
||||
"test no delta logs",
|
||||
args{
|
||||
segment: &SegmentInfo{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
Deltalogs: nil,
|
||||
},
|
||||
},
|
||||
},
|
||||
nil,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := chooseAllBinlogs(tt.args.segment, tt.args.timetravel)
|
||||
assert.EqualValues(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -474,3 +474,14 @@ func Test_getCompactionTasksBySignalID(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
|
||||
l := &datapb.FieldBinlog{
|
||||
FieldID: id,
|
||||
Binlogs: make([]*datapb.Binlog, 0, len(paths)),
|
||||
}
|
||||
for _, path := range paths {
|
||||
l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path})
|
||||
}
|
||||
return l
|
||||
}
|
||||
|
|
|
@ -17,8 +17,8 @@
|
|||
package datacoord
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -29,28 +29,20 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
signalBufferSize = 100
|
||||
maxLittleSegmentNum = 10
|
||||
maxCompactionTimeoutInSeconds = 60
|
||||
singleCompactionRatioThreshold = 0.2
|
||||
singleCompactionDeltaLogMaxSize = 10 * 1024 * 1024 //10MiB
|
||||
globalCompactionInterval = 60 * time.Second
|
||||
)
|
||||
|
||||
type timetravel struct {
|
||||
time Timestamp
|
||||
type compactTime struct {
|
||||
travelTime Timestamp
|
||||
expireTime Timestamp
|
||||
}
|
||||
|
||||
type trigger interface {
|
||||
start()
|
||||
stop()
|
||||
// triggerCompaction triggers a compaction if any compaction condition satisfy.
|
||||
triggerCompaction(timetravel *timetravel) error
|
||||
triggerCompaction(compactTime *compactTime) error
|
||||
// triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment
|
||||
triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, timetravel *timetravel) error
|
||||
triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, compactTime *compactTime) error
|
||||
// forceTriggerCompaction force to start a compaction
|
||||
forceTriggerCompaction(collectionID int64, timetravel *timetravel) (UniqueID, error)
|
||||
forceTriggerCompaction(collectionID int64, compactTime *compactTime) (UniqueID, error)
|
||||
}
|
||||
|
||||
type compactionSignal struct {
|
||||
|
@ -61,40 +53,34 @@ type compactionSignal struct {
|
|||
partitionID UniqueID
|
||||
segmentID UniqueID
|
||||
channel string
|
||||
timetravel *timetravel
|
||||
compactTime *compactTime
|
||||
}
|
||||
|
||||
var _ trigger = (*compactionTrigger)(nil)
|
||||
|
||||
type compactionTrigger struct {
|
||||
meta *meta
|
||||
allocator allocator
|
||||
signals chan *compactionSignal
|
||||
singleCompactionPolicy singleCompactionPolicy
|
||||
mergeCompactionPolicy mergeCompactionPolicy
|
||||
compactionHandler compactionPlanContext
|
||||
globalTrigger *time.Ticker
|
||||
forceMu sync.Mutex
|
||||
mergeCompactionSegmentThreshold int
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
meta *meta
|
||||
allocator allocator
|
||||
signals chan *compactionSignal
|
||||
compactionHandler compactionPlanContext
|
||||
globalTrigger *time.Ticker
|
||||
forceMu sync.Mutex
|
||||
quit chan struct{}
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func newCompactionTrigger(meta *meta, compactionHandler compactionPlanContext, allocator allocator) *compactionTrigger {
|
||||
return &compactionTrigger{
|
||||
meta: meta,
|
||||
allocator: allocator,
|
||||
signals: make(chan *compactionSignal, signalBufferSize),
|
||||
singleCompactionPolicy: (singleCompactionFunc)(chooseAllBinlogs),
|
||||
mergeCompactionPolicy: (mergeCompactionFunc)(greedyMergeCompaction),
|
||||
compactionHandler: compactionHandler,
|
||||
mergeCompactionSegmentThreshold: maxLittleSegmentNum,
|
||||
meta: meta,
|
||||
allocator: allocator,
|
||||
signals: make(chan *compactionSignal, 100),
|
||||
compactionHandler: compactionHandler,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) start() {
|
||||
t.quit = make(chan struct{})
|
||||
t.globalTrigger = time.NewTicker(globalCompactionInterval)
|
||||
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval)
|
||||
t.wg.Add(2)
|
||||
go func() {
|
||||
defer logutil.LogPanic()
|
||||
|
@ -111,7 +97,8 @@ func (t *compactionTrigger) start() {
|
|||
t.handleGlobalSignal(signal)
|
||||
default:
|
||||
t.handleSignal(signal)
|
||||
t.globalTrigger.Reset(globalCompactionInterval)
|
||||
// shouldn't reset, otherwise a frequent flushed collection will affect other collections
|
||||
// t.globalTrigger.Reset(Params.DataCoordCfg.GlobalCompactionInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -137,14 +124,14 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
|
|||
return
|
||||
case <-t.globalTrigger.C:
|
||||
cctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
tt, err := getTimetravelReverseTime(cctx, t.allocator)
|
||||
ct, err := getCompactTime(cctx, t.allocator)
|
||||
if err != nil {
|
||||
log.Warn("unbale to get compaction timetravel", zap.Error(err))
|
||||
log.Warn("unbale to get compaction time", zap.Error(err))
|
||||
cancel()
|
||||
continue
|
||||
}
|
||||
cancel()
|
||||
err = t.triggerCompaction(tt)
|
||||
err = t.triggerCompaction(ct)
|
||||
if err != nil {
|
||||
log.Warn("unable to triggerCompaction", zap.Error(err))
|
||||
}
|
||||
|
@ -158,23 +145,23 @@ func (t *compactionTrigger) stop() {
|
|||
}
|
||||
|
||||
// triggerCompaction trigger a compaction if any compaction condition satisfy.
|
||||
func (t *compactionTrigger) triggerCompaction(timetravel *timetravel) error {
|
||||
func (t *compactionTrigger) triggerCompaction(compactTime *compactTime) error {
|
||||
id, err := t.allocSignalID()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
signal := &compactionSignal{
|
||||
id: id,
|
||||
isForce: false,
|
||||
isGlobal: true,
|
||||
timetravel: timetravel,
|
||||
id: id,
|
||||
isForce: false,
|
||||
isGlobal: true,
|
||||
compactTime: compactTime,
|
||||
}
|
||||
t.signals <- signal
|
||||
return nil
|
||||
}
|
||||
|
||||
// triggerSingleCompaction triger a compaction bundled with collection-partiiton-channel-segment
|
||||
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, timetravel *timetravel) error {
|
||||
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, compactTime *compactTime) error {
|
||||
// If AutoCompaction diabled, flush request will not trigger compaction
|
||||
if !Params.DataCoordCfg.GetEnableAutoCompaction() {
|
||||
return nil
|
||||
|
@ -192,7 +179,7 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s
|
|||
partitionID: partitionID,
|
||||
segmentID: segmentID,
|
||||
channel: channel,
|
||||
timetravel: timetravel,
|
||||
compactTime: compactTime,
|
||||
}
|
||||
t.signals <- signal
|
||||
return nil
|
||||
|
@ -200,7 +187,7 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s
|
|||
|
||||
// forceTriggerCompaction force to start a compaction
|
||||
// invoked by user `ManualCompaction` operation
|
||||
func (t *compactionTrigger) forceTriggerCompaction(collectionID int64, timetravel *timetravel) (UniqueID, error) {
|
||||
func (t *compactionTrigger) forceTriggerCompaction(collectionID int64, compactTime *compactTime) (UniqueID, error) {
|
||||
id, err := t.allocSignalID()
|
||||
if err != nil {
|
||||
return -1, err
|
||||
|
@ -210,7 +197,7 @@ func (t *compactionTrigger) forceTriggerCompaction(collectionID int64, timetrave
|
|||
isForce: true,
|
||||
isGlobal: true,
|
||||
collectionID: collectionID,
|
||||
timetravel: timetravel,
|
||||
compactTime: compactTime,
|
||||
}
|
||||
t.handleGlobalSignal(signal)
|
||||
return id, nil
|
||||
|
@ -245,8 +232,10 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) {
|
|||
break
|
||||
}
|
||||
|
||||
plans := t.generatePlans(group.segments, signal.isForce, signal.timetravel)
|
||||
log.Info("global generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
|
||||
plans := t.generatePlans(group.segments, signal.isForce, signal.compactTime)
|
||||
if len(plans) != 0 {
|
||||
log.Info("global generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
|
||||
}
|
||||
for _, plan := range plans {
|
||||
if !signal.isForce && t.compactionHandler.isFull() {
|
||||
log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID))
|
||||
|
@ -284,7 +273,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
|
|||
channel := segment.GetInsertChannel()
|
||||
partitionID := segment.GetPartitionID()
|
||||
segments := t.getCandidateSegments(channel, partitionID)
|
||||
plans := t.generatePlans(segments, signal.isForce, signal.timetravel)
|
||||
plans := t.generatePlans(segments, signal.isForce, signal.compactTime)
|
||||
log.Info("single generated plans", zap.Int64("collection", signal.collectionID), zap.Int("plan count", len(plans)))
|
||||
for _, plan := range plans {
|
||||
if t.compactionHandler.isFull() {
|
||||
|
@ -303,136 +292,95 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isForce bool, collections ...UniqueID) []*datapb.CompactionPlan {
|
||||
colls := make(map[int64]struct{})
|
||||
for _, collID := range collections {
|
||||
colls[collID] = struct{}{}
|
||||
}
|
||||
m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool {
|
||||
_, has := colls[segment.GetCollectionID()]
|
||||
return (has || len(collections) == 0) && // if filters collection
|
||||
isSegmentHealthy(segment) &&
|
||||
isFlush(segment) &&
|
||||
!segment.isCompacting // not compacting now
|
||||
}) // m is list of chanPartSegments, which is channel-partition organized segments
|
||||
plans := make([]*datapb.CompactionPlan, 0)
|
||||
for _, segments := range m {
|
||||
if !isForce && t.compactionHandler.isFull() {
|
||||
return plans
|
||||
}
|
||||
mplans := t.mergeCompaction(segments.segments, signal, isForce)
|
||||
plans = append(plans, mplans...)
|
||||
}
|
||||
|
||||
return plans
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) mergeCompaction(segments []*SegmentInfo, signal *compactionSignal, isForce bool) []*datapb.CompactionPlan {
|
||||
if !isForce && !t.shouldDoMergeCompaction(segments) {
|
||||
return nil
|
||||
}
|
||||
|
||||
plans := t.mergeCompactionPolicy.generatePlan(segments, signal.timetravel)
|
||||
if len(plans) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
res := make([]*datapb.CompactionPlan, 0, len(plans))
|
||||
for _, plan := range plans {
|
||||
if !isForce && t.compactionHandler.isFull() {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := t.fillOriginPlan(plan); err != nil {
|
||||
log.Warn("failed to fill plan", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
log.Debug("exec merge compaction plan", zap.Any("plan", plan))
|
||||
if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil {
|
||||
log.Warn("failed to execute compaction plan", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
res = append(res, plan)
|
||||
}
|
||||
return res
|
||||
}*/
|
||||
|
||||
type SegmentHeap []*SegmentInfo
|
||||
|
||||
func (h *SegmentHeap) Len() int {
|
||||
return len(*h)
|
||||
}
|
||||
|
||||
func (h *SegmentHeap) Less(i, j int) bool {
|
||||
return (*h)[i].GetNumOfRows() < (*h)[j].GetNumOfRows()
|
||||
}
|
||||
|
||||
func (h *SegmentHeap) Swap(i, j int) {
|
||||
(*h)[i], (*h)[j] = (*h)[j], (*h)[i]
|
||||
}
|
||||
|
||||
func (h *SegmentHeap) Push(x interface{}) {
|
||||
*h = append(*h, x.(*SegmentInfo))
|
||||
}
|
||||
|
||||
func (h *SegmentHeap) Pop() interface{} {
|
||||
old := *h
|
||||
n := len(old)
|
||||
x := old[n-1]
|
||||
*h = old[:n-1]
|
||||
return x
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, timetravel *timetravel) []*datapb.CompactionPlan {
|
||||
func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, compactTime *compactTime) []*datapb.CompactionPlan {
|
||||
// find segments need internal compaction
|
||||
internalCandidates := &SegmentHeap{}
|
||||
mergeCandidates := &SegmentHeap{}
|
||||
// TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than small segment boundary, we only execute compaction when there are no compaction running actively
|
||||
var prioritizedCandidates []*SegmentInfo
|
||||
var smallCandidates []*SegmentInfo
|
||||
|
||||
// TODO, currently we lack of the measurement of data distribution, there should be another compaction help on redistributing segment based on scalar/vector field distribution
|
||||
for _, segment := range segments {
|
||||
segment := segment.ShadowClone()
|
||||
if (force && t.hasValidDeltaLogs(segment, timetravel)) || t.shouldDoSingleCompaction(segment, timetravel) {
|
||||
heap.Push(internalCandidates, segment)
|
||||
continue
|
||||
}
|
||||
if t.isSmallSegment(segment) {
|
||||
heap.Push(mergeCandidates, segment)
|
||||
// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
|
||||
if force || t.ShouldDoSingleCompaction(segment, compactTime) {
|
||||
prioritizedCandidates = append(prioritizedCandidates, segment)
|
||||
} else if t.isSmallSegment(segment) {
|
||||
smallCandidates = append(smallCandidates, segment)
|
||||
}
|
||||
}
|
||||
|
||||
var plans []*datapb.CompactionPlan
|
||||
// sort segment from large to small
|
||||
sort.Slice(prioritizedCandidates, func(i, j int) bool {
|
||||
if prioritizedCandidates[i].getSegmentSize() != prioritizedCandidates[i].getSegmentSize() {
|
||||
return prioritizedCandidates[i].getSegmentSize() > prioritizedCandidates[i].getSegmentSize()
|
||||
}
|
||||
return prioritizedCandidates[i].GetID() < prioritizedCandidates[j].GetID()
|
||||
})
|
||||
|
||||
generatePlan := func(segment *SegmentInfo) {
|
||||
sort.Slice(smallCandidates, func(i, j int) bool {
|
||||
if smallCandidates[i].getSegmentSize() != smallCandidates[i].getSegmentSize() {
|
||||
return smallCandidates[i].getSegmentSize() > smallCandidates[i].getSegmentSize()
|
||||
}
|
||||
return smallCandidates[i].GetID() < smallCandidates[j].GetID()
|
||||
})
|
||||
|
||||
// greedy pick from large segment to small, the goal is to fill each segment to reach 512M
|
||||
// we must ensure all prioritized candidates is in a plan
|
||||
//TODO the compaction policy should consider segment with similar timestamp together so timetravel and data expiration could work better.
|
||||
//TODO the compaction selection policy should consider if compaction workload is high
|
||||
for len(prioritizedCandidates) > 0 {
|
||||
var bucket []*SegmentInfo
|
||||
// pop out the first element
|
||||
segment := prioritizedCandidates[0]
|
||||
bucket = append(bucket, segment)
|
||||
free := segment.GetMaxRowNum()
|
||||
result, free := greedySelect(internalCandidates, free)
|
||||
bucket = append(bucket, result...)
|
||||
result, _ = greedySelect(mergeCandidates, free)
|
||||
bucket = append(bucket, result...)
|
||||
prioritizedCandidates = prioritizedCandidates[1:]
|
||||
|
||||
plans = append(plans, segmentsToPlan(bucket, timetravel))
|
||||
// only do single file compaction if segment is already large enough
|
||||
if segment.getSegmentSize() < int64(Params.DataCoordCfg.SegmentMaxSize)*1024*1024 {
|
||||
var result []*SegmentInfo
|
||||
free := int64(Params.DataCoordCfg.SegmentMaxSize)*1024*1024 - segment.getSegmentSize()
|
||||
maxNum := Params.DataCoordCfg.MaxSegmentToMerge - 1
|
||||
prioritizedCandidates, result, free = greedySelect(prioritizedCandidates, free, maxNum)
|
||||
bucket = append(bucket, result...)
|
||||
maxNum -= len(result)
|
||||
if maxNum > 0 {
|
||||
smallCandidates, result, _ = greedySelect(smallCandidates, free, maxNum)
|
||||
bucket = append(bucket, result...)
|
||||
}
|
||||
}
|
||||
// since this is priority compaction, we will execute even if there is only segment
|
||||
plans = append(plans, segmentsToPlan(bucket, compactTime))
|
||||
}
|
||||
|
||||
var segment *SegmentInfo
|
||||
for internalCandidates.Len() > 0 {
|
||||
segment = heap.Pop(internalCandidates).(*SegmentInfo)
|
||||
generatePlan(segment)
|
||||
}
|
||||
// check if there are small candidates left can be merged into large segments
|
||||
for len(smallCandidates) > 0 {
|
||||
var bucket []*SegmentInfo
|
||||
// pop out the first element
|
||||
segment := smallCandidates[0]
|
||||
bucket = append(bucket, segment)
|
||||
smallCandidates = smallCandidates[1:]
|
||||
|
||||
// merge compaction need 2 or more segment candidates
|
||||
for mergeCandidates.Len() > 1 &&
|
||||
(mergeCandidates.Len() >= t.mergeCompactionSegmentThreshold || force) {
|
||||
segment = heap.Pop(mergeCandidates).(*SegmentInfo)
|
||||
generatePlan(segment)
|
||||
var result []*SegmentInfo
|
||||
free := int64(Params.DataCoordCfg.SegmentMaxSize*1024*1024) - segment.getSegmentSize()
|
||||
// for small segment merge, we pick one largest segment and merge as much as small segment together with it
|
||||
// Why reverse? try to merge as many segments as expected.
|
||||
// for instance, if a 255M and 255M is the largest small candidates, they will never be merged because of the MinSegmentToMerge limit.
|
||||
smallCandidates, result, _ = reverseGreedySelect(smallCandidates, free, Params.DataCoordCfg.MaxSegmentToMerge-1)
|
||||
bucket = append(bucket, result...)
|
||||
|
||||
// only merge if candidate number is large than MinSegmentToMerge
|
||||
if len(bucket) >= Params.DataCoordCfg.MinSegmentToMerge {
|
||||
plans = append(plans, segmentsToPlan(bucket, compactTime))
|
||||
}
|
||||
}
|
||||
|
||||
return plans
|
||||
}
|
||||
|
||||
func segmentsToPlan(segments []*SegmentInfo, timeTravel *timetravel) *datapb.CompactionPlan {
|
||||
func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.CompactionPlan {
|
||||
plan := &datapb.CompactionPlan{
|
||||
Timetravel: timeTravel.time,
|
||||
Timetravel: compactTime.travelTime,
|
||||
Type: datapb.CompactionType_MixCompaction,
|
||||
Channel: segments[0].GetInsertChannel(),
|
||||
}
|
||||
|
@ -450,14 +398,35 @@ func segmentsToPlan(segments []*SegmentInfo, timeTravel *timetravel) *datapb.Com
|
|||
return plan
|
||||
}
|
||||
|
||||
func greedySelect(candidates *SegmentHeap, free int64) ([]*SegmentInfo, int64) {
|
||||
func greedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
|
||||
var result []*SegmentInfo
|
||||
for candidates.Len() > 0 && (*candidates)[0].GetNumOfRows() < free {
|
||||
segment := heap.Pop(candidates).(*SegmentInfo)
|
||||
result = append(result, segment)
|
||||
free -= segment.GetNumOfRows()
|
||||
|
||||
for i := 0; i < len(candidates); {
|
||||
candidate := candidates[i]
|
||||
if len(result) < maxSegment && candidate.getSegmentSize() < free {
|
||||
result = append(result, candidate)
|
||||
free -= candidate.getSegmentSize()
|
||||
candidates = append(candidates[:i], candidates[i+1:]...)
|
||||
} else {
|
||||
i++
|
||||
}
|
||||
}
|
||||
return result, free
|
||||
|
||||
return candidates, result, free
|
||||
}
|
||||
|
||||
func reverseGreedySelect(candidates []*SegmentInfo, free int64, maxSegment int) ([]*SegmentInfo, []*SegmentInfo, int64) {
|
||||
var result []*SegmentInfo
|
||||
|
||||
for i := len(candidates) - 1; i >= 0; i-- {
|
||||
candidate := candidates[i]
|
||||
if (len(result) < maxSegment) && (candidate.getSegmentSize() < free) {
|
||||
result = append(result, candidate)
|
||||
free -= candidate.getSegmentSize()
|
||||
candidates = append(candidates[:i], candidates[i+1:]...)
|
||||
}
|
||||
}
|
||||
return candidates, result, free
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) getCandidateSegments(channel string, partitionID UniqueID) []*SegmentInfo {
|
||||
|
@ -474,17 +443,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
|
|||
}
|
||||
|
||||
func (t *compactionTrigger) isSmallSegment(segment *SegmentInfo) bool {
|
||||
return segment.GetNumOfRows() < segment.GetMaxRowNum()/2
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) shouldDoMergeCompaction(segments []*SegmentInfo) bool {
|
||||
littleSegmentNum := 0
|
||||
for _, s := range segments {
|
||||
if t.isSmallSegment(s) {
|
||||
littleSegmentNum++
|
||||
}
|
||||
}
|
||||
return littleSegmentNum >= t.mergeCompactionSegmentThreshold
|
||||
return segment.getSegmentSize() < int64(Params.DataCoordCfg.SegmentMaxSize*Params.DataCoordCfg.SegmentSmallProportion*1024*1024)
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
|
||||
|
@ -499,24 +458,62 @@ func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
|
|||
}
|
||||
plan.PlanID = id
|
||||
plan.StartTime = ts
|
||||
plan.TimeoutInSeconds = maxCompactionTimeoutInSeconds
|
||||
plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) shouldDoSingleCompaction(segment *SegmentInfo, timetravel *timetravel) bool {
|
||||
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
|
||||
// count all the binlog file count
|
||||
var totalLogNum int
|
||||
for _, binlogs := range segment.GetBinlogs() {
|
||||
totalLogNum += len(binlogs.GetBinlogs())
|
||||
}
|
||||
|
||||
for _, deltaLogs := range segment.GetDeltalogs() {
|
||||
totalLogNum += len(deltaLogs.GetBinlogs())
|
||||
}
|
||||
|
||||
for _, statsLogs := range segment.GetStatslogs() {
|
||||
totalLogNum += len(statsLogs.GetBinlogs())
|
||||
}
|
||||
// avoid segment has too many bin logs and the etcd meta is too large, force trigger compaction
|
||||
if totalLogNum > int(Params.DataCoordCfg.SingleCompactionBinlogMaxNum) {
|
||||
log.Info("total binlog number is too much, trigger compaction", zap.Int64("segment", segment.ID),
|
||||
zap.Int("Delta logs", len(segment.GetDeltalogs())), zap.Int("Bin Logs", len(segment.GetBinlogs())), zap.Int("Stat logs", len(segment.GetStatslogs())))
|
||||
return true
|
||||
}
|
||||
|
||||
// if expire time is enabled, put segment into compaction candidate
|
||||
totalExpiredSize := int64(0)
|
||||
for _, binlogs := range segment.GetBinlogs() {
|
||||
for _, l := range binlogs.GetBinlogs() {
|
||||
// TODO, we should probably estimate expired log entries by total rows in binlog and the ralationship of timeTo, timeFrom and expire time
|
||||
if l.TimestampTo < compactTime.expireTime {
|
||||
totalExpiredSize += l.GetLogSize()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if totalExpiredSize > Params.DataCoordCfg.SingleCompactionExpiredLogMaxSize {
|
||||
log.Info("total expired entities is too much, trigger compation", zap.Int64("segment", segment.ID), zap.Int64("expired log size", totalExpiredSize))
|
||||
return true
|
||||
}
|
||||
|
||||
// single compaction only merge insert and delta log beyond the timetravel
|
||||
// segment's insert binlogs dont have time range info, so we wait until the segment's last expire time is less than timetravel
|
||||
// to ensure that all insert logs is beyond the timetravel.
|
||||
// TODO: add meta in insert binlog
|
||||
if segment.LastExpireTime >= timetravel.time {
|
||||
if segment.LastExpireTime >= compactTime.travelTime {
|
||||
log.Debug("compaction is not triggered", zap.Int64("segment", segment.ID), zap.Int64("expired log size", totalExpiredSize),
|
||||
zap.Uint64("Expire", segment.LastExpireTime), zap.Uint64("Travel", compactTime.travelTime))
|
||||
return false
|
||||
}
|
||||
|
||||
totalDeletedRows := 0
|
||||
totalDeleteLogSize := int64(0)
|
||||
for _, fbl := range segment.GetDeltalogs() {
|
||||
for _, l := range fbl.GetBinlogs() {
|
||||
if l.TimestampTo < timetravel.time {
|
||||
for _, deltaLogs := range segment.GetDeltalogs() {
|
||||
for _, l := range deltaLogs.GetBinlogs() {
|
||||
if l.TimestampTo < compactTime.travelTime {
|
||||
totalDeletedRows += int(l.GetEntriesNum())
|
||||
totalDeleteLogSize += l.GetLogSize()
|
||||
}
|
||||
|
@ -524,65 +521,17 @@ func (t *compactionTrigger) shouldDoSingleCompaction(segment *SegmentInfo, timet
|
|||
}
|
||||
|
||||
// currently delta log size and delete ratio policy is applied
|
||||
return float32(totalDeletedRows)/float32(segment.NumOfRows) >= singleCompactionRatioThreshold || totalDeleteLogSize > singleCompactionDeltaLogMaxSize
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) hasValidDeltaLogs(segment *SegmentInfo, timetravel *timetravel) bool {
|
||||
if segment.LastExpireTime >= timetravel.time {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, fbl := range segment.GetDeltalogs() {
|
||||
for _, l := range fbl.GetBinlogs() {
|
||||
if l.TimestampTo < timetravel.time {
|
||||
return true
|
||||
}
|
||||
}
|
||||
if float32(totalDeletedRows)/float32(segment.NumOfRows) >= Params.DataCoordCfg.SingleCompactionRatioThreshold || totalDeleteLogSize > Params.DataCoordCfg.SingleCompactionDeltaLogMaxSize {
|
||||
log.Info("total delete entities is too much, trigger compation", zap.Int64("segment", segment.ID),
|
||||
zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
|
||||
return true
|
||||
}
|
||||
|
||||
log.Debug("compaction is not triggered", zap.Int64("segment", segment.ID), zap.Int64("expired log size", totalExpiredSize),
|
||||
zap.Int("deleted rows", totalDeletedRows), zap.Int64("delete log size", totalDeleteLogSize))
|
||||
return false
|
||||
}
|
||||
|
||||
/*
|
||||
func (t *compactionTrigger) globalSingleCompaction(segments []*SegmentInfo, isForce bool, signal *compactionSignal) []*datapb.CompactionPlan {
|
||||
plans := make([]*datapb.CompactionPlan, 0)
|
||||
for _, segment := range segments {
|
||||
if !isForce && t.compactionHandler.isFull() {
|
||||
return plans
|
||||
}
|
||||
plan, err := t.singleCompaction(segment, isForce, signal)
|
||||
if err != nil {
|
||||
log.Warn("failed to exec single compaction", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if plan != nil {
|
||||
plans = append(plans, plan)
|
||||
log.Debug("exec single compaction plan", zap.Any("plan", plan))
|
||||
}
|
||||
}
|
||||
return plans
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool, signal *compactionSignal) (*datapb.CompactionPlan, error) {
|
||||
if segment == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if !isForce && !t.shouldDoSingleCompaction(segment, signal.timetravel) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
plan := t.singleCompactionPolicy.generatePlan(segment, signal.timetravel)
|
||||
if plan == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if err := t.fillOriginPlan(plan); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return plan, t.compactionHandler.execCompactionPlan(signal, plan)
|
||||
}*/
|
||||
|
||||
func isFlush(segment *SegmentInfo) bool {
|
||||
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -586,30 +586,30 @@ type mockCompactionTrigger struct {
|
|||
}
|
||||
|
||||
// triggerCompaction trigger a compaction if any compaction condition satisfy.
|
||||
func (t *mockCompactionTrigger) triggerCompaction(tt *timetravel) error {
|
||||
func (t *mockCompactionTrigger) triggerCompaction(ct *compactTime) error {
|
||||
if f, ok := t.methods["triggerCompaction"]; ok {
|
||||
if ff, ok := f.(func(tt *timetravel) error); ok {
|
||||
return ff(tt)
|
||||
if ff, ok := f.(func(ct *compactTime) error); ok {
|
||||
return ff(ct)
|
||||
}
|
||||
}
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// triggerSingleCompaction trigerr a compaction bundled with collection-partiiton-channel-segment
|
||||
func (t *mockCompactionTrigger) triggerSingleCompaction(collectionID int64, partitionID int64, segmentID int64, channel string, tt *timetravel) error {
|
||||
func (t *mockCompactionTrigger) triggerSingleCompaction(collectionID int64, partitionID int64, segmentID int64, channel string, ct *compactTime) error {
|
||||
if f, ok := t.methods["triggerSingleCompaction"]; ok {
|
||||
if ff, ok := f.(func(collectionID int64, partitionID int64, segmentID int64, channel string, tt *timetravel) error); ok {
|
||||
return ff(collectionID, partitionID, segmentID, channel, tt)
|
||||
if ff, ok := f.(func(collectionID int64, partitionID int64, segmentID int64, channel string, ct *compactTime) error); ok {
|
||||
return ff(collectionID, partitionID, segmentID, channel, ct)
|
||||
}
|
||||
}
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
// forceTriggerCompaction force to start a compaction
|
||||
func (t *mockCompactionTrigger) forceTriggerCompaction(collectionID int64, tt *timetravel) (UniqueID, error) {
|
||||
func (t *mockCompactionTrigger) forceTriggerCompaction(collectionID int64, ct *compactTime) (UniqueID, error) {
|
||||
if f, ok := t.methods["forceTriggerCompaction"]; ok {
|
||||
if ff, ok := f.(func(collectionID int64, tt *timetravel) (UniqueID, error)); ok {
|
||||
return ff(collectionID, tt)
|
||||
if ff, ok := f.(func(collectionID int64, ct *compactTime) (UniqueID, error)); ok {
|
||||
return ff(collectionID, ct)
|
||||
}
|
||||
}
|
||||
panic("not implemented")
|
||||
|
|
|
@ -37,6 +37,8 @@ type SegmentInfo struct {
|
|||
allocations []*Allocation
|
||||
lastFlushTime time.Time
|
||||
isCompacting bool
|
||||
// a cache to avoid calculate twice
|
||||
size int64
|
||||
}
|
||||
|
||||
// NewSegmentInfo create `SegmentInfo` wrapper from `datapb.SegmentInfo`
|
||||
|
@ -315,5 +317,30 @@ func addSegmentBinlogs(field2Binlogs map[UniqueID][]*datapb.Binlog) SegmentInfoO
|
|||
}
|
||||
}
|
||||
|
||||
func (s *SegmentInfo) getSegmentSize() int64 {
|
||||
if s.size <= 0 {
|
||||
var size int64
|
||||
for _, binlogs := range s.GetBinlogs() {
|
||||
for _, l := range binlogs.GetBinlogs() {
|
||||
size += l.GetLogSize()
|
||||
}
|
||||
}
|
||||
|
||||
for _, deltaLogs := range s.GetDeltalogs() {
|
||||
for _, l := range deltaLogs.GetBinlogs() {
|
||||
size += l.GetLogSize()
|
||||
}
|
||||
}
|
||||
|
||||
for _, statsLogs := range s.GetStatslogs() {
|
||||
for _, l := range statsLogs.GetBinlogs() {
|
||||
size += l.GetLogSize()
|
||||
}
|
||||
}
|
||||
s.size = size
|
||||
}
|
||||
return s.size
|
||||
}
|
||||
|
||||
// SegmentInfoSelector is the function type to select SegmentInfo from meta
|
||||
type SegmentInfoSelector func(*SegmentInfo) bool
|
||||
|
|
|
@ -2114,7 +2114,7 @@ func TestManualCompaction(t *testing.T) {
|
|||
svr.isServing = ServerStateHealthy
|
||||
svr.compactionTrigger = &mockCompactionTrigger{
|
||||
methods: map[string]interface{}{
|
||||
"forceTriggerCompaction": func(collectionID int64, tt *timetravel) (UniqueID, error) {
|
||||
"forceTriggerCompaction": func(collectionID int64, ct *compactTime) (UniqueID, error) {
|
||||
return 1, nil
|
||||
},
|
||||
},
|
||||
|
@ -2133,7 +2133,7 @@ func TestManualCompaction(t *testing.T) {
|
|||
svr.isServing = ServerStateHealthy
|
||||
svr.compactionTrigger = &mockCompactionTrigger{
|
||||
methods: map[string]interface{}{
|
||||
"forceTriggerCompaction": func(collectionID int64, tt *timetravel) (UniqueID, error) {
|
||||
"forceTriggerCompaction": func(collectionID int64, ct *compactTime) (UniqueID, error) {
|
||||
return 0, errors.New("mock error")
|
||||
},
|
||||
},
|
||||
|
@ -2152,7 +2152,7 @@ func TestManualCompaction(t *testing.T) {
|
|||
svr.isServing = ServerStateStopped
|
||||
svr.compactionTrigger = &mockCompactionTrigger{
|
||||
methods: map[string]interface{}{
|
||||
"forceTriggerCompaction": func(collectionID int64, tt *timetravel) (UniqueID, error) {
|
||||
"forceTriggerCompaction": func(collectionID int64, ct *compactTime) (UniqueID, error) {
|
||||
return 1, nil
|
||||
},
|
||||
},
|
||||
|
|
|
@ -391,10 +391,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
|
|||
cctx, cancel := context.WithTimeout(s.ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
tt, err := getTimetravelReverseTime(cctx, s.allocator)
|
||||
ct, err := getCompactTime(cctx, s.allocator)
|
||||
if err == nil {
|
||||
err = s.compactionTrigger.triggerSingleCompaction(segment.GetCollectionID(),
|
||||
segment.GetPartitionID(), segmentID, segment.GetInsertChannel(), tt)
|
||||
segment.GetPartitionID(), segmentID, segment.GetInsertChannel(), ct)
|
||||
if err != nil {
|
||||
log.Warn("failed to trigger single compaction", zap.Int64("segment ID", segmentID))
|
||||
} else {
|
||||
|
@ -828,14 +828,14 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
tt, err := getTimetravelReverseTime(ctx, s.allocator)
|
||||
ct, err := getCompactTime(ctx, s.allocator)
|
||||
if err != nil {
|
||||
log.Warn("failed to get timetravel reverse time", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
|
||||
resp.Status.Reason = err.Error()
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID, tt)
|
||||
id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID, ct)
|
||||
if err != nil {
|
||||
log.Error("failed to trigger manual compaction", zap.Int64("collectionID", req.GetCollectionID()), zap.Error(err))
|
||||
resp.Status.Reason = err.Error()
|
||||
|
|
|
@ -68,16 +68,24 @@ func FailResponse(status *commonpb.Status, reason string) {
|
|||
status.Reason = reason
|
||||
}
|
||||
|
||||
func getTimetravelReverseTime(ctx context.Context, allocator allocator) (*timetravel, error) {
|
||||
func getCompactTime(ctx context.Context, allocator allocator) (*compactTime, error) {
|
||||
ts, err := allocator.allocTimestamp(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pts, _ := tsoutil.ParseTS(ts)
|
||||
ttpts := pts.Add(-time.Duration(Params.CommonCfg.RetentionDuration) * time.Second)
|
||||
tt := tsoutil.ComposeTS(ttpts.UnixNano()/int64(time.Millisecond), 0)
|
||||
return &timetravel{tt}, nil
|
||||
ttRetention := pts.Add(-time.Duration(Params.CommonCfg.RetentionDuration) * time.Second)
|
||||
ttRetentionLogic := tsoutil.ComposeTS(ttRetention.UnixNano()/int64(time.Millisecond), 0)
|
||||
|
||||
// TODO, change to collection level
|
||||
if Params.CommonCfg.EntityExpirationTTL > 0 {
|
||||
ttexpired := pts.Add(-Params.CommonCfg.EntityExpirationTTL)
|
||||
ttexpiredLogic := tsoutil.ComposeTS(ttexpired.UnixNano()/int64(time.Millisecond), 0)
|
||||
return &compactTime{ttRetentionLogic, ttexpiredLogic}, nil
|
||||
}
|
||||
// no expiration time
|
||||
return &compactTime{ttRetentionLogic, 0}, nil
|
||||
}
|
||||
|
||||
func parseSegmentIDByBinlog(path string) (UniqueID, error) {
|
||||
|
|
|
@ -111,7 +111,7 @@ func TestVerifyResponse(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func Test_getTimetravelReverseTime(t *testing.T) {
|
||||
func Test_getCompactTime(t *testing.T) {
|
||||
Params.Init()
|
||||
Params.CommonCfg.RetentionDuration = 43200 // 5 days
|
||||
|
||||
|
@ -124,19 +124,19 @@ func Test_getTimetravelReverseTime(t *testing.T) {
|
|||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
want *timetravel
|
||||
want *compactTime
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
"test get timetravel",
|
||||
args{&fixedTSOAllocator{fixedTime: tFixed}},
|
||||
&timetravel{tsoutil.ComposeTS(tBefore.UnixNano()/int64(time.Millisecond), 0)},
|
||||
&compactTime{tsoutil.ComposeTS(tBefore.UnixNano()/int64(time.Millisecond), 0), 0},
|
||||
false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := getTimetravelReverseTime(context.TODO(), tt.args.allocator)
|
||||
got, err := getCompactTime(context.TODO(), tt.args.allocator)
|
||||
assert.Equal(t, tt.wantErr, err != nil)
|
||||
assert.EqualValues(t, tt.want, got)
|
||||
})
|
||||
|
|
|
@ -894,14 +894,25 @@ type dataCoordConfig struct {
|
|||
CreatedTime time.Time
|
||||
UpdatedTime time.Time
|
||||
|
||||
EnableCompaction bool
|
||||
EnableAutoCompaction atomic.Value
|
||||
EnableGarbageCollection bool
|
||||
// compaction
|
||||
EnableCompaction bool
|
||||
EnableAutoCompaction atomic.Value
|
||||
|
||||
MinSegmentToMerge int
|
||||
MaxSegmentToMerge int
|
||||
SegmentSmallProportion float64
|
||||
CompactionTimeoutInSeconds int32
|
||||
SingleCompactionRatioThreshold float32
|
||||
SingleCompactionDeltaLogMaxSize int64
|
||||
SingleCompactionExpiredLogMaxSize int64
|
||||
SingleCompactionBinlogMaxNum int64
|
||||
GlobalCompactionInterval time.Duration
|
||||
|
||||
// Garbage Collection
|
||||
GCInterval time.Duration
|
||||
GCMissingTolerance time.Duration
|
||||
GCDropTolerance time.Duration
|
||||
EnableGarbageCollection bool
|
||||
GCInterval time.Duration
|
||||
GCMissingTolerance time.Duration
|
||||
GCDropTolerance time.Duration
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) init(base *BaseTable) {
|
||||
|
@ -916,6 +927,16 @@ func (p *dataCoordConfig) init(base *BaseTable) {
|
|||
p.initEnableCompaction()
|
||||
p.initEnableAutoCompaction()
|
||||
|
||||
p.initCompactionMinSegment()
|
||||
p.initCompactionMaxSegment()
|
||||
p.initSegmentSmallProportion()
|
||||
p.initCompactionTimeoutInSeconds()
|
||||
p.initSingleCompactionRatioThreshold()
|
||||
p.initSingleCompactionDeltaLogMaxSize()
|
||||
p.initSingleCompactionExpiredLogMaxSize()
|
||||
p.initSingleCompactionBinlogMaxNum()
|
||||
p.initGlobalCompactionInterval()
|
||||
|
||||
p.initEnableGarbageCollection()
|
||||
p.initGCInterval()
|
||||
p.initGCMissingTolerance()
|
||||
|
@ -948,6 +969,52 @@ func (p *dataCoordConfig) initEnableCompaction() {
|
|||
p.EnableCompaction = p.Base.ParseBool("dataCoord.enableCompaction", false)
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initEnableAutoCompaction() {
|
||||
p.EnableAutoCompaction.Store(p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false))
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initCompactionMinSegment() {
|
||||
p.MinSegmentToMerge = p.Base.ParseIntWithDefault("dataCoord.compaction.min.segment", 4)
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initCompactionMaxSegment() {
|
||||
p.MaxSegmentToMerge = p.Base.ParseIntWithDefault("dataCoord.compaction.max.segment", 30)
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initSegmentSmallProportion() {
|
||||
p.SegmentSmallProportion = p.Base.ParseFloatWithDefault("dataCoord.segment.smallProportion", 0.5)
|
||||
}
|
||||
|
||||
// compaction execution timeout
|
||||
func (p *dataCoordConfig) initCompactionTimeoutInSeconds() {
|
||||
p.CompactionTimeoutInSeconds = p.Base.ParseInt32WithDefault("dataCoord.compaction.timeout", 60*3)
|
||||
}
|
||||
|
||||
// if total delete entities is large than a ratio of total entities, trigger single compaction.
|
||||
func (p *dataCoordConfig) initSingleCompactionRatioThreshold() {
|
||||
p.SingleCompactionRatioThreshold = float32(p.Base.ParseFloatWithDefault("dataCoord.compaction.single.ratio.threshold", 0.2))
|
||||
}
|
||||
|
||||
// if total delta file size > SingleCompactionDeltaLogMaxSize, trigger single compaction
|
||||
func (p *dataCoordConfig) initSingleCompactionDeltaLogMaxSize() {
|
||||
p.SingleCompactionDeltaLogMaxSize = p.Base.ParseInt64WithDefault("dataCoord.compaction.single.deltalog.maxsize", 2*1024*1024)
|
||||
}
|
||||
|
||||
// if total expired file size > SingleCompactionExpiredLogMaxSize, trigger single compaction
|
||||
func (p *dataCoordConfig) initSingleCompactionExpiredLogMaxSize() {
|
||||
p.SingleCompactionExpiredLogMaxSize = p.Base.ParseInt64WithDefault("dataCoord.compaction.single.expiredlog.maxsize", 10*1024*1024)
|
||||
}
|
||||
|
||||
// if total binlog number > SingleCompactionBinlogMaxNum, trigger single compaction to ensure binlog number per segment is limited
|
||||
func (p *dataCoordConfig) initSingleCompactionBinlogMaxNum() {
|
||||
p.SingleCompactionBinlogMaxNum = p.Base.ParseInt64WithDefault("dataCoord.compaction.single.binlog.maxnum", 1000)
|
||||
}
|
||||
|
||||
// interval we check and trigger global compaction
|
||||
func (p *dataCoordConfig) initGlobalCompactionInterval() {
|
||||
p.GlobalCompactionInterval = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.compaction.global.interval", int64(60*time.Second)))
|
||||
}
|
||||
|
||||
// -- GC --
|
||||
func (p *dataCoordConfig) initEnableGarbageCollection() {
|
||||
p.EnableGarbageCollection = p.Base.ParseBool("dataCoord.enableGarbageCollection", false)
|
||||
|
@ -965,10 +1032,6 @@ func (p *dataCoordConfig) initGCDropTolerance() {
|
|||
p.GCDropTolerance = time.Duration(p.Base.ParseInt64WithDefault("dataCoord.gc.dropTolerance", 24*60*60)) * time.Second
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) initEnableAutoCompaction() {
|
||||
p.EnableAutoCompaction.Store(p.Base.ParseBool("dataCoord.compaction.enableAutoCompaction", false))
|
||||
}
|
||||
|
||||
func (p *dataCoordConfig) SetEnableAutoCompaction(enable bool) {
|
||||
p.EnableAutoCompaction.Store(enable)
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ binary_vec_field_desc = "binary vector type field"
|
|||
max_dim = 32768
|
||||
gracefulTime = 1
|
||||
default_nlist = 128
|
||||
compact_segment_num_threshold = 10
|
||||
compact_segment_num_threshold = 4
|
||||
compact_delta_ratio_reciprocal = 5 # compact_delta_binlog_ratio is 0.2
|
||||
compact_retention_duration = 40 # compaction travel time retention range 20s
|
||||
max_compaction_interval = 60 # the max time interval (s) from the last compaction
|
||||
|
|
|
@ -966,7 +966,7 @@ class TestCompactionOperation(TestcaseBase):
|
|||
c_plans = collection_w.get_compaction_plans()[0]
|
||||
|
||||
# Actually no merged
|
||||
assert len(c_plans.plans) == 0
|
||||
assert len(c_plans.plans) == 2
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L3)
|
||||
def test_compact_delete_cross_shards(self):
|
||||
|
@ -1020,11 +1020,8 @@ class TestCompactionOperation(TestcaseBase):
|
|||
collection_w.wait_for_compaction_completed()
|
||||
c_plans = collection_w.get_compaction_plans()[0]
|
||||
|
||||
# Actually no merged
|
||||
assert len(c_plans.plans) == 0
|
||||
collection_w.load()
|
||||
segments_info = self.utility_wrap.get_query_segment_info(collection_w.name)[0]
|
||||
assert segments_info[0].partitionID != segments_info[-1].partitionID
|
||||
# since manual compaction, segment should be compacted any way
|
||||
assert len(c_plans.plans) != 0
|
||||
|
||||
@pytest.mark.tags(CaseLabel.L2)
|
||||
def test_compact_during_insert(self):
|
||||
|
|
Loading…
Reference in New Issue