fix: task delta cache leak due to duplicate task id (#40184)

issue: #40052
pr: #40183

The task delta cache relies on each taskID being unique: it calls incDeltaCache in AddTask and decDeltaCache in RemoveTask. However, the taskID allocator was not atomic, so two tasks could be assigned the same taskID. In that case incDeltaCache is called twice but decDeltaCache only once, leaking entries in the delta cache. The fix switches the allocator to an atomic counter and, as a safety net, clears both delta caches whenever the scheduler has no remaining tasks.
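
For illustration, a minimal standalone sketch (not Milvus code; the helper names are made up) of why an ID allocator implemented as a closure over a plain int64 can hand out duplicate IDs under concurrency, and how an atomic counter like the one introduced below avoids it. It uses go.uber.org/atomic, the same package as the diff:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/atomic"
)

func main() {
	// countDuplicates drives an allocator from many goroutines and reports how
	// many IDs were handed out more than once.
	countDuplicates := func(alloc func() int64) int {
		var mu sync.Mutex
		seen := map[int64]int{}
		var wg sync.WaitGroup
		for i := 0; i < 1000; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				id := alloc()
				mu.Lock()
				seen[id]++
				mu.Unlock()
			}()
		}
		wg.Wait()
		dups := 0
		for _, n := range seen {
			if n > 1 {
				dups += n - 1
			}
		}
		return dups
	}

	// Racy allocator: a closure over a plain int64. Two goroutines can read the
	// same value before either writes the increment back, so the same ID can be
	// returned twice.
	var plainID int64
	racyAlloc := func() int64 {
		plainID++
		return plainID
	}

	// Atomic allocator, mirroring the fix: Inc() is a single atomic
	// read-modify-write, so every caller gets a distinct value.
	atomicID := atomic.NewInt64(0)
	safeAlloc := func() int64 {
		return atomicID.Inc()
	}

	fmt.Println("duplicate IDs from racy allocator:  ", countDuplicates(racyAlloc)) // often > 0
	fmt.Println("duplicate IDs from atomic allocator:", countDuplicates(safeAlloc)) // always 0
}

Running this with the race detector (go run -race) flags the first allocator and reports zero duplicates for the second.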

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
wei liu 2025-02-28 10:22:08 +08:00 committed by GitHub
parent 00eda6fe31
commit 82c000a4b2
2 changed files with 95 additions and 45 deletions


@@ -163,39 +163,53 @@ func NewExecutingTaskDelta() *ExecutingTaskDelta {
}
// Add updates the task delta cache for every node and collection touched by the given task
func (etd *ExecutingTaskDelta) Add(nodeID int64, collectionID int64, taskID int64, delta int) {
func (etd *ExecutingTaskDelta) Add(task Task) {
etd.mu.Lock()
defer etd.mu.Unlock()
if etd.taskIDRecords.Contain(taskID) {
log.Warn("task already exists in delta cache", zap.Int64("taskID", taskID))
if etd.taskIDRecords.Contain(task.ID()) {
log.Warn("task already exists in delta cache",
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("taskID", task.ID()))
return
}
etd.taskIDRecords.Insert(taskID)
etd.taskIDRecords.Insert(task.ID())
if _, exists := etd.data[nodeID]; !exists {
etd.data[nodeID] = make(map[int64]int)
collectionID := task.CollectionID()
for _, action := range task.Actions() {
nodeID := action.Node()
delta := action.WorkLoadEffect()
if _, exists := etd.data[nodeID]; !exists {
etd.data[nodeID] = make(map[int64]int)
}
etd.data[nodeID][collectionID] += delta
}
etd.data[nodeID][collectionID] += delta
}
// Sub removes the given task's contribution from the task delta cache
func (etd *ExecutingTaskDelta) Sub(nodeID int64, collectionID int64, taskID int64, delta int) {
func (etd *ExecutingTaskDelta) Sub(task Task) {
etd.mu.Lock()
defer etd.mu.Unlock()
if !etd.taskIDRecords.Contain(taskID) {
log.Warn("task doesn't exists in delta cache", zap.Int64("taskID", taskID))
if !etd.taskIDRecords.Contain(task.ID()) {
log.Warn("task already exists in delta cache",
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.Int64("taskID", task.ID()))
return
}
etd.taskIDRecords.Remove(taskID)
etd.taskIDRecords.Remove(task.ID())
collectionID := task.CollectionID()
for _, action := range task.Actions() {
nodeID := action.Node()
delta := action.WorkLoadEffect()
if _, exists := etd.data[nodeID]; !exists {
etd.data[nodeID] = make(map[int64]int)
}
if _, exists := etd.data[nodeID]; exists {
etd.data[nodeID][collectionID] -= delta
if etd.data[nodeID][collectionID] == 0 {
delete(etd.data[nodeID], collectionID)
}
if len(etd.data[nodeID]) == 0 {
delete(etd.data, nodeID)
}
}
}
@@ -233,6 +247,13 @@ func (etd *ExecutingTaskDelta) printDetailInfos() {
}
}
func (etd *ExecutingTaskDelta) Clear() {
etd.mu.Lock()
defer etd.mu.Unlock()
etd.data = make(map[int64]map[int64]int)
etd.taskIDRecords.Clear()
}
type Scheduler interface {
Start()
Stop()
@@ -286,13 +307,12 @@ func NewScheduler(ctx context.Context,
cluster session.Cluster,
nodeMgr *session.NodeManager,
) *taskScheduler {
id := time.Now().UnixMilli()
id := atomic.NewInt64(time.Now().UnixMilli())
return &taskScheduler{
ctx: ctx,
executors: NewConcurrentMap[int64, *Executor](),
idAllocator: func() UniqueID {
id++
return id
return id.Inc()
},
distMgr: distMgr,
@@ -619,26 +639,20 @@ func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64)
}
func (scheduler *taskScheduler) incExecutingTaskDelta(task Task) {
for _, action := range task.Actions() {
delta := action.WorkLoadEffect()
switch action.(type) {
case *SegmentAction:
scheduler.segmentTaskDelta.Add(action.Node(), task.CollectionID(), task.ID(), delta)
case *ChannelAction:
scheduler.channelTaskDelta.Add(action.Node(), task.CollectionID(), task.ID(), delta)
}
switch task := task.(type) {
case *SegmentTask:
scheduler.segmentTaskDelta.Add(task)
case *ChannelTask:
scheduler.channelTaskDelta.Add(task)
}
}
func (scheduler *taskScheduler) decExecutingTaskDelta(task Task) {
for _, action := range task.Actions() {
delta := action.WorkLoadEffect()
switch action.(type) {
case *SegmentAction:
scheduler.segmentTaskDelta.Sub(action.Node(), task.CollectionID(), task.ID(), delta)
case *ChannelAction:
scheduler.channelTaskDelta.Sub(action.Node(), task.CollectionID(), task.ID(), delta)
}
switch task := task.(type) {
case *SegmentTask:
scheduler.segmentTaskDelta.Sub(task)
case *ChannelTask:
scheduler.channelTaskDelta.Sub(task)
}
}
@@ -956,11 +970,14 @@ func (scheduler *taskScheduler) remove(task Task) {
scheduler.processQueue.Remove(task)
if ok {
scheduler.decExecutingTaskDelta(task)
}
if scheduler.tasks.Len() == 0 {
scheduler.segmentTaskDelta.printDetailInfos()
scheduler.channelTaskDelta.printDetailInfos()
if scheduler.tasks.Len() == 0 {
// in case of a task delta leak, print detailed info before clearing
scheduler.segmentTaskDelta.printDetailInfos()
scheduler.segmentTaskDelta.Clear()
scheduler.channelTaskDelta.printDetailInfos()
scheduler.channelTaskDelta.Clear()
}
}
switch task := task.(type) {


@@ -1832,6 +1832,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
suite.replica,
NewSegmentActionWithScope(nodeID, ActionTypeGrow, "", segmentID, querypb.DataScope_Historical, 100),
)
task1.SetID(1)
suite.NoError(err)
err = scheduler.Add(task1)
suite.NoError(err)
@@ -1843,6 +1844,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
suite.replica,
NewChannelAction(nodeID, ActionTypeGrow, channelName),
)
task2.SetID(2)
suite.NoError(err)
err = scheduler.Add(task2)
suite.NoError(err)
@@ -1860,6 +1862,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
NewSegmentActionWithScope(nodeID2, ActionTypeGrow, "", segmentID2, querypb.DataScope_Historical, 100),
)
suite.NoError(err)
task3.SetID(3)
err = scheduler.Add(task3)
suite.NoError(err)
task4, err := NewChannelTask(
@@ -1871,6 +1874,7 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
NewChannelAction(nodeID2, ActionTypeGrow, channelName2),
)
suite.NoError(err)
task4.SetID(4)
err = scheduler.Add(task4)
suite.NoError(err)
@@ -1906,6 +1910,21 @@ func (suite *TaskSuite) TestCalculateTaskDelta() {
suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID, coll))
suite.Equal(0, scheduler.GetSegmentTaskDelta(nodeID2, coll2))
suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
task5, err := NewChannelTask(
ctx,
10*time.Second,
WrapIDSource(0),
coll2,
suite.replica,
NewChannelAction(nodeID2, ActionTypeGrow, channelName2),
)
suite.NoError(err)
task5.SetID(5)
scheduler.incExecutingTaskDelta(task5)
suite.Equal(1, scheduler.GetChannelTaskDelta(nodeID2, coll2))
scheduler.decExecutingTaskDelta(task5)
suite.Equal(0, scheduler.GetChannelTaskDelta(nodeID2, coll2))
}
func (suite *TaskSuite) TestTaskDeltaCache() {
@@ -1916,14 +1935,28 @@ func (suite *TaskSuite) TestTaskDeltaCache() {
nodeID := int64(1)
collectionID := int64(100)
taskDelta = lo.Shuffle(taskDelta)
tasks := make([]Task, 0)
for i := 0; i < len(taskDelta); i++ {
etd.Add(nodeID, collectionID, int64(i), taskDelta[i])
task, _ := NewChannelTask(
context.TODO(),
10*time.Second,
WrapIDSource(0),
collectionID,
suite.replica,
NewChannelAction(nodeID, ActionTypeGrow, "channel"),
)
task.SetID(int64(i))
tasks = append(tasks, task)
}
taskDelta = lo.Shuffle(taskDelta)
tasks = lo.Shuffle(tasks)
for i := 0; i < len(taskDelta); i++ {
etd.Sub(nodeID, collectionID, int64(i), taskDelta[i])
etd.Add(tasks[i])
}
tasks = lo.Shuffle(tasks)
for i := 0; i < len(taskDelta); i++ {
etd.Sub(tasks[i])
}
suite.Equal(0, etd.Get(nodeID, collectionID))
}