mirror of https://github.com/milvus-io/milvus.git
issue: #36536 pr: #36537 The query coord uses `segmentTaskDelta/channelTaskDelta` to measure the executing workload of each querynode in the scheduler. These values are maintained by `scheduler.Add(task)` and `scheduler.remove(task)`, but `scheduler.remove(task)` has been called in unexpected ways, which corrupts the `segmentTaskDelta/channelTaskDelta` values, affects the segment assign logic, and causes segment imbalance. This PR computes `segmentTaskDelta/channelTaskDelta` on access instead, so a stale cached value can no longer affect assignment. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/36574/head
parent 07e1bc8c08
commit 74af00ba8c
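The change described in the commit message replaces incrementally maintained per-node counters with a value recomputed from the executing tasks each time it is read. The following standalone Go sketch is not part of this commit; all names in it (`action`, `scheduler`, `GetTaskDelta`, ...) are simplified stand-ins for the real `Task`/`Action` types in querycoordv2. It only illustrates why compute-on-access is robust against unbalanced `Add`/`remove` calls:

```go
package main

import "fmt"

// Simplified stand-ins for the scheduler's bookkeeping. The real scheduler
// tracks Task/Action objects in its segmentTasks/channelTasks maps.
type action struct {
	nodeID       int64
	collectionID int64
	rows         int  // sealed-segment row count, or 1 for a channel action
	grow         bool // grow adds load to the node, reduce removes it
}

type scheduler struct {
	// executing tasks are the single source of truth; nothing else is cached
	executing map[int64]action // taskID -> action
}

// GetTaskDelta recomputes the pending workload for (nodeID, collectionID)
// from the executing tasks on every call. A task that is removed twice, or
// never removed, cannot leave a stale counter behind, because there is no
// counter to corrupt.
func (s *scheduler) GetTaskDelta(nodeID, collectionID int64) int {
	sum := 0
	for _, a := range s.executing {
		if a.nodeID != nodeID {
			continue
		}
		if collectionID != -1 && a.collectionID != collectionID {
			continue
		}
		if a.grow {
			sum += a.rows
		} else {
			sum -= a.rows
		}
	}
	return sum
}

func main() {
	s := &scheduler{executing: map[int64]action{
		1: {nodeID: 7, collectionID: 100, rows: 5000, grow: true},
		2: {nodeID: 7, collectionID: 100, rows: 2000, grow: false},
	}}
	fmt.Println(s.GetTaskDelta(7, 100)) // 3000
}
```

The trade-off is a linear scan over the executing tasks on every read instead of an O(1) map lookup; the diff below makes the same trade in `GetSegmentTaskDelta`/`GetChannelTaskDelta` in exchange for a value that cannot drift.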
@@ -167,11 +167,6 @@ type taskScheduler struct {
 	channelTasks map[replicaChannelIndex]Task
 	processQueue *taskQueue
 	waitQueue    *taskQueue
-
-	// executing task delta changes on node: nodeID -> collectionID -> delta changes
-	// delta changes measure by segment row count and channel num
-	segmentExecutingTaskDelta map[int64]map[int64]int
-	channelExecutingTaskDelta map[int64]map[int64]int
 }
 
 func NewScheduler(ctx context.Context,
@@ -198,13 +193,11 @@ func NewScheduler(ctx context.Context,
 		cluster: cluster,
 		nodeMgr: nodeMgr,
 
-		tasks:                     make(UniqueSet),
-		segmentTasks:              make(map[replicaSegmentIndex]Task),
-		channelTasks:              make(map[replicaChannelIndex]Task),
-		processQueue:              newTaskQueue(),
-		waitQueue:                 newTaskQueue(),
-		segmentExecutingTaskDelta: make(map[int64]map[int64]int),
-		channelExecutingTaskDelta: make(map[int64]map[int64]int),
+		tasks:        make(UniqueSet),
+		segmentTasks: make(map[replicaSegmentIndex]Task),
+		channelTasks: make(map[replicaChannelIndex]Task),
+		processQueue: newTaskQueue(),
+		waitQueue:    newTaskQueue(),
 	}
 }
 
@@ -217,8 +210,6 @@ func (scheduler *taskScheduler) Stop() {
 	for nodeID, executor := range scheduler.executors {
 		executor.Stop()
 		delete(scheduler.executors, nodeID)
-		delete(scheduler.segmentExecutingTaskDelta, nodeID)
-		delete(scheduler.channelExecutingTaskDelta, nodeID)
 	}
 
 	for _, task := range scheduler.segmentTasks {
@@ -244,8 +235,6 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
 		scheduler.cluster,
 		scheduler.nodeMgr)
 
-	scheduler.segmentExecutingTaskDelta[nodeID] = make(map[int64]int)
-	scheduler.channelExecutingTaskDelta[nodeID] = make(map[int64]int)
 	scheduler.executors[nodeID] = executor
 	executor.Start(scheduler.ctx)
 	log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
@@ -259,8 +248,6 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
 	if ok {
 		executor.Stop()
 		delete(scheduler.executors, nodeID)
-		delete(scheduler.segmentExecutingTaskDelta, nodeID)
-		delete(scheduler.channelExecutingTaskDelta, nodeID)
 		log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
 	}
 }
@@ -293,52 +280,11 @@ func (scheduler *taskScheduler) Add(task Task) error {
 	}
 
 	scheduler.updateTaskMetrics()
-	scheduler.updateTaskDelta(task)
-
 	log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
 	task.RecordStartTs()
 	return nil
 }
 
-func (scheduler *taskScheduler) updateTaskDelta(task Task) {
-	var delta int
-	var deltaMap map[int64]map[int64]int
-	switch task := task.(type) {
-	case *SegmentTask:
-		// skip growing segment's count, cause doesn't know realtime row number of growing segment
-		if task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Historical {
-			segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst)
-			if segment != nil {
-				delta = int(segment.GetNumOfRows())
-			}
-		}
-
-		deltaMap = scheduler.segmentExecutingTaskDelta
-
-	case *ChannelTask:
-		delta = 1
-		deltaMap = scheduler.channelExecutingTaskDelta
-	}
-
-	// turn delta to negative when try to remove task
-	if task.Status() == TaskStatusSucceeded || task.Status() == TaskStatusFailed || task.Status() == TaskStatusCanceled {
-		delta = -delta
-	}
-
-	if delta != 0 {
-		for _, action := range task.Actions() {
-			if deltaMap[action.Node()] == nil {
-				deltaMap[action.Node()] = make(map[int64]int)
-			}
-			if action.Type() == ActionTypeGrow {
-				deltaMap[action.Node()][task.CollectionID()] += delta
-			} else if action.Type() == ActionTypeReduce {
-				deltaMap[action.Node()][task.CollectionID()] -= delta
-			}
-		}
-	}
-}
-
 func (scheduler *taskScheduler) updateTaskMetrics() {
 	segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
 	channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
@@ -533,34 +479,63 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64)
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.segmentExecutingTaskDelta)
+	targetActions := make([]Action, 0)
+	for _, t := range scheduler.segmentTasks {
+		if collectionID != -1 && collectionID != t.CollectionID() {
+			continue
+		}
+		for _, action := range t.Actions() {
+			if action.Node() == nodeID {
+				targetActions = append(targetActions, action)
+			}
+		}
+	}
+
+	return scheduler.calculateTaskDelta(collectionID, targetActions)
 }
 
 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.channelExecutingTaskDelta)
-}
-
-func (scheduler *taskScheduler) calculateTaskDelta(nodeID, collectionID int64, deltaMap map[int64]map[int64]int) int {
-	if nodeID == -1 && collectionID == -1 {
-		return 0
-	}
-
-	sum := 0
-	for nid, nInfo := range deltaMap {
-		if nid != nodeID && -1 != nodeID {
-			continue
-		}
-
-		for cid, cInfo := range nInfo {
-			if cid == collectionID || -1 == collectionID {
-				sum += cInfo
-			}
-		}
-	}
-	return sum
-}
+	targetActions := make([]Action, 0)
+	for _, t := range scheduler.channelTasks {
+		if collectionID != -1 && collectionID != t.CollectionID() {
+			continue
+		}
+		for _, action := range t.Actions() {
+			if action.Node() == nodeID {
+				targetActions = append(targetActions, action)
+			}
+		}
+	}
+
+	return scheduler.calculateTaskDelta(collectionID, targetActions)
+}
+
+func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetActions []Action) int {
+	sum := 0
+	for _, action := range targetActions {
+		delta := 0
+		if action.Type() == ActionTypeGrow {
+			delta = 1
+		} else if action.Type() == ActionTypeReduce {
+			delta = -1
+		}
+
+		switch action := action.(type) {
+		case *SegmentAction:
+			// skip growing segment's count, cause doesn't know realtime row number of growing segment
+			if action.Scope() == querypb.DataScope_Historical {
+				segment := scheduler.targetMgr.GetSealedSegment(collectionID, action.segmentID, meta.NextTargetFirst)
+				if segment != nil {
+					sum += int(segment.GetNumOfRows()) * delta
+				}
+			}
+		case *ChannelAction:
+			sum += delta
+		}
+	}
+	return sum
+}
@@ -836,7 +811,6 @@ func (scheduler *taskScheduler) remove(task Task) {
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
 
-	scheduler.updateTaskDelta(task)
 	scheduler.updateTaskMetrics()
 	log.Info("task removed")
 
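For context, here is a hedged sketch of how a caller such as the segment-assignment logic mentioned in the commit message might combine the recomputed delta with a node's current load when scoring candidates. Only the `GetSegmentTaskDelta(nodeID, collectionID)` signature comes from this diff; `pickNode`, `fakeScheduler`, `nodeRowCount`, and the scoring formula are illustrative assumptions, not code from this PR:

```go
package main

import "fmt"

// Scheduler is a hypothetical narrow view of the task scheduler; only the
// method signature matches the one introduced in this diff.
type Scheduler interface {
	GetSegmentTaskDelta(nodeID, collectionID int64) int
}

// fakeScheduler returns canned per-node deltas for the example below.
type fakeScheduler map[int64]int

func (f fakeScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int { return f[nodeID] }

// pickNode chooses the candidate with the smallest projected load: the rows
// already on the node plus the rows still in flight toward it.
func pickNode(sched Scheduler, nodeRowCount map[int64]int, collectionID int64) int64 {
	best, bestScore := int64(-1), int(^uint(0)>>1) // start with max int
	for nodeID, rows := range nodeRowCount {
		score := rows + sched.GetSegmentTaskDelta(nodeID, collectionID)
		if score < bestScore {
			best, bestScore = nodeID, score
		}
	}
	return best
}

func main() {
	// node 1 looks lighter on disk, but has 9000 rows of loads in flight.
	rows := map[int64]int{1: 1000, 2: 4000}
	deltas := fakeScheduler{1: 9000, 2: 0}
	fmt.Println(pickNode(deltas, rows, -1)) // prints 2
}
```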