Fix compaction unable to notify datanode (#28409)

See also: #28214, #28368

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
pull/28425/head
XuanYang-cn 2023-11-14 16:38:18 +08:00 committed by GitHub
parent 40d5c902b6
commit 0be22ccacd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 7 additions and 5 deletions

View File

@ -349,9 +349,11 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
for _, task := range tasks {
// avoid closure capture iteration variable
innerTask := task
getOrCreateIOPool().Submit(func() (any, error) {
plan := task.plan
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", task.dataNodeID))
plan := innerTask.plan
log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", innerTask.dataNodeID))
log.Info("Notify compaction task to DataNode")
ts, err := c.allocator.allocTimestamp(context.TODO())
if err != nil {
@ -360,9 +362,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
return nil, err
}
c.updateTask(task.plan.PlanID, setStartTime(ts))
err = c.sessions.Compaction(task.dataNodeID, task.plan)
c.updateTask(task.plan.PlanID, setState(executing))
c.updateTask(plan.PlanID, setStartTime(ts))
err = c.sessions.Compaction(innerTask.dataNodeID, plan)
c.updateTask(plan.PlanID, setState(executing))
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
return nil, err