From 6335c8bf69e4fd6327064c73684f52d927a445fb Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Sat, 28 Sep 2024 17:29:21 +0800 Subject: [PATCH] fix: [cherry-pick] Record the nodeID before assigning tasks (#36493) issue: #33744 master pr: #36371 Signed-off-by: Cai Zhang --- internal/datacoord/analyze_meta.go | 11 ++++++----- internal/datacoord/analyze_meta_test.go | 12 ++++++------ internal/datacoord/index_meta.go | 8 ++++---- internal/datacoord/index_meta_test.go | 12 ++++++------ internal/datacoord/task_analyze.go | 15 +++++++++------ internal/datacoord/task_index.go | 13 ++++++++----- internal/datacoord/task_scheduler.go | 4 ++-- internal/datacoord/types.go | 4 ++-- 8 files changed, 43 insertions(+), 36 deletions(-) diff --git a/internal/datacoord/analyze_meta.go b/internal/datacoord/analyze_meta.go index 02b4d92d2e..83871069cb 100644 --- a/internal/datacoord/analyze_meta.go +++ b/internal/datacoord/analyze_meta.go @@ -110,7 +110,7 @@ func (m *analyzeMeta) DropAnalyzeTask(taskID int64) error { return nil } -func (m *analyzeMeta) UpdateVersion(taskID int64) error { +func (m *analyzeMeta) UpdateVersion(taskID, nodeID int64) error { m.Lock() defer m.Unlock() @@ -121,11 +121,13 @@ func (m *analyzeMeta) UpdateVersion(taskID int64) error { cloneT := proto.Clone(t).(*indexpb.AnalyzeTask) cloneT.Version++ - log.Info("update task version", zap.Int64("taskID", taskID), zap.Int64("newVersion", cloneT.Version)) + cloneT.NodeID = nodeID + log.Info("update task version", zap.Int64("taskID", taskID), + zap.Int64("newVersion", cloneT.Version), zap.Int64("nodeID", nodeID)) return m.saveTask(cloneT) } -func (m *analyzeMeta) BuildingTask(taskID, nodeID int64) error { +func (m *analyzeMeta) BuildingTask(taskID int64) error { m.Lock() defer m.Unlock() @@ -135,9 +137,8 @@ func (m *analyzeMeta) BuildingTask(taskID, nodeID int64) error { } cloneT := proto.Clone(t).(*indexpb.AnalyzeTask) - cloneT.NodeID = nodeID cloneT.State = indexpb.JobState_JobStateInProgress - log.Info("task will be building", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID)) + log.Info("task will be building", zap.Int64("taskID", taskID)) return m.saveTask(cloneT) } diff --git a/internal/datacoord/analyze_meta_test.go b/internal/datacoord/analyze_meta_test.go index fdecb64796..ee06c59797 100644 --- a/internal/datacoord/analyze_meta_test.go +++ b/internal/datacoord/analyze_meta_test.go @@ -141,13 +141,13 @@ func (s *AnalyzeMetaSuite) Test_AnalyzeMeta() { }) s.Run("UpdateVersion", func() { - err := am.UpdateVersion(1) + err := am.UpdateVersion(1, 1) s.NoError(err) s.Equal(int64(1), am.GetTask(1).Version) }) s.Run("BuildingTask", func() { - err := am.BuildingTask(1, 1) + err := am.BuildingTask(1) s.NoError(err) s.Equal(indexpb.JobState_JobStateInProgress, am.GetTask(1).State) }) @@ -217,19 +217,19 @@ func (s *AnalyzeMetaSuite) Test_failCase() { }) s.Run("UpdateVersion", func() { - err := am.UpdateVersion(777) + err := am.UpdateVersion(777, 1) s.Error(err) - err = am.UpdateVersion(1) + err = am.UpdateVersion(1, 1) s.Error(err) s.Equal(int64(0), am.GetTask(1).Version) }) s.Run("BuildingTask", func() { - err := am.BuildingTask(777, 1) + err := am.BuildingTask(777) s.Error(err) - err = am.BuildingTask(1, 1) + err = am.BuildingTask(1) s.Error(err) s.Equal(int64(0), am.GetTask(1).NodeID) s.Equal(indexpb.JobState_JobStateInit, am.GetTask(1).State) diff --git a/internal/datacoord/index_meta.go b/internal/datacoord/index_meta.go index d7d0652f87..4027de6f35 100644 --- a/internal/datacoord/index_meta.go +++ b/internal/datacoord/index_meta.go @@ -674,11 +674,11 @@ func (m *indexMeta) IsIndexExist(collID, indexID UniqueID) bool { } // UpdateVersion updates the version and nodeID of the index meta, whenever the task is built once, the version will be updated once. -func (m *indexMeta) UpdateVersion(buildID UniqueID) error { +func (m *indexMeta) UpdateVersion(buildID, nodeID UniqueID) error { m.Lock() defer m.Unlock() - log.Debug("IndexCoord metaTable UpdateVersion receive", zap.Int64("buildID", buildID)) + log.Debug("IndexCoord metaTable UpdateVersion receive", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID)) segIdx, ok := m.buildID2SegmentIndex[buildID] if !ok { return fmt.Errorf("there is no index with buildID: %d", buildID) @@ -686,6 +686,7 @@ func (m *indexMeta) UpdateVersion(buildID UniqueID) error { updateFunc := func(segIdx *model.SegmentIndex) error { segIdx.IndexVersion++ + segIdx.NodeID = nodeID return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx}) } @@ -748,7 +749,7 @@ func (m *indexMeta) DeleteTask(buildID int64) error { } // BuildIndex set the index state to be InProgress. It means IndexNode is building the index. -func (m *indexMeta) BuildIndex(buildID, nodeID UniqueID) error { +func (m *indexMeta) BuildIndex(buildID UniqueID) error { m.Lock() defer m.Unlock() @@ -758,7 +759,6 @@ func (m *indexMeta) BuildIndex(buildID, nodeID UniqueID) error { } updateFunc := func(segIdx *model.SegmentIndex) error { - segIdx.NodeID = nodeID segIdx.IndexState = commonpb.IndexState_InProgress err := m.alterSegmentIndexes([]*model.SegmentIndex{segIdx}) diff --git a/internal/datacoord/index_meta_test.go b/internal/datacoord/index_meta_test.go index 71b8486973..e6c5043b94 100644 --- a/internal/datacoord/index_meta_test.go +++ b/internal/datacoord/index_meta_test.go @@ -1116,18 +1116,18 @@ func TestMeta_UpdateVersion(t *testing.T) { ).Return(errors.New("fail")) t.Run("success", func(t *testing.T) { - err := m.UpdateVersion(buildID) + err := m.UpdateVersion(buildID, nodeID) assert.NoError(t, err) }) t.Run("fail", func(t *testing.T) { m.catalog = ec - err := m.UpdateVersion(buildID) + err := m.UpdateVersion(buildID, nodeID) assert.Error(t, err) }) t.Run("not exist", func(t *testing.T) { - err := m.UpdateVersion(buildID + 1) + err := m.UpdateVersion(buildID+1, nodeID) assert.Error(t, err) }) } @@ -1184,18 +1184,18 @@ func TestMeta_BuildIndex(t *testing.T) { ).Return(errors.New("fail")) t.Run("success", func(t *testing.T) { - err := m.BuildIndex(buildID, nodeID) + err := m.BuildIndex(buildID) assert.NoError(t, err) }) t.Run("fail", func(t *testing.T) { m.catalog = ec - err := m.BuildIndex(buildID, nodeID) + err := m.BuildIndex(buildID) assert.Error(t, err) }) t.Run("not exist", func(t *testing.T) { - err := m.BuildIndex(buildID+1, nodeID) + err := m.BuildIndex(buildID + 1) assert.Error(t, err) }) } diff --git a/internal/datacoord/task_analyze.go b/internal/datacoord/task_analyze.go index 2d9e77c93e..84fb2c1f10 100644 --- a/internal/datacoord/task_analyze.go +++ b/internal/datacoord/task_analyze.go @@ -107,18 +107,21 @@ func (at *analyzeTask) GetFailReason() string { return at.taskInfo.GetFailReason() } -func (at *analyzeTask) UpdateVersion(ctx context.Context, meta *meta) error { - return meta.analyzeMeta.UpdateVersion(at.GetTaskID()) -} - -func (at *analyzeTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error { - if err := meta.analyzeMeta.BuildingTask(at.GetTaskID(), nodeID); err != nil { +func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error { + if err := meta.analyzeMeta.UpdateVersion(at.GetTaskID(), nodeID); err != nil { return err } at.nodeID = nodeID return nil } +func (at *analyzeTask) UpdateMetaBuildingState(meta *meta) error { + if err := meta.analyzeMeta.BuildingTask(at.GetTaskID()); err != nil { + return err + } + return nil +} + func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool { t := dependency.meta.analyzeMeta.GetTask(at.GetTaskID()) if t == nil { diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index 8f54912db4..826fd61fd0 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -109,13 +109,16 @@ func (it *indexBuildTask) GetFailReason() string { return it.taskInfo.FailReason } -func (it *indexBuildTask) UpdateVersion(ctx context.Context, meta *meta) error { - return meta.indexMeta.UpdateVersion(it.taskID) +func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error { + if err := meta.indexMeta.UpdateVersion(it.taskID, nodeID); err != nil { + return err + } + it.nodeID = nodeID + return nil } -func (it *indexBuildTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error { - it.nodeID = nodeID - return meta.indexMeta.BuildIndex(it.taskID, nodeID) +func (it *indexBuildTask) UpdateMetaBuildingState(meta *meta) error { + return meta.indexMeta.BuildIndex(it.taskID) } func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool { diff --git a/internal/datacoord/task_scheduler.go b/internal/datacoord/task_scheduler.go index 58949eba67..289e35d8fe 100644 --- a/internal/datacoord/task_scheduler.go +++ b/internal/datacoord/task_scheduler.go @@ -255,7 +255,7 @@ func (s *taskScheduler) process(taskID UniqueID) bool { log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID)) // 2. update version - if err := task.UpdateVersion(s.ctx, s.meta); err != nil { + if err := task.UpdateVersion(s.ctx, nodeID, s.meta); err != nil { log.Ctx(s.ctx).Warn("update task version failed", zap.Int64("taskID", taskID), zap.Error(err)) return false } @@ -273,7 +273,7 @@ func (s *taskScheduler) process(taskID UniqueID) bool { log.Ctx(s.ctx).Info("assign task to client success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID)) // 4. update meta state - if err := task.UpdateMetaBuildingState(nodeID, s.meta); err != nil { + if err := task.UpdateMetaBuildingState(s.meta); err != nil { log.Ctx(s.ctx).Warn("update meta building state failed", zap.Int64("taskID", taskID), zap.Error(err)) task.SetState(indexpb.JobState_JobStateRetry, "update meta building state failed") return false diff --git a/internal/datacoord/types.go b/internal/datacoord/types.go index 3d9cf46fd6..a80f6d1228 100644 --- a/internal/datacoord/types.go +++ b/internal/datacoord/types.go @@ -33,8 +33,8 @@ type Task interface { SetState(state indexpb.JobState, failReason string) GetState() indexpb.JobState GetFailReason() string - UpdateVersion(ctx context.Context, meta *meta) error - UpdateMetaBuildingState(nodeID int64, meta *meta) error + UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error + UpdateMetaBuildingState(meta *meta) error AssignTask(ctx context.Context, client types.IndexNodeClient) bool QueryResult(ctx context.Context, client types.IndexNodeClient) DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool