mirror of https://github.com/milvus-io/milvus.git
Compaction bypass stale segments (#20202)
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> Signed-off-by: Congqi Xia <congqi.xia@zilliz.com> pull/20240/head
parent
9293524de4
commit
5f5fdb0789
|
@ -449,6 +449,11 @@ func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, c
|
|||
// TODO: we currently lack a measurement of data distribution; there should be another compaction that helps redistribute segments based on scalar/vector field distribution
|
||||
for _, segment := range segments {
|
||||
segment := segment.ShadowClone()
|
||||
// by-pass stale segments
|
||||
if !force && t.isStaleSegment(segment) {
|
||||
log.Debug("generate plans skip stale segment", zap.Int64("segmentID", segment.GetID()), zap.Time("lastFlushTime", segment.lastFlushTime))
|
||||
continue
|
||||
}
|
||||
// TODO should we trigger compaction periodically even if the segment has no obvious reason to be compacted?
|
||||
if force || t.ShouldDoSingleCompaction(segment, compactTime) {
|
||||
prioritizedCandidates = append(prioritizedCandidates, segment)
|
||||
|
@ -630,6 +635,10 @@ func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) isStaleSegment(segment *SegmentInfo) bool {
|
||||
return time.Since(segment.lastFlushTime).Minutes() >= segmentTimedFlushDuration
|
||||
}
|
||||
|
||||
func (t *compactionTrigger) ShouldDoSingleCompaction(segment *SegmentInfo, compactTime *compactTime) bool {
|
||||
// count all the binlog file count
|
||||
var totalLogNum int
|
||||
|
|
|
@ -855,6 +855,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
2: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -881,6 +882,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
3: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -900,6 +902,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
4: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -919,6 +922,27 @@ func Test_compactionTrigger_noplan(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
5: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: 4,
|
||||
CollectionID: 2,
|
||||
PartitionID: 1,
|
||||
LastExpireTime: 100,
|
||||
NumOfRows: 3,
|
||||
MaxRowNum: 300,
|
||||
InsertChannel: "ch1",
|
||||
State: commonpb.SegmentState_Flushed,
|
||||
Binlogs: []*datapb.FieldBinlog{
|
||||
{
|
||||
Binlogs: []*datapb.Binlog{
|
||||
{EntriesNum: 5, LogPath: "log1", LogSize: 100},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Unix(0, 0),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1033,6 +1057,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
2: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -1059,6 +1084,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
3: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -1085,6 +1111,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
4: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -1111,6 +1138,7 @@ func Test_compactionTrigger_smallfiles(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -1220,6 +1248,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
lastFlushTime: time.Now(),
|
||||
}
|
||||
segmentInfos.segments[i] = info
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue