mirror of https://github.com/milvus-io/milvus.git
fix goroutine leak in querycoord (#19067)
Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com> Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>pull/19044/head
parent
39b847b67a
commit
2d733bcdc1
|
@ -457,6 +457,7 @@ func (scheduler *TaskScheduler) unmarshalTask(taskID UniqueID, t string) (task,
|
|||
meta: scheduler.meta,
|
||||
}
|
||||
newTask = handoffTask
|
||||
|
||||
default:
|
||||
err = errors.New("inValid msg type when unMarshal task")
|
||||
log.Error(err.Error())
|
||||
|
@ -732,7 +733,7 @@ func (scheduler *TaskScheduler) scheduleLoop() {
|
|||
|
||||
// triggerTask may be LoadCollection, LoadPartitions, LoadBalance, Handoff
|
||||
if triggerTask.getResultInfo().ErrorCode == commonpb.ErrorCode_Success || triggerTask.getTriggerCondition() == querypb.TriggerCondition_NodeDown {
|
||||
err = updateSegmentInfoFromTask(scheduler.ctx, triggerTask, scheduler.meta)
|
||||
err = updateSegmentInfoFromTask(triggerTask, scheduler.meta)
|
||||
if err != nil {
|
||||
log.Warn("failed to update segment info", zap.Int64("taskID", triggerTask.getTaskID()), zap.Error(err))
|
||||
triggerTask.setResultInfo(err)
|
||||
|
@ -755,7 +756,7 @@ func (scheduler *TaskScheduler) scheduleLoop() {
|
|||
triggerTask.notify(errors.New(resultInfo.Reason))
|
||||
alreadyNotify = true
|
||||
}
|
||||
rollBackTasks := triggerTask.rollBack(scheduler.ctx)
|
||||
rollBackTasks := triggerTask.rollBack(triggerTask.traceCtx())
|
||||
log.Info("scheduleLoop: start rollBack after triggerTask failed",
|
||||
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
|
||||
zap.Any("rollBackTasks", rollBackTasks),
|
||||
|
@ -803,7 +804,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
|
|||
log.Info("waitActivateTaskDone: reSchedule the activate task",
|
||||
zap.Int64("taskID", t.getTaskID()),
|
||||
zap.Int64("triggerTaskID", triggerTask.getTaskID()))
|
||||
reScheduledTasks, err := t.reschedule(scheduler.ctx)
|
||||
reScheduledTasks, err := t.reschedule(triggerTask.traceCtx())
|
||||
if err != nil {
|
||||
log.Error("waitActivateTaskDone: reschedule task error",
|
||||
zap.Int64("taskID", t.getTaskID()),
|
||||
|
@ -1006,7 +1007,7 @@ func (scheduler *TaskScheduler) BindContext(ctx context.Context) (context.Contex
|
|||
return nCtx, cancel
|
||||
}
|
||||
|
||||
func updateSegmentInfoFromTask(ctx context.Context, triggerTask task, meta Meta) error {
|
||||
func updateSegmentInfoFromTask(triggerTask task, meta Meta) error {
|
||||
segmentInfosToSave := make(map[UniqueID][]*querypb.SegmentInfo)
|
||||
segmentInfosToRemove := make(map[UniqueID][]*querypb.SegmentInfo)
|
||||
var err error
|
||||
|
|
Loading…
Reference in New Issue