From 82fb0bf9c118d8530011f9dd16f427d7b843d4fd Mon Sep 17 00:00:00 2001
From: wei liu
Date: Fri, 21 Feb 2025 16:49:54 +0800
Subject: [PATCH] fix: [2.5] task delta cache leak on reduce task (#40056)

issue: #40052
pr: #40055

Signed-off-by: Wei Liu
---
 internal/querycoordv2/task/scheduler.go | 43 ++++++++++++++++++++-----
 internal/querycoordv2/task/task_test.go | 20 ++++++++++++
 2 files changed, 55 insertions(+), 8 deletions(-)

diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go
index 020a22738c..cab991a0de 100644
--- a/internal/querycoordv2/task/scheduler.go
+++ b/internal/querycoordv2/task/scheduler.go
@@ -151,19 +151,27 @@ func (queue *taskQueue) Range(fn func(task Task) bool) {
 type ExecutingTaskDelta struct {
 	data map[int64]map[int64]int // nodeID -> collectionID -> taskDelta
 	mu   sync.RWMutex            // Mutex to protect the map
+
+	taskIDRecords UniqueSet
 }
 
 func NewExecutingTaskDelta() *ExecutingTaskDelta {
 	return &ExecutingTaskDelta{
-		data: make(map[int64]map[int64]int),
+		data:          make(map[int64]map[int64]int),
+		taskIDRecords: NewUniqueSet(),
 	}
 }
 
 // Add updates the taskDelta for the given nodeID and collectionID
-func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, delta int) {
+func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, taskID int64, delta int) {
 	etd.mu.Lock()
 	defer etd.mu.Unlock()
 
+	if etd.taskIDRecords.Contain(taskID) {
+		log.Warn("task already exists in delta cache", zap.Int64("taskID", taskID))
+	}
+	etd.taskIDRecords.Insert(taskID)
+
 	if _, exists := etd.data[nodeID]; !exists {
 		etd.data[nodeID] = make(map[int64]int)
 	}
@@ -171,13 +179,18 @@ func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, delta int)
 }
 
 // Sub updates the taskDelta for the given nodeID and collectionID by subtracting delta
-func (etd *ExecutingTaskDelta) Sub(nodeID int64, collectionID int64, delta int) {
+func (etd *ExecutingTaskDelta) Sub(nodeID int64, collectionID int64, taskID int64, delta int) {
 	etd.mu.Lock()
 	defer etd.mu.Unlock()
 
+	if !etd.taskIDRecords.Contain(taskID) {
+		log.Warn("task doesn't exists in delta cache", zap.Int64("taskID", taskID))
+	}
+	etd.taskIDRecords.Remove(taskID)
+
 	if _, exists := etd.data[nodeID]; exists {
 		etd.data[nodeID][collectionID] -= delta
-		if etd.data[nodeID][collectionID] <= 0 {
+		if etd.data[nodeID][collectionID] == 0 {
 			delete(etd.data[nodeID], collectionID)
 		}
 		if len(etd.data[nodeID]) == 0 {
@@ -211,6 +224,15 @@ func (etd *ExecutingTaskDelta) Get(nodeID, collectionID int64) int {
 	return sum
 }
 
+func (etd *ExecutingTaskDelta) printDetailInfos() {
+	etd.mu.RLock()
+	defer etd.mu.RUnlock()
+
+	if etd.taskIDRecords.Len() > 0 {
+		log.Info("task delta cache info", zap.Any("taskIDRecords", etd.taskIDRecords.Collect()), zap.Any("data", etd.data))
+	}
+}
+
 type Scheduler interface {
 	Start()
 	Stop()
@@ -601,9 +623,9 @@ func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) {
 		delta := action.WorkLoadEffect()
 		switch action.(type) {
 		case *SegmentAction:
-			scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), delta)
+			scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), task.ID(), delta)
 		case *ChannelAction:
-			scheduler.channelTaskDelta.Add(action.Node(), task.CollectionID(), delta)
+			scheduler.channelTaskDelta.Add(action.Node(), task.CollectionID(), task.ID(), delta)
 		}
 	}
 }
@@ -613,9 +635,9 @@ func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) {
 		delta := action.WorkLoadEffect()
 		switch action.(type) {
 		case *SegmentAction:
-			scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), delta)
+			scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), task.ID(), delta)
 		case *ChannelAction:
-			scheduler.channelTaskDelta.Sub(action.Node(), task.CollectionID(), delta)
+			scheduler.channelTaskDelta.Sub(action.Node(), task.CollectionID(), task.ID(), delta)
 		}
 	}
 }
@@ -936,6 +958,11 @@ func (scheduler *taskScheduler) remove(task Task) {
 		scheduler.decExecutingTaskDelta(task)
 	}
 
+	if scheduler.tasks.Len() == 0 {
+		scheduler.segmentTaskDelta.printDetailInfos()
+		scheduler.channelTaskDelta.printDetailInfos()
+	}
+
 	switch task := task.(type) {
 	case *SegmentTask:
 		index := NewReplicaSegmentIndex(task)
diff --git a/internal/querycoordv2/task/task_test.go b/internal/querycoordv2/task/task_test.go
index 8b604824a6..fde2e516f7 100644
--- a/internal/querycoordv2/task/task_test.go
+++ b/internal/querycoordv2/task/task_test.go
@@ -1908,6 +1908,26 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
 	suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
 }
 
+func (suite *TaskSuite) TestTaskDeltaCache() {
+	etd := NewExecutingTaskDelta()
+
+	taskDelta := []int{1, 2, 3, 4, 5, -6, -7, -8, -9, -10}
+
+	nodeID := int64(1)
+	collectionID := int64(100)
+
+	taskDelta = lo.Shuffle(taskDelta)
+	for i := 0; i < len(taskDelta); i++ {
+		etd.Add(nodeID, collectionID, int64(i), taskDelta[i])
+	}
+
+	taskDelta = lo.Shuffle(taskDelta)
+	for i := 0; i < len(taskDelta); i++ {
+		etd.Sub(nodeID, collectionID, int64(i), taskDelta[i])
+	}
+	suite.Equal(0, etd.Get(nodeID, collectionID))
+}
+
 func (suite *TaskSuite) TestRemoveTaskWithError() {
 	ctx := context.Background()
 	scheduler := suite.newScheduler()