milvus/internal/datacoord/compaction_scheduler.go

212 lines
6.0 KiB
Go

package datacoord
import (
"fmt"
"sync"
"github.com/samber/lo"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
type Scheduler interface {
Submit(t ...*compactionTask)
Schedule() []*compactionTask
Finish(nodeID int64, plan *datapb.CompactionPlan)
GetTaskCount() int
LogStatus()
// Start()
// Stop()
// IsFull() bool
// GetCompactionTasksBySignalID(signalID int64) []compactionTask
}
type CompactionScheduler struct {
taskNumber *atomic.Int32
queuingTasks []*compactionTask
parallelTasks map[int64][]*compactionTask // parallel by nodeID
mu sync.RWMutex
planHandler *compactionPlanHandler
}
var _ Scheduler = (*CompactionScheduler)(nil)
func NewCompactionScheduler() *CompactionScheduler {
return &CompactionScheduler{
taskNumber: atomic.NewInt32(0),
queuingTasks: make([]*compactionTask, 0),
parallelTasks: make(map[int64][]*compactionTask),
}
}
func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {
s.mu.Lock()
s.queuingTasks = append(s.queuingTasks, tasks...)
s.mu.Unlock()
s.taskNumber.Add(int32(len(tasks)))
lo.ForEach(tasks, func(t *compactionTask, _ int) {
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(t.dataNodeID), t.plan.GetType().String(), metrics.Pending).Inc()
})
s.LogStatus()
}
// Schedule pick 1 or 0 tasks for 1 node
func (s *CompactionScheduler) Schedule() []*compactionTask {
nodeTasks := make(map[int64][]*compactionTask) // nodeID
s.mu.Lock()
defer s.mu.Unlock()
for _, task := range s.queuingTasks {
if _, ok := nodeTasks[task.dataNodeID]; !ok {
nodeTasks[task.dataNodeID] = make([]*compactionTask, 0)
}
nodeTasks[task.dataNodeID] = append(nodeTasks[task.dataNodeID], task)
}
executable := make(map[int64]*compactionTask)
pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}
if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// Channel of LevelZeroCompaction task with no executing compactions
if !lo.Contains(executing, task.plan.GetChannel()) {
return task
}
// Don't schedule any tasks for channel with LevelZeroCompaction task
// when there're executing compactions
exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
continue
}
return task
}
return nil
}
// pick 1 or 0 task for 1 node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]
if len(parallel) >= calculateParallel() {
log.Info("Compaction parallel in DataNode reaches the limit", zap.Int64("nodeID", node), zap.Int("parallel", len(parallel)))
continue
}
var (
executing = typeutil.NewSet[string]()
channelsExecPrior = typeutil.NewSet[string]()
)
for _, t := range parallel {
executing.Insert(t.plan.GetChannel())
if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
channelsExecPrior.Insert(t.plan.GetChannel())
}
}
picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
}
}
var pickPlans []int64
for node, task := range executable {
pickPlans = append(pickPlans, task.plan.PlanID)
if _, ok := s.parallelTasks[node]; !ok {
s.parallelTasks[node] = []*compactionTask{task}
} else {
s.parallelTasks[node] = append(s.parallelTasks[node], task)
}
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Executing).Inc()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(node), task.plan.GetType().String(), metrics.Pending).Dec()
}
s.queuingTasks = lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
return !lo.Contains(pickPlans, t.plan.PlanID)
})
// clean parallelTasks with nodes of no running tasks
for node, tasks := range s.parallelTasks {
if len(tasks) == 0 {
delete(s.parallelTasks, node)
}
}
return lo.Values(executable)
}
func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) {
planID := plan.GetPlanID()
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID))
s.mu.Lock()
if parallel, ok := s.parallelTasks[nodeID]; ok {
tasks := lo.Filter(parallel, func(t *compactionTask, _ int) bool {
return t.plan.PlanID != planID
})
s.parallelTasks[nodeID] = tasks
s.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc()
log.Info("Compaction scheduler remove task from executing")
}
filtered := lo.Filter(s.queuingTasks, func(t *compactionTask, _ int) bool {
return t.plan.PlanID != planID
})
if len(filtered) < len(s.queuingTasks) {
s.queuingTasks = filtered
s.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.
WithLabelValues(fmt.Sprint(nodeID), plan.GetType().String(), metrics.Done).Inc()
log.Info("Compaction scheduler remove task from queue")
}
s.mu.Unlock()
s.LogStatus()
}
func (s *CompactionScheduler) LogStatus() {
s.mu.RLock()
defer s.mu.RUnlock()
waiting := lo.Map(s.queuingTasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})
var executing []int64
for _, tasks := range s.parallelTasks {
executing = append(executing, lo.Map(tasks, func(t *compactionTask, _ int) int64 {
return t.plan.PlanID
})...)
}
if len(waiting) > 0 || len(executing) > 0 {
log.Info("Compaction scheduler status", zap.Int64s("waiting", waiting), zap.Int64s("executing", executing))
}
}
func (s *CompactionScheduler) GetTaskCount() int {
return int(s.taskNumber.Load())
}