fix: [2.5] task delta cache leak on reduce task (#40056)

issue: #40052
pr: #40055

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
pull/40097/head
wei liu 2025-02-21 16:49:54 +08:00 committed by GitHub
parent e42c944e04
commit 82fb0bf9c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 55 additions and 8 deletions

View File

@ -151,19 +151,27 @@ func (queue *taskQueue) Range(fn func(task Task) bool) {
type ExecutingTaskDelta struct {
data map[int64]map[int64]int // nodeID -> collectionID -> taskDelta
mu sync.RWMutex // Mutex to protect the map
taskIDRecords UniqueSet
}
func NewExecutingTaskDelta() *ExecutingTaskDelta {
return &ExecutingTaskDelta{
data: make(map[int64]map[int64]int),
data: make(map[int64]map[int64]int),
taskIDRecords: NewUniqueSet(),
}
}
// Add updates the taskDelta for the given nodeID and collectionID
func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, delta int) {
func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, taskID int64, delta int) {
etd.mu.Lock()
defer etd.mu.Unlock()
if etd.taskIDRecords.Contain(taskID) {
log.Warn("task already exists in delta cache", zap.Int64("taskID", taskID))
}
etd.taskIDRecords.Insert(taskID)
if _, exists := etd.data[nodeID]; !exists {
etd.data[nodeID] = make(map[int64]int)
}
@ -171,13 +179,18 @@ func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, delta int)
}
// Sub updates the taskDelta for the given nodeID and collectionID by subtracting delta
func (etd *ExecutingTaskDelta) Sub(nodeID int64, collectionID int64, delta int) {
func (etd *ExecutingTaskDelta) Sub(nodeID int64, collectionID int64, taskID int64, delta int) {
etd.mu.Lock()
defer etd.mu.Unlock()
if !etd.taskIDRecords.Contain(taskID) {
log.Warn("task doesn't exists in delta cache", zap.Int64("taskID", taskID))
}
etd.taskIDRecords.Remove(taskID)
if _, exists := etd.data[nodeID]; exists {
etd.data[nodeID][collectionID] -= delta
if etd.data[nodeID][collectionID] <= 0 {
if etd.data[nodeID][collectionID] == 0 {
delete(etd.data[nodeID], collectionID)
}
if len(etd.data[nodeID]) == 0 {
@ -211,6 +224,15 @@ func (etd *ExecutingTaskDelta) Get(nodeID, collectionID int64) int {
return sum
}
func (etd *ExecutingTaskDelta) printDetailInfos() {
etd.mu.RLock()
defer etd.mu.RUnlock()
if etd.taskIDRecords.Len() > 0 {
log.Info("task delta cache info", zap.Any("taskIDRecords", etd.taskIDRecords.Collect()), zap.Any("data", etd.data))
}
}
type Scheduler interface {
Start()
Stop()
@ -601,9 +623,9 @@ func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) {
delta := action.WorkLoadEffect()
switch action.(type) {
case *SegmentAction:
scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), delta)
scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), task.ID(), delta)
case *ChannelAction:
scheduler.channelTaskDelta.Add(action.Node(), task.CollectionID(), delta)
scheduler.channelTaskDelta.Add(action.Node(), task.CollectionID(), task.ID(), delta)
}
}
}
@ -613,9 +635,9 @@ func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) {
delta := action.WorkLoadEffect()
switch action.(type) {
case *SegmentAction:
scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), delta)
scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), task.ID(), delta)
case *ChannelAction:
scheduler.channelTaskDelta.Sub(action.Node(), task.CollectionID(), delta)
scheduler.channelTaskDelta.Sub(action.Node(), task.CollectionID(), task.ID(), delta)
}
}
}
@ -936,6 +958,11 @@ func (scheduler *taskScheduler) remove(task Task) {
scheduler.decExecutingTaskDelta(task)
}
if scheduler.tasks.Len() == 0 {
scheduler.segmentTaskDelta.printDetailInfos()
scheduler.channelTaskDelta.printDetailInfos()
}
switch task := task.(type) {
case *SegmentTask:
index := NewReplicaSegmentIndex(task)

View File

@ -1908,6 +1908,26 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
}
func (suite *TaskSuite) TestTaskDeltaCache() {
etd := NewExecutingTaskDelta()
taskDelta := []int{1, 2, 3, 4, 5, -6, -7, -8, -9, -10}
nodeID := int64(1)
collectionID := int64(100)
taskDelta = lo.Shuffle(taskDelta)
for i := 0; i < len(taskDelta); i++ {
etd.Add(nodeID, collectionID, int64(i), taskDelta[i])
}
taskDelta = lo.Shuffle(taskDelta)
for i := 0; i < len(taskDelta); i++ {
etd.Sub(nodeID, collectionID, int64(i), taskDelta[i])
}
suite.Equal(0, etd.Get(nodeID, collectionID))
}
func (suite *TaskSuite) TestRemoveTaskWithError() {
ctx := context.Background()
scheduler := suite.newScheduler()