enhance: adding mix compaction first prioritizer (#36956)

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
pull/36999/head
Ted Xu 2024-10-18 11:37:24 +08:00 committed by GitHub
parent b7ffa8383c
commit 50da48a30d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 50 additions and 8 deletions

View File

@ -554,7 +554,11 @@ dataCoord:
# This configuration takes effect only when dataCoord.enableCompaction is set as true.
enableAutoCompaction: true
indexBasedCompaction: true
taskPrioritizer: default # compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.
# compaction task prioritizer, options: [default, level, mix].
# default is FIFO.
# level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
# mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.
taskPrioritizer: default
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2

View File

@ -168,11 +168,22 @@ var (
return 10
case datapb.CompactionType_ClusteringCompaction:
return 100
case datapb.CompactionType_MinorCompaction:
case datapb.CompactionType_MajorCompaction:
default:
return 1000
}
}
MixFirstPrioritizer Prioritizer = func(task CompactionTask) int {
switch task.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
return 10
case datapb.CompactionType_MixCompaction:
return 1
case datapb.CompactionType_ClusteringCompaction:
return 100
default:
return 1000
}
return 0xffff
}
)
@ -181,6 +192,8 @@ func getPrioritizer() Prioritizer {
switch p {
case "level":
return LevelPrioritizer
case "mix":
return MixFirstPrioritizer
default:
return DefaultPrioritizer
}

View File

@ -43,7 +43,7 @@ func TestCompactionQueue(t *testing.T) {
t3 := &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 2,
Type: datapb.CompactionType_MajorCompaction,
Type: datapb.CompactionType_ClusteringCompaction,
},
}
@ -88,7 +88,29 @@ func TestCompactionQueue(t *testing.T) {
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MajorCompaction, task.GetType())
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType())
})
t.Run("mix first prioritizer", func(t *testing.T) {
cq := NewCompactionQueue(3, MixFirstPrioritizer)
err := cq.Enqueue(t1)
assert.NoError(t, err)
err = cq.Enqueue(t2)
assert.NoError(t, err)
err = cq.Enqueue(t3)
assert.NoError(t, err)
err = cq.Enqueue(&mixCompactionTask{})
assert.Error(t, err)
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_ClusteringCompaction, task.GetType())
})
t.Run("update prioritizer", func(t *testing.T) {

View File

@ -3465,8 +3465,11 @@ This configuration takes effect only when dataCoord.enableCompaction is set as t
Key: "dataCoord.compaction.taskPrioritizer",
Version: "2.5.0",
DefaultValue: "default",
Doc: "compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.",
Export: true,
Doc: `compaction task prioritizer, options: [default, level, mix].
default is FIFO.
level is prioritized by level: L0 compactions first, then mix compactions, then clustering compactions.
mix is prioritized by level: mix compactions first, then L0 compactions, then clustering compactions.`,
Export: true,
}
p.CompactionTaskPrioritizer.Init(base.mgr)