mirror of https://github.com/milvus-io/milvus.git
Fix queryCoord init deadlock when restart (#19402)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com> Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>pull/19476/head
parent
6e9c8f7cf3
commit
b85118cc03
|
@ -159,6 +159,7 @@ queryCoord:
|
|||
overloadedMemoryThresholdPercentage: 90 # The threshold percentage that memory overload
|
||||
balanceIntervalSeconds: 60
|
||||
memoryUsageMaxDifferencePercentage: 30
|
||||
maxTask: 2048
|
||||
|
||||
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.
|
||||
queryNode:
|
||||
|
|
|
@ -163,8 +163,8 @@ func (queue *taskQueue) popTask() task {
|
|||
func newTaskQueue() *taskQueue {
|
||||
return &taskQueue{
|
||||
tasks: list.New(),
|
||||
maxTask: 1024,
|
||||
taskChan: make(chan int, 1024),
|
||||
maxTask: Params.QueryCoordCfg.MaxTask,
|
||||
taskChan: make(chan int, Params.QueryCoordCfg.MaxTask),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -193,7 +193,7 @@ func newTaskScheduler(ctx context.Context,
|
|||
broker *globalMetaBroker,
|
||||
idAllocator func() (UniqueID, error)) (*TaskScheduler, error) {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
taskChan := make(chan task, 1024)
|
||||
taskChan := make(chan task, Params.QueryCoordCfg.MaxTask)
|
||||
stopTaskLoopChan := make(chan int, 1)
|
||||
s := &TaskScheduler{
|
||||
ctx: ctx1,
|
||||
|
@ -279,33 +279,32 @@ func (scheduler *TaskScheduler) reloadFromKV() error {
|
|||
triggerTasks[taskID].setState(state)
|
||||
}
|
||||
|
||||
// triggerTaskQueue's size is 1024, if the size of triggerTasks if large than 1024 the loop will be blocked,
|
||||
// so run it in a standalone goroutine
|
||||
go func() {
|
||||
var doneTriggerTask task
|
||||
var allTriggerTasks []task
|
||||
for _, t := range triggerTasks {
|
||||
if t.getState() != taskDone {
|
||||
allTriggerTasks = append(allTriggerTasks, t)
|
||||
} else {
|
||||
doneTriggerTask = t
|
||||
}
|
||||
var doneTriggerTask task
|
||||
var allTriggerTasks []task
|
||||
for _, t := range triggerTasks {
|
||||
if t.getState() != taskDone {
|
||||
allTriggerTasks = append(allTriggerTasks, t)
|
||||
} else {
|
||||
doneTriggerTask = t
|
||||
}
|
||||
if doneTriggerTask != nil {
|
||||
for _, childTask := range activeTasks {
|
||||
childTask.setParentTask(doneTriggerTask) //replace child task after reScheduler
|
||||
doneTriggerTask.addChildTask(childTask)
|
||||
}
|
||||
doneTriggerTask.setResultInfo(nil)
|
||||
scheduler.triggerTaskQueue.addTask(doneTriggerTask)
|
||||
}
|
||||
if doneTriggerTask != nil {
|
||||
for _, childTask := range activeTasks {
|
||||
childTask.setParentTask(doneTriggerTask) //replace child task after reScheduler
|
||||
doneTriggerTask.addChildTask(childTask)
|
||||
}
|
||||
sort.Slice(allTriggerTasks, func(i, j int) bool {
|
||||
return allTriggerTasks[i].getTaskID() < allTriggerTasks[j].getTaskID()
|
||||
})
|
||||
for _, t := range allTriggerTasks {
|
||||
scheduler.triggerTaskQueue.addTask(t)
|
||||
doneTriggerTask.setResultInfo(nil)
|
||||
scheduler.triggerTaskQueue.addTask(doneTriggerTask)
|
||||
}
|
||||
sort.Slice(allTriggerTasks, func(i, j int) bool {
|
||||
return allTriggerTasks[i].getTaskID() < allTriggerTasks[j].getTaskID()
|
||||
})
|
||||
for _, t := range allTriggerTasks {
|
||||
if scheduler.triggerTaskQueue.taskFull() {
|
||||
panic("Quercoord init failed: task sum out of range, should amplify queryCoord.maxTask")
|
||||
}
|
||||
}()
|
||||
scheduler.triggerTaskQueue.addTask(t)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -559,6 +559,7 @@ func TestTaskScheduler_BindContext(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestTaskScheduler_willLoadOrRelease(t *testing.T) {
|
||||
refreshParams()
|
||||
ctx := context.Background()
|
||||
queryCoord := &QueryCoord{}
|
||||
|
||||
|
|
|
@ -596,6 +596,9 @@ type queryCoordConfig struct {
|
|||
OverloadedMemoryThresholdPercentage float64
|
||||
BalanceIntervalSeconds int64
|
||||
MemoryUsageMaxDifferencePercentage float64
|
||||
|
||||
//---- Task ---
|
||||
MaxTask int64
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) init(base *BaseTable) {
|
||||
|
@ -609,6 +612,9 @@ func (p *queryCoordConfig) init(base *BaseTable) {
|
|||
p.initOverloadedMemoryThresholdPercentage()
|
||||
p.initBalanceIntervalSeconds()
|
||||
p.initMemoryUsageMaxDifferencePercentage()
|
||||
|
||||
//---- Task ---
|
||||
p.initMaxTask()
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) initAutoHandoff() {
|
||||
|
@ -658,6 +664,15 @@ func (p *queryCoordConfig) initMemoryUsageMaxDifferencePercentage() {
|
|||
p.MemoryUsageMaxDifferencePercentage = float64(diffPercentage) / 100
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) initMaxTask() {
|
||||
maxDiff := p.Base.LoadWithDefault("queryCoord.maxTask", "2048")
|
||||
MaxTask, err := strconv.ParseInt(maxDiff, 10, 64)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
p.MaxTask = MaxTask
|
||||
}
|
||||
|
||||
func (p *queryCoordConfig) SetNodeID(id UniqueID) {
|
||||
p.NodeID.Store(id)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue