enhance: Datacoord to support prioritization of compaction tasks (#36547)

See #36550

This PR makes 2 changes:

1. Introduces a prioritization mechanism: if
`dataCoord.compaction.taskPrioritizer` is set to `level`, compaction
tasks are always executed in the priority order L0 > Mix > Clustering
(a toy sketch of this ordering follows below).
2. `dataCoord.compaction.maxParallelTaskNum` now controls only the
parallelism of executing tasks, not the combined count of queued and
executing tasks.
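
The level-based ordering behaves like a min-heap keyed by compaction type. Below is a minimal, self-contained Go sketch of that idea; `toyTask`, `levelRank`, and `taskHeap` are invented for this illustration and are not the PR's types:

```go
package main

import (
	"container/heap"
	"fmt"
)

// toyTask stands in for a compaction task (hypothetical, for illustration).
type toyTask struct {
	planID int64
	kind   string // "L0", "Mix", or "Clustering"
}

// levelRank mirrors the idea of the level prioritizer:
// a lower rank means the task is dequeued earlier.
func levelRank(t toyTask) int {
	switch t.kind {
	case "L0":
		return 1
	case "Mix":
		return 10
	default: // Clustering and anything unknown
		return 100
	}
}

// taskHeap is a min-heap of toyTasks ordered by levelRank.
type taskHeap []toyTask

func (h taskHeap) Len() int           { return len(h) }
func (h taskHeap) Less(i, j int) bool { return levelRank(h[i]) < levelRank(h[j]) }
func (h taskHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *taskHeap) Push(x any)        { *h = append(*h, x.(toyTask)) }
func (h *taskHeap) Pop() any {
	old := *h
	t := old[len(old)-1]
	*h = old[:len(old)-1]
	return t
}

func main() {
	h := &taskHeap{}
	// Enqueue in "wrong" order: Clustering first, L0 last.
	heap.Push(h, toyTask{planID: 1, kind: "Clustering"})
	heap.Push(h, toyTask{planID: 2, kind: "Mix"})
	heap.Push(h, toyTask{planID: 3, kind: "L0"})
	for h.Len() > 0 {
		t := heap.Pop(h).(toyTask)
		fmt.Printf("dequeued plan %d (%s)\n", t.planID, t.kind)
	}
	// Prints: plan 3 (L0), plan 2 (Mix), plan 1 (Clustering).
}
```

The `CompactionQueue` added by this PR layers locking, a capacity bound, and a swappable `Prioritizer` on top of the same `container/heap` primitives.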

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
Ted Xu 2024-10-09 19:11:20 +08:00 committed by GitHub
parent e1b511a219
commit 5fc731795b
6 changed files with 418 additions and 120 deletions

View File

@@ -550,6 +550,7 @@ dataCoord:
# This configuration takes effect only when dataCoord.enableCompaction is set as true.
enableAutoCompaction: true
indexBasedCompaction: true
taskPrioritizer: default # compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2

View File

@@ -24,9 +24,7 @@ import (
"time"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -64,7 +62,6 @@ type compactionPlanContext interface {
var (
errChannelNotWatched = errors.New("channel is not watched")
errChannelInBuffer = errors.New("channel is in buffer")
errCompactionBusy = errors.New("compaction task queue is full")
)
var _ compactionPlanContext = (*compactionPlanHandler)(nil)
@@ -79,8 +76,7 @@ type compactionInfo struct {
}
type compactionPlanHandler struct {
queueGuard lock.RWMutex
queueTasks map[int64]CompactionTask // planID -> task
queueTasks CompactionQueue
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
@@ -96,8 +92,6 @@ type compactionPlanHandler struct {
stopCh chan struct{}
stopOnce sync.Once
stopWg sync.WaitGroup
taskNumber *atomic.Int32
}
func (c *compactionPlanHandler) getCompactionInfo(triggerID int64) *compactionInfo {
@@ -168,13 +162,11 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {
func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int {
cnt := 0
c.queueGuard.RLock()
for _, t := range c.queueTasks {
if t.GetTriggerID() == triggerID {
c.queueTasks.ForEach(func(ct CompactionTask) {
if ct.GetTriggerID() == triggerID {
cnt += 1
}
}
c.queueGuard.RUnlock()
})
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.GetTriggerID() == triggerID {
@@ -185,10 +177,11 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
return cnt
}
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta, allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta,
allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
) *compactionPlanHandler {
return &compactionPlanHandler{
queueTasks: make(map[int64]CompactionTask),
queueTasks: *NewCompactionQueue(256, getPrioritizer()), // A higher capacity lets more pending tasks be ordered by priority at once, but consumes more memory.
chManager: cm,
meta: meta,
sessions: sessions,
@@ -196,20 +189,12 @@ func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
taskNumber: atomic.NewInt32(0),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
}
func (c *compactionPlanHandler) schedule() []CompactionTask {
c.queueGuard.RLock()
if len(c.queueTasks) == 0 {
c.queueGuard.RUnlock()
return nil
}
c.queueGuard.RUnlock()
l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
clusterChannelExcludes := typeutil.NewSet[string]()
@@ -231,42 +216,66 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
}
c.executingGuard.RUnlock()
var picked []CompactionTask
c.queueGuard.RLock()
defer c.queueGuard.RUnlock()
keys := lo.Keys(c.queueTasks)
sort.SliceStable(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
for _, planID := range keys {
t := c.queueTasks[planID]
excluded := make([]CompactionTask, 0)
defer func() {
// Add back the excluded tasks
for _, t := range excluded {
c.queueTasks.Enqueue(t)
}
}()
selected := make([]CompactionTask, 0)
// The prioritizer config is refreshable, so re-apply it on every
// schedule pass. (The original address comparison `&c.queueTasks.prioritizer != &p`
// was always true, so this matches the previous behavior.)
c.queueTasks.UpdatePrioritizer(getPrioritizer())
c.executingGuard.Lock()
tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks)
c.executingGuard.Unlock()
for len(selected) < tasksToGo && c.queueTasks.Len() > 0 {
t, err := c.queueTasks.Dequeue()
if err != nil {
// The queue may have been drained concurrently; stop picking.
return selected
}
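// Skip any task whose channel or label conflicts with tasks already
// picked this round or already executing; skipped tasks are re-enqueued
// by the deferred function above.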
switch t.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) ||
mixChannelExcludes.Contain(t.GetChannel()) {
excluded = append(excluded, t)
continue
}
picked = append(picked, t)
l0ChannelExcludes.Insert(t.GetChannel())
selected = append(selected, t)
case datapb.CompactionType_MixCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) {
excluded = append(excluded, t)
continue
}
picked = append(picked, t)
mixChannelExcludes.Insert(t.GetChannel())
mixLabelExcludes.Insert(t.GetLabel())
selected = append(selected, t)
case datapb.CompactionType_ClusteringCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) ||
mixLabelExcludes.Contain(t.GetLabel()) ||
clusterLabelExcludes.Contain(t.GetLabel()) {
excluded = append(excluded, t)
continue
}
picked = append(picked, t)
clusterChannelExcludes.Insert(t.GetChannel())
clusterLabelExcludes.Insert(t.GetLabel())
selected = append(selected, t)
}
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
}
return picked
return selected
}
func (c *compactionPlanHandler) start() {
@@ -325,26 +334,6 @@ func (c *compactionPlanHandler) loadMeta() {
}
}
func (c *compactionPlanHandler) doSchedule() {
picked := c.schedule()
if len(picked) > 0 {
c.executingGuard.Lock()
for _, t := range picked {
c.executingTasks[t.GetPlanID()] = t
}
c.executingGuard.Unlock()
c.queueGuard.Lock()
for _, t := range picked {
delete(c.queueTasks, t.GetPlanID())
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
}
c.queueGuard.Unlock()
}
}
func (c *compactionPlanHandler) loopSchedule() {
log.Info("compactionPlanHandler start loop schedule")
defer c.stopWg.Done()
@@ -358,7 +347,7 @@ func (c *compactionPlanHandler) loopSchedule() {
return
case <-scheduleTicker.C:
c.doSchedule()
c.schedule()
}
}
}
@@ -484,22 +473,20 @@ func (c *compactionPlanHandler) stop() {
}
func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.queueGuard.Lock()
for id, task := range c.queueTasks {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel()))
log.Info("removing tasks by channel", zap.String("channel", channel))
c.queueTasks.RemoveAll(func(task CompactionTask) bool {
if task.GetChannel() == channel {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
zap.Int64("planID", task.GetPlanID()),
zap.Int64("node", task.GetNodeID()),
)
delete(c.queueTasks, id)
c.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Pending).Dec()
return true
}
}
c.queueGuard.Unlock()
return false
})
c.executingGuard.Lock()
for id, task := range c.executingTasks {
log.Info("Compaction handler removing tasks by channel",
@@ -511,7 +498,6 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
zap.Int64("node", task.GetNodeID()),
)
delete(c.executingTasks, id)
c.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Executing).Dec()
}
}
@@ -521,10 +507,7 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.queueGuard.Lock()
c.queueTasks[t.GetPlanID()] = t
c.queueGuard.Unlock()
c.taskNumber.Add(1)
c.queueTasks.Enqueue(t)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
}
@@ -535,26 +518,24 @@ func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingGuard.Unlock()
c.taskNumber.Add(1)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}
// getCompactionTask returns the compaction task with the given planID,
// checking the queue first and then the executing set; nil if not found.
func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask {
c.queueGuard.RLock()
t, ok := c.queueTasks[planID]
if ok {
c.queueGuard.RUnlock()
var t CompactionTask = nil
c.queueTasks.ForEach(func(task CompactionTask) {
if task.GetPlanID() == planID {
t = task
}
})
if t != nil {
return t
}
c.queueGuard.RUnlock()
c.executingGuard.RLock()
t, ok = c.executingTasks[planID]
if ok {
c.executingGuard.RUnlock()
return t
}
c.executingGuard.RUnlock()
defer c.executingGuard.RUnlock()
t = c.executingTasks[planID]
return t
}
@@ -669,7 +650,6 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()
c.taskNumber.Sub(int32(len(finishedTasks)))
return nil
}
@@ -708,23 +688,7 @@ func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask)
// isFull returns true if the pending task queue is at capacity
func (c *compactionPlanHandler) isFull() bool {
return c.getTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}
func (c *compactionPlanHandler) getTaskCount() int {
return int(c.taskNumber.Load())
}
func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState) []CompactionTask {
c.queueGuard.RLock()
defer c.queueGuard.RUnlock()
tasks := make([]CompactionTask, 0, len(c.queueTasks))
for _, t := range c.queueTasks {
if t.GetState() == state {
tasks = append(tasks, t)
}
}
return tasks
return c.queueTasks.Len() >= c.queueTasks.capacity
}
func (c *compactionPlanHandler) checkDelay(t CompactionTask) {

View File

@@ -0,0 +1,187 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"container/heap"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/lock"
)
type Item[T any] struct {
value T
priority int // The priority of the item in the queue.
// The index is needed by update and is maintained by the heap.Interface methods.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue[T any] []*Item[T]
var _ heap.Interface = (*PriorityQueue[any])(nil)
func (pq PriorityQueue[T]) Len() int { return len(pq) }
func (pq PriorityQueue[T]) Less(i, j int) bool {
return pq[i].priority < pq[j].priority
}
func (pq PriorityQueue[T]) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue[T]) Push(x any) {
n := len(*pq)
item := x.(*Item[T])
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue[T]) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // don't stop the GC from reclaiming the item eventually
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// update modifies the priority and value of an Item in the queue.
func (pq *PriorityQueue[T]) Update(item *Item[T], value T, priority int) {
item.value = value
item.priority = priority
heap.Fix(pq, item.index)
}
var (
ErrFull = errors.New("compaction queue is full")
ErrNoSuchElement = errors.New("compaction queue has no element")
)
type Prioritizer func(t CompactionTask) int
type CompactionQueue struct {
pq PriorityQueue[CompactionTask]
lock lock.RWMutex
prioritizer Prioritizer
capacity int
}
func NewCompactionQueue(capacity int, prioritizer Prioritizer) *CompactionQueue {
return &CompactionQueue{
pq: make(PriorityQueue[CompactionTask], 0),
lock: lock.RWMutex{},
prioritizer: prioritizer,
capacity: capacity,
}
}
func (q *CompactionQueue) Enqueue(t CompactionTask) error {
q.lock.Lock()
defer q.lock.Unlock()
if q.capacity > 0 && len(q.pq) >= q.capacity {
return ErrFull
}
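// heap.Push sifts the new item up in O(log n); the task with the
// smallest priority value stays at the root and is dequeued first.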
heap.Push(&q.pq, &Item[CompactionTask]{value: t, priority: q.prioritizer(t)})
return nil
}
func (q *CompactionQueue) Dequeue() (CompactionTask, error) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.pq) == 0 {
return nil, ErrNoSuchElement
}
item := heap.Pop(&q.pq).(*Item[CompactionTask])
return item.value, nil
}
func (q *CompactionQueue) UpdatePrioritizer(prioritizer Prioritizer) {
q.lock.Lock()
defer q.lock.Unlock()
q.prioritizer = prioritizer
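// Re-rank every queued task under the new prioritizer, then restore
// the heap invariant in O(n) with heap.Init.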
for i := range q.pq {
q.pq[i].priority = q.prioritizer(q.pq[i].value)
}
heap.Init(&q.pq)
}
func (q *CompactionQueue) RemoveAll(predicate func(CompactionTask) bool) {
q.lock.Lock()
defer q.lock.Unlock()
f := lo.Filter[*Item[CompactionTask]](q.pq, func(i1 *Item[CompactionTask], _ int) bool {
return !predicate(i1.value)
})
q.pq = f
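// Filtering can break the heap invariant, so rebuild it over the
// remaining items.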
heap.Init(&q.pq)
}
// ForEach calls f on each item in the queue.
func (q *CompactionQueue) ForEach(f func(CompactionTask)) {
q.lock.RLock()
defer q.lock.RUnlock()
lo.ForEach[*Item[CompactionTask]](q.pq, func(i *Item[CompactionTask], _ int) {
f(i.value)
})
}
func (q *CompactionQueue) Len() int {
q.lock.RLock()
defer q.lock.RUnlock()
return len(q.pq)
}
var (
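// The queue is a min-heap: the smaller the returned value, the earlier
// the task is dequeued. DefaultPrioritizer yields FIFO order because
// plan IDs are allocated in increasing order.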
DefaultPrioritizer Prioritizer = func(task CompactionTask) int {
return int(task.GetPlanID())
}
LevelPrioritizer Prioritizer = func(task CompactionTask) int {
switch task.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
return 1
case datapb.CompactionType_MixCompaction:
return 10
case datapb.CompactionType_ClusteringCompaction:
return 100
// Go switch cases do not fall through; combine the cases so minor
// compactions do not silently fall out to the 0xffff default below.
case datapb.CompactionType_MinorCompaction, datapb.CompactionType_MajorCompaction:
return 1000
}
return 0xffff
}
)
func getPrioritizer() Prioritizer {
p := Params.DataCoordCfg.CompactionTaskPrioritizer.GetValue()
switch p {
case "level":
return LevelPrioritizer
default:
return DefaultPrioritizer
}
}

View File

@@ -0,0 +1,151 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package datacoord
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus/internal/proto/datapb"
)
func TestCompactionQueue(t *testing.T) {
t1 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 3,
Type: datapb.CompactionType_MixCompaction,
},
}
t2 := &l0CompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 1,
Type: datapb.CompactionType_Level0DeleteCompaction,
},
}
t3 := &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: 2,
Type: datapb.CompactionType_MajorCompaction,
},
}
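// Plan IDs are deliberately out of level order: the default (FIFO)
// prioritizer dequeues plans 1, 2, 3, while the level prioritizer
// dequeues L0, then Mix, then Major.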
t.Run("default prioritizer", func(t *testing.T) {
cq := NewCompactionQueue(3, DefaultPrioritizer)
err := cq.Enqueue(t1)
assert.NoError(t, err)
err = cq.Enqueue(t2)
assert.NoError(t, err)
err = cq.Enqueue(t3)
assert.NoError(t, err)
err = cq.Enqueue(&mixCompactionTask{})
assert.Error(t, err)
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(1), task.GetPlanID())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(2), task.GetPlanID())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(3), task.GetPlanID())
})
t.Run("level prioritizer", func(t *testing.T) {
cq := NewCompactionQueue(3, LevelPrioritizer)
err := cq.Enqueue(t1)
assert.NoError(t, err)
err = cq.Enqueue(t2)
assert.NoError(t, err)
err = cq.Enqueue(t3)
assert.NoError(t, err)
err = cq.Enqueue(&mixCompactionTask{})
assert.Error(t, err)
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MixCompaction, task.GetType())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_MajorCompaction, task.GetType())
})
t.Run("update prioritizer", func(t *testing.T) {
cq := NewCompactionQueue(3, LevelPrioritizer)
err := cq.Enqueue(t1)
assert.NoError(t, err)
err = cq.Enqueue(t2)
assert.NoError(t, err)
err = cq.Enqueue(t3)
assert.NoError(t, err)
err = cq.Enqueue(&mixCompactionTask{})
assert.Error(t, err)
task, err := cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, datapb.CompactionType_Level0DeleteCompaction, task.GetType())
cq.UpdatePrioritizer(DefaultPrioritizer)
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(2), task.GetPlanID())
task, err = cq.Dequeue()
assert.NoError(t, err)
assert.Equal(t, int64(3), task.GetPlanID())
})
}
func TestConcurrency(t *testing.T) {
c := 10
cq := NewCompactionQueue(c, LevelPrioritizer)
wg := sync.WaitGroup{}
wg.Add(c)
for i := 0; i < c; i++ {
t1 := &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
PlanID: int64(i),
Type: datapb.CompactionType_MixCompaction,
},
}
go func() {
err := cq.Enqueue(t1)
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
wg.Add(c)
for i := 0; i < c; i++ {
go func() {
_, err := cq.Dequeue()
assert.NoError(t, err)
wg.Done()
}()
}
wg.Wait()
}

View File

@@ -214,19 +214,15 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWith1ParallelTask() {
s.Run(test.description, func() {
s.SetupTest()
s.generateInitTasksForSchedule()
s.Require().Equal(4, s.handler.getTaskCount())
// submit the testing tasks
for _, t := range test.tasks {
s.handler.submitTask(t)
}
s.Equal(4+len(test.tasks), s.handler.getTaskCount())
gotTasks := s.handler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
return t.GetPlanID()
}))
s.Equal(4+len(test.tasks), s.handler.getTaskCount())
})
}
}
@@ -332,13 +328,11 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
for _, test := range tests {
s.Run(test.description, func() {
s.SetupTest()
s.Require().Equal(0, s.handler.getTaskCount())
// submit the testing tasks
for _, t := range test.tasks {
s.handler.submitTask(t)
}
s.Equal(len(test.tasks), s.handler.getTaskCount())
gotTasks := s.handler.schedule()
s.Equal(test.expectedOut, lo.Map(gotTasks, func(t CompactionTask, _ int) int64 {
@@ -531,7 +525,6 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
s.handler.submitTask(t1)
s.handler.restoreTask(t2)
s.handler.removeTasksByChannel(ch)
s.Equal(0, s.handler.getTaskCount())
}
func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
@@ -601,9 +594,7 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
s.handler.submitTask(t)
}
s.Equal(3, s.handler.getTaskCount())
s.handler.doSchedule()
s.Equal(3, s.handler.getTaskCount())
s.handler.schedule()
info := s.handler.getCompactionInfo(1)
s.Equal(1, info.completedCnt)
@@ -627,7 +618,6 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
s.NoError(err)
t := handler.getCompactionTask(1)
s.NotNil(t)
s.handler.taskNumber.Add(1000)
task.PlanID = 2
err = s.handler.enqueueCompaction(task)
s.NoError(err)
@@ -759,10 +749,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckCompaction() {
s.handler.submitTask(t)
}
picked := s.handler.schedule()
s.NotEmpty(picked)
s.handler.doSchedule()
s.handler.schedule()
// time.Sleep(2 * time.Second)
s.handler.checkCompaction()
@@ -903,11 +890,9 @@ func (s *CompactionPlanHandlerSuite) TestProcessCompleteCompaction() {
s.mockSessMgr.EXPECT().DropCompactionPlan(mock.Anything, mock.Anything).Return(nil)
s.handler.submitTask(task)
s.handler.doSchedule()
s.Equal(1, s.handler.getTaskCount())
s.handler.schedule()
err := s.handler.checkCompaction()
s.NoError(err)
s.Equal(0, len(s.handler.getTasksByState(datapb.CompactionTaskState_completed)))
}
func getFieldBinlogIDs(fieldID int64, logIDs ...int64) *datapb.FieldBinlog {

View File

@@ -3205,9 +3205,10 @@ type dataCoordConfig struct {
SegmentFlushInterval ParamItem `refreshable:"true"`
// compaction
EnableCompaction ParamItem `refreshable:"false"`
EnableAutoCompaction ParamItem `refreshable:"true"`
IndexBasedCompaction ParamItem `refreshable:"true"`
EnableCompaction ParamItem `refreshable:"false"`
EnableAutoCompaction ParamItem `refreshable:"true"`
IndexBasedCompaction ParamItem `refreshable:"true"`
CompactionTaskPrioritizer ParamItem `refreshable:"true"`
CompactionRPCTimeout ParamItem `refreshable:"true"`
CompactionMaxParallelTasks ParamItem `refreshable:"true"`
@@ -3482,6 +3483,15 @@ This configuration takes effect only when dataCoord.enableCompaction is set as true
}
p.IndexBasedCompaction.Init(base.mgr)
p.CompactionTaskPrioritizer = ParamItem{
Key: "dataCoord.compaction.taskPrioritizer",
Version: "2.5.0",
DefaultValue: "default",
Doc: "compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.",
Export: true,
}
p.CompactionTaskPrioritizer.Init(base.mgr)
p.CompactionRPCTimeout = ParamItem{
Key: "dataCoord.compaction.rpcTimeout",
Version: "2.2.12",