enhance: [10kcp] Remove scheduler and target manager mutex (#38968)

supplement to PR https://github.com/milvus-io/milvus/pull/38566

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
commit 9b2b2a2689 (parent 663ec6f822)
Author: yihao.dai <yihao.dai@zilliz.com>
Date:   2025-01-03 11:18:52 +08:00 (committed by GitHub)
6 changed files with 152 additions and 181 deletions
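For context, the change replaces struct-wide `sync.RWMutex` guards with Milvus's `typeutil.ConcurrentMap` for point lookups and iteration, plus a per-key `lock.KeyLock` for the few read-modify-write paths that must stay atomic per collection (see `updateCollectionTarget` and the scheduler's `Add()` in the hunks below). The following is a minimal, self-contained sketch of that pattern, not the Milvus implementation: the hand-rolled `keyLock` and Go's `sync.Map` stand in for `lock.KeyLock` and `typeutil.ConcurrentMap`.

```go
// Minimal sketch of the locking pattern applied in this PR (illustrative only,
// not the Milvus code): a concurrent map for point reads/writes plus a per-key
// lock for the one read-modify-write path that must stay atomic per collection.
package main

import (
	"fmt"
	"sync"
)

// keyLock hands out one mutex per key so that writers touching different
// collections do not serialize on a single struct-wide RWMutex.
// (Per-key mutexes are never reclaimed here; the real lock.KeyLock is smarter.)
type keyLock[K comparable] struct {
	mu    sync.Mutex
	locks map[K]*sync.Mutex
}

func newKeyLock[K comparable]() *keyLock[K] {
	return &keyLock[K]{locks: make(map[K]*sync.Mutex)}
}

func (kl *keyLock[K]) Lock(k K) {
	kl.mu.Lock()
	l, ok := kl.locks[k]
	if !ok {
		l = &sync.Mutex{}
		kl.locks[k] = l
	}
	kl.mu.Unlock()
	l.Lock()
}

func (kl *keyLock[K]) Unlock(k K) {
	kl.mu.Lock()
	l := kl.locks[k]
	kl.mu.Unlock()
	l.Unlock()
}

type collectionTarget struct{ version int64 }

// target mirrors the shape of querycoord's target struct; sync.Map stands in
// for typeutil.ConcurrentMap[int64, *CollectionTarget].
type target struct {
	keyLock *keyLock[int64]
	targets sync.Map // collectionID (int64) -> *collectionTarget
}

// updateCollectionTarget keeps the "reject stale versions" check and the insert
// atomic per collection without taking a map-wide lock.
func (t *target) updateCollectionTarget(collectionID int64, ct *collectionTarget) {
	t.keyLock.Lock(collectionID)
	defer t.keyLock.Unlock(collectionID)
	if old, ok := t.targets.Load(collectionID); ok && ct.version <= old.(*collectionTarget).version {
		return
	}
	t.targets.Store(collectionID, ct)
}

func main() {
	tg := &target{keyLock: newKeyLock[int64]()}
	tg.updateCollectionTarget(1000, &collectionTarget{version: 2})
	tg.updateCollectionTarget(1000, &collectionTarget{version: 1}) // dropped: stale version
	v, _ := tg.targets.Load(int64(1000))
	fmt.Println(v.(*collectionTarget).version) // prints 2
}
```

The point of the per-key lock is that updates to different collections no longer contend on a single mutex, while the stale-version check and the insert stay atomic for any one collection; plain reads and removals go straight to the concurrent map.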


@ -25,6 +25,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -218,28 +219,33 @@ func (p *CollectionTarget) Ready() bool {
}
type target struct {
keyLock *lock.KeyLock[int64] // guards updateCollectionTarget
// just maintain target at collection level
collectionTargetMap map[int64]*CollectionTarget
collectionTargetMap *typeutil.ConcurrentMap[int64, *CollectionTarget]
}
func newTarget() *target {
return &target{
collectionTargetMap: make(map[int64]*CollectionTarget),
keyLock: lock.NewKeyLock[int64](),
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
}
}
func (t *target) updateCollectionTarget(collectionID int64, target *CollectionTarget) {
if t.collectionTargetMap[collectionID] != nil && target.GetTargetVersion() <= t.collectionTargetMap[collectionID].GetTargetVersion() {
t.keyLock.Lock(collectionID)
defer t.keyLock.Unlock(collectionID)
if old, ok := t.collectionTargetMap.Get(collectionID); ok && old != nil && target.GetTargetVersion() <= old.GetTargetVersion() {
return
}
t.collectionTargetMap[collectionID] = target
t.collectionTargetMap.Insert(collectionID, target)
}
func (t *target) removeCollectionTarget(collectionID int64) {
delete(t.collectionTargetMap, collectionID)
t.collectionTargetMap.Remove(collectionID)
}
func (t *target) getCollectionTarget(collectionID int64) *CollectionTarget {
return t.collectionTargetMap[collectionID]
ret, _ := t.collectionTargetMap.Get(collectionID)
return ret
}


@ -72,9 +72,8 @@ type TargetManagerInterface interface {
}
type TargetManager struct {
rwMutex sync.RWMutex
broker Broker
meta *Meta
broker Broker
meta *Meta
// all read segment/channel operation happens on current -> only current target are visible to outer
// all add segment/channel operation happens on next -> changes can only happen on next target
@ -96,8 +95,6 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
// WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
// which may make the current target not available
func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64) bool {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log := log.With(zap.Int64("collectionID", collectionID))
log.Debug("start to update current target for collection")
@ -192,9 +189,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
allocatedTarget := NewCollectionTarget(segments, dmChannels, partitionIDs)
mgr.rwMutex.Lock()
mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
mgr.rwMutex.Unlock()
log.Debug("finish to update next targets for collection",
zap.Int64("collectionID", collectionID),
@ -225,8 +220,6 @@ func mergeDmChannelInfo(infos []*datapb.VchannelInfo) *DmChannel {
// RemoveCollection removes all channels and segments in the given collection
func (mgr *TargetManager) RemoveCollection(collectionID int64) {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log.Info("remove collection from targets",
zap.Int64("collectionID", collectionID))
@ -247,9 +240,6 @@ func (mgr *TargetManager) RemoveCollection(collectionID int64) {
// RemovePartition removes all segment in the given partition,
// NOTE: this doesn't remove any channel even the given one is the only partition
func (mgr *TargetManager) RemovePartition(collectionID int64, partitionIDs ...int64) {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log := log.With(zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs))
@ -356,9 +346,6 @@ func (mgr *TargetManager) getCollectionTarget(scope TargetScope, collectionID in
func (mgr *TargetManager) GetGrowingSegmentsByCollection(collectionID int64,
scope TargetScope,
) typeutil.UniqueSet {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
@ -379,9 +366,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
channelName string,
scope TargetScope,
) typeutil.UniqueSet {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
segments := typeutil.NewUniqueSet()
@ -402,9 +386,6 @@ func (mgr *TargetManager) GetGrowingSegmentsByChannel(collectionID int64,
func (mgr *TargetManager) GetSealedSegmentsByCollection(collectionID int64,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
@ -418,9 +399,6 @@ func (mgr *TargetManager) GetSealedSegmentsByChannel(collectionID int64,
channelName string,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
ret := lo.KeyBy(t.GetChannelSegments(channelName), func(s *datapb.SegmentInfo) int64 {
@ -439,9 +417,6 @@ func (mgr *TargetManager) GetDroppedSegmentsByChannel(collectionID int64,
channelName string,
scope TargetScope,
) []int64 {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if channel, ok := t.dmChannels[channelName]; ok {
@ -456,9 +431,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
partitionID int64,
scope TargetScope,
) map[int64]*datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
segments := make(map[int64]*datapb.SegmentInfo)
@ -475,9 +447,6 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
}
func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope TargetScope) map[string]*DmChannel {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
@ -488,9 +457,6 @@ func (mgr *TargetManager) GetDmChannelsByCollection(collectionID int64, scope Ta
}
func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope TargetScope) *DmChannel {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if ch, ok := t.GetAllDmChannels()[channel]; ok {
@ -501,9 +467,6 @@ func (mgr *TargetManager) GetDmChannel(collectionID int64, channel string, scope
}
func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope TargetScope) *datapb.SegmentInfo {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if s, ok := t.GetAllSegments()[id]; ok {
@ -515,9 +478,6 @@ func (mgr *TargetManager) GetSealedSegment(collectionID int64, id int64, scope T
}
func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope TargetScope) int64 {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(scope, collectionID)
for _, t := range targets {
if t.GetTargetVersion() > 0 {
@ -529,9 +489,6 @@ func (mgr *TargetManager) GetCollectionTargetVersion(collectionID int64, scope T
}
func (mgr *TargetManager) IsCurrentTargetExist(collectionID int64, partitionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
targets := mgr.getCollectionTarget(CurrentTarget, collectionID)
return len(targets) > 0 && (targets[0].partitions.Contain(partitionID) || partitionID == common.AllPartitionsID) && len(targets[0].dmChannels) > 0
@ -544,8 +501,6 @@ func (mgr *TargetManager) IsNextTargetExist(collectionID int64) bool {
}
func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog) {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
if mgr.current != nil {
// use pool here to control maximal writer used by save target
pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
@ -569,13 +524,14 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
})
}
tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
for id, target := range mgr.current.collectionTargetMap {
mgr.current.collectionTargetMap.Range(func(id int64, target *CollectionTarget) bool {
tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
if len(tasks) >= batchSize {
submit(tasks)
tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
}
}
return true
})
if len(tasks) > 0 {
submit(tasks)
}
@ -584,9 +540,6 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
}
func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
targets, err := catalog.GetCollectionTargets()
if err != nil {
log.Warn("failed to recover collection target from etcd", zap.Error(err))
@ -615,8 +568,6 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
// if segment isn't l0 segment, and exist in current/next target, then it can be moved
func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
current := mgr.current.getCollectionTarget(collectionID)
if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
return true
@ -631,9 +582,7 @@ func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool
}
func (mgr *TargetManager) IsCurrentTargetReady(collectionID int64) bool {
mgr.rwMutex.RLock()
defer mgr.rwMutex.RUnlock()
target, ok := mgr.current.collectionTargetMap[collectionID]
target, ok := mgr.current.collectionTargetMap.Get(collectionID)
if !ok {
return false
}


@ -412,33 +412,38 @@ func (suite *TargetManagerSuite) TestGetTarget() {
current := &CollectionTarget{}
next := &CollectionTarget{}
t1 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t2 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t3 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t4 := typeutil.NewConcurrentMap[int64, *CollectionTarget]()
t1.Insert(1000, current)
t2.Insert(1000, next)
t3.Insert(1000, current)
t4.Insert(1000, current)
bothMgr := &TargetManager{
current: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
collectionTargetMap: t1,
},
next: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: next,
},
collectionTargetMap: t2,
},
}
currentMgr := &TargetManager{
current: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
collectionTargetMap: t3,
},
next: &target{
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
},
next: &target{},
}
nextMgr := &TargetManager{
next: &target{
collectionTargetMap: map[int64]*CollectionTarget{
1000: current,
},
collectionTargetMap: t4,
},
current: &target{
collectionTargetMap: typeutil.NewConcurrentMap[int64, *CollectionTarget](),
},
current: &target{},
}
cases := []testCase{
@ -647,7 +652,7 @@ func BenchmarkTargetManager(b *testing.B) {
collectionNum := 10000
for i := 0; i < collectionNum; i++ {
mgr.current.collectionTargetMap[int64(i)] = NewCollectionTarget(segments, channels, nil)
mgr.current.collectionTargetMap.Insert(int64(i), NewCollectionTarget(segments, channels, nil))
}
b.ResetTimer()
for i := 0; i < b.N; i++ {


@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/hardware"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/timerecord"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
@ -90,6 +91,7 @@ type replicaChannelIndex struct {
}
type taskQueue struct {
mu sync.RWMutex
// TaskPriority -> TaskID -> Task
buckets []map[int64]Task
}
@ -105,6 +107,8 @@ func newTaskQueue() *taskQueue {
}
func (queue *taskQueue) Len() int {
queue.mu.RLock()
defer queue.mu.RUnlock()
taskNum := 0
for _, tasks := range queue.buckets {
taskNum += len(tasks)
@ -114,17 +118,23 @@ func (queue *taskQueue) Len() int {
}
func (queue *taskQueue) Add(task Task) {
queue.mu.Lock()
defer queue.mu.Unlock()
bucket := queue.buckets[task.Priority()]
bucket[task.ID()] = task
}
func (queue *taskQueue) Remove(task Task) {
queue.mu.Lock()
defer queue.mu.Unlock()
bucket := queue.buckets[task.Priority()]
delete(bucket, task.ID())
}
// Range iterates all tasks in the queue ordered by priority from high to low
func (queue *taskQueue) Range(fn func(task Task) bool) {
queue.mu.RLock()
defer queue.mu.RUnlock()
for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
for _, task := range queue.buckets[priority] {
if !fn(task) {
@ -151,9 +161,8 @@ type Scheduler interface {
}
type taskScheduler struct {
rwmutex sync.RWMutex
ctx context.Context
executors map[int64]*Executor // NodeID -> Executor
executors *ConcurrentMap[int64, *Executor] // NodeID -> Executor
idAllocator func() UniqueID
distMgr *meta.DistributionManager
@ -163,9 +172,10 @@ type taskScheduler struct {
cluster session.Cluster
nodeMgr *session.NodeManager
tasks UniqueSet
segmentTasks map[replicaSegmentIndex]Task
channelTasks map[replicaChannelIndex]Task
collKeyLock *lock.KeyLock[int64] // guards Add()
tasks *ConcurrentMap[UniqueID, struct{}]
segmentTasks *ConcurrentMap[replicaSegmentIndex, Task]
channelTasks *ConcurrentMap[replicaChannelIndex, Task]
processQueue *taskQueue
waitQueue *taskQueue
@ -183,7 +193,7 @@ func NewScheduler(ctx context.Context,
id := time.Now().UnixMilli()
return &taskScheduler{
ctx: ctx,
executors: make(map[int64]*Executor),
executors: NewConcurrentMap[int64, *Executor](),
idAllocator: func() UniqueID {
id++
return id
@ -196,9 +206,10 @@ func NewScheduler(ctx context.Context,
cluster: cluster,
nodeMgr: nodeMgr,
tasks: make(UniqueSet),
segmentTasks: make(map[replicaSegmentIndex]Task),
channelTasks: make(map[replicaChannelIndex]Task),
collKeyLock: lock.NewKeyLock[int64](),
tasks: NewConcurrentMap[UniqueID, struct{}](),
segmentTasks: NewConcurrentMap[replicaSegmentIndex, Task](),
channelTasks: NewConcurrentMap[replicaChannelIndex, Task](),
processQueue: newTaskQueue(),
waitQueue: newTaskQueue(),
}
@ -207,30 +218,22 @@ func NewScheduler(ctx context.Context,
func (scheduler *taskScheduler) Start() {}
func (scheduler *taskScheduler) Stop() {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
for nodeID, executor := range scheduler.executors {
scheduler.executors.Range(func(nodeID int64, executor *Executor) bool {
executor.Stop()
delete(scheduler.executors, nodeID)
}
return true
})
for _, task := range scheduler.segmentTasks {
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
scheduler.remove(task)
}
for _, task := range scheduler.channelTasks {
return true
})
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
scheduler.remove(task)
}
return true
})
}
func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
if _, exist := scheduler.executors[nodeID]; exist {
return
}
executor := NewExecutor(scheduler.meta,
scheduler.distMgr,
scheduler.broker,
@ -238,27 +241,24 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
scheduler.cluster,
scheduler.nodeMgr)
scheduler.executors[nodeID] = executor
if _, exist := scheduler.executors.GetOrInsert(nodeID, executor); exist {
return
}
executor.Start(scheduler.ctx)
log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
}
func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
executor, ok := scheduler.executors[nodeID]
executor, ok := scheduler.executors.GetAndRemove(nodeID)
if ok {
executor.Stop()
delete(scheduler.executors, nodeID)
log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
log.Ctx(scheduler.ctx).Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
}
}
func (scheduler *taskScheduler) Add(task Task) error {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
scheduler.collKeyLock.Lock(task.CollectionID())
defer scheduler.collKeyLock.Unlock(task.CollectionID())
err := scheduler.preAdd(task)
if err != nil {
task.Cancel(err)
@ -267,19 +267,19 @@ func (scheduler *taskScheduler) Add(task Task) error {
task.SetID(scheduler.idAllocator())
scheduler.waitQueue.Add(task)
scheduler.tasks.Insert(task.ID())
scheduler.tasks.Insert(task.ID(), struct{}{})
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)
scheduler.segmentTasks[index] = task
scheduler.segmentTasks.Insert(index, task)
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
scheduler.channelTasks[index] = task
scheduler.channelTasks.Insert(index, task)
case *LeaderTask:
index := NewReplicaLeaderIndex(task)
scheduler.segmentTasks[index] = task
scheduler.segmentTasks.Insert(index, task)
}
log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
@ -288,24 +288,42 @@ func (scheduler *taskScheduler) Add(task Task) error {
}
func (scheduler *taskScheduler) updateTaskMetrics() {
if time.Since(scheduler.lastUpdateMetricTime) < 30*time.Second {
if time.Since(scheduler.lastUpdateMetricTime) < 60*time.Second {
return
}
segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
segmentGrowNum, segmentReduceNum, segmentUpdateNum, segmentMoveNum := 0, 0, 0, 0
leaderGrowNum, leaderReduceNum, leaderUpdateNum := 0, 0, 0
channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
for _, task := range scheduler.segmentTasks {
taskType := GetTaskType(task)
switch taskType {
case TaskTypeGrow:
segmentGrowNum++
case TaskTypeReduce:
segmentReduceNum++
case TaskTypeMove:
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
switch {
case len(task.Actions()) > 1:
segmentMoveNum++
case task.Actions()[0].Type() == ActionTypeGrow:
if _, ok := task.Actions()[0].(*SegmentAction); ok {
segmentGrowNum++
}
if _, ok := task.Actions()[0].(*LeaderAction); ok {
leaderGrowNum++
}
case task.Actions()[0].Type() == ActionTypeReduce:
if _, ok := task.Actions()[0].(*SegmentAction); ok {
segmentReduceNum++
}
if _, ok := task.Actions()[0].(*LeaderAction); ok {
leaderReduceNum++
}
case task.Actions()[0].Type() == ActionTypeUpdate:
if _, ok := task.Actions()[0].(*SegmentAction); ok {
segmentUpdateNum++
}
if _, ok := task.Actions()[0].(*LeaderAction); ok {
leaderUpdateNum++
}
}
}
return true
})
for _, task := range scheduler.channelTasks {
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
taskType := GetTaskType(task)
switch taskType {
case TaskTypeGrow:
@ -315,11 +333,18 @@ func (scheduler *taskScheduler) updateTaskMetrics() {
case TaskTypeMove:
channelMoveNum++
}
}
return true
})
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentGrowTaskLabel).Set(float64(segmentGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentReduceTaskLabel).Set(float64(segmentReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentMoveTaskLabel).Set(float64(segmentMoveNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.SegmentUpdateTaskLabel).Set(float64(segmentUpdateNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderGrowTaskLabel).Set(float64(leaderGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderReduceTaskLabel).Set(float64(leaderReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.LeaderUpdateTaskLabel).Set(float64(leaderUpdateNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelGrowTaskLabel).Set(float64(channelGrowNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelReduceTaskLabel).Set(float64(channelReduceNum))
metrics.QueryCoordTaskNum.WithLabelValues(metrics.ChannelMoveTaskLabel).Set(float64(channelMoveNum))
@ -332,7 +357,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)
if old, ok := scheduler.segmentTasks[index]; ok {
if old, ok := scheduler.segmentTasks.Get(index); ok {
if task.Priority() > old.Priority() {
log.Info("replace old task, the new one with higher priority",
zap.Int64("oldID", old.ID()),
@ -365,7 +390,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
if old, ok := scheduler.channelTasks[index]; ok {
if old, ok := scheduler.channelTasks.Get(index); ok {
if task.Priority() > old.Priority() {
log.Info("replace old task, the new one with higher priority",
zap.Int64("oldID", old.ID()),
@ -398,7 +423,7 @@ func (scheduler *taskScheduler) preAdd(task Task) error {
}
case *LeaderTask:
index := NewReplicaLeaderIndex(task)
if old, ok := scheduler.segmentTasks[index]; ok {
if old, ok := scheduler.segmentTasks.Get(index); ok {
if task.Priority() > old.Priority() {
log.Info("replace old task, the new one with higher priority",
zap.Int64("oldID", old.ID()),
@ -477,46 +502,40 @@ func (scheduler *taskScheduler) Dispatch(node int64) {
log.Info("scheduler stopped")
default:
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
scheduler.schedule(node)
}
}
func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
targetActions := make([]Action, 0)
for _, t := range scheduler.segmentTasks {
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, t Task) bool {
if collectionID != -1 && collectionID != t.CollectionID() {
continue
return true
}
for _, action := range t.Actions() {
if action.Node() == nodeID {
targetActions = append(targetActions, action)
}
}
}
return true
})
return scheduler.calculateTaskDelta(collectionID, targetActions)
}
func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
targetActions := make([]Action, 0)
for _, t := range scheduler.channelTasks {
scheduler.channelTasks.Range(func(_ replicaChannelIndex, t Task) bool {
if collectionID != -1 && collectionID != t.CollectionID() {
continue
return true
}
for _, action := range t.Actions() {
if action.Node() == nodeID {
targetActions = append(targetActions, action)
}
}
}
return true
})
return scheduler.calculateTaskDelta(collectionID, targetActions)
}
@ -548,10 +567,7 @@ func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetAct
}
func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
executor, ok := scheduler.executors[nodeID]
executor, ok := scheduler.executors.Get(nodeID)
if !ok {
return nil
}
@ -560,17 +576,11 @@ func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} {
}
func (scheduler *taskScheduler) GetChannelTaskNum() int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
return len(scheduler.channelTasks)
return scheduler.channelTasks.Len()
}
func (scheduler *taskScheduler) GetSegmentTaskNum() int {
scheduler.rwmutex.RLock()
defer scheduler.rwmutex.RUnlock()
return len(scheduler.segmentTasks)
return scheduler.segmentTasks.Len()
}
// schedule selects some tasks to execute, follow these steps for each started selected tasks:
@ -593,8 +603,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
log.Debug("process tasks related to node",
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
zap.Int("channelTaskNum", len(scheduler.channelTasks)),
zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
)
// Process tasks
@ -642,8 +652,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
log.Info("process tasks related to node done",
zap.Int("processingTaskNum", scheduler.processQueue.Len()),
zap.Int("waitingTaskNum", scheduler.waitQueue.Len()),
zap.Int("segmentTaskNum", len(scheduler.segmentTasks)),
zap.Int("channelTaskNum", len(scheduler.channelTasks)),
zap.Int("segmentTaskNum", scheduler.segmentTasks.Len()),
zap.Int("channelTaskNum", scheduler.channelTasks.Len()),
)
}
@ -739,7 +749,7 @@ func (scheduler *taskScheduler) process(task Task) bool {
)
actions, step := task.Actions(), task.Step()
executor, ok := scheduler.executors[actions[step].Node()]
executor, ok := scheduler.executors.Get(actions[step].Node())
if !ok {
log.Warn("no executor for QueryNode",
zap.Int("step", step),
@ -760,19 +770,18 @@ func (scheduler *taskScheduler) check(task Task) error {
}
func (scheduler *taskScheduler) RemoveByNode(node int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
for _, task := range scheduler.segmentTasks {
scheduler.segmentTasks.Range(func(_ replicaSegmentIndex, task Task) bool {
if scheduler.isRelated(task, node) {
scheduler.remove(task)
}
}
for _, task := range scheduler.channelTasks {
return true
})
scheduler.channelTasks.Range(func(_ replicaChannelIndex, task Task) bool {
if scheduler.isRelated(task, node) {
scheduler.remove(task)
}
}
return true
})
}
func (scheduler *taskScheduler) recordSegmentTaskError(task *SegmentTask) {
@ -808,7 +817,7 @@ func (scheduler *taskScheduler) remove(task Task) {
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)
delete(scheduler.segmentTasks, index)
scheduler.segmentTasks.Remove(index)
log = log.With(zap.Int64("segmentID", task.SegmentID()))
if task.Status() == TaskStatusFailed &&
task.Err() != nil &&
@ -818,12 +827,12 @@ func (scheduler *taskScheduler) remove(task Task) {
case *ChannelTask:
index := replicaChannelIndex{task.ReplicaID(), task.Channel()}
delete(scheduler.channelTasks, index)
scheduler.channelTasks.Remove(index)
log = log.With(zap.String("channel", task.Channel()))
case *LeaderTask:
index := NewReplicaLeaderIndex(task)
delete(scheduler.segmentTasks, index)
scheduler.segmentTasks.Remove(index)
log = log.With(zap.Int64("segmentID", task.SegmentID()))
}


@ -1475,10 +1475,10 @@ func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
suite.Equal(process, scheduler.processQueue.Len())
suite.Equal(wait, scheduler.waitQueue.Len())
suite.Len(scheduler.segmentTasks, segment)
suite.Len(scheduler.channelTasks, channel)
suite.Equal(len(scheduler.tasks), process+wait)
suite.Equal(len(scheduler.tasks), segment+channel)
suite.Equal(scheduler.segmentTasks.Len(), segment)
suite.Equal(scheduler.channelTasks.Len(), channel)
suite.Equal(scheduler.tasks.Len(), process+wait)
suite.Equal(scheduler.tasks.Len(), segment+channel)
}
func (suite *TaskSuite) dispatchAndWait(node int64) {
@ -1490,13 +1490,14 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
count = 0
keys = make([]any, 0)
for _, executor := range suite.scheduler.executors {
suite.scheduler.executors.Range(func(_ int64, executor *Executor) bool {
executor.executingTasks.Range(func(taskIndex string) bool {
keys = append(keys, taskIndex)
count++
return true
})
}
return true
})
if count == 0 {
return


@ -36,6 +36,7 @@ const (
LeaderGrowTaskLabel = "leader_grow"
LeaderReduceTaskLabel = "leader_reduce"
LeaderUpdateTaskLabel = "leader_update"
UnknownTaskLabel = "unknown"