mirror of https://github.com/milvus-io/milvus.git
fix: [cherry-pick] Record the nodeID before assigning tasks (#36493)
issue: #33744
master pr: #36371
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
pull/36588/head
parent
132e4c3ba1
commit
6335c8bf69
|
@ -110,7 +110,7 @@ func (m *analyzeMeta) DropAnalyzeTask(taskID int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (m *analyzeMeta) UpdateVersion(taskID int64) error {
|
||||
func (m *analyzeMeta) UpdateVersion(taskID, nodeID int64) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
|
@ -121,11 +121,13 @@ func (m *analyzeMeta) UpdateVersion(taskID int64) error {
|
|||
|
||||
cloneT := proto.Clone(t).(*indexpb.AnalyzeTask)
|
||||
cloneT.Version++
|
||||
log.Info("update task version", zap.Int64("taskID", taskID), zap.Int64("newVersion", cloneT.Version))
|
||||
cloneT.NodeID = nodeID
|
||||
log.Info("update task version", zap.Int64("taskID", taskID),
|
||||
zap.Int64("newVersion", cloneT.Version), zap.Int64("nodeID", nodeID))
|
||||
return m.saveTask(cloneT)
|
||||
}
|
||||
|
||||
func (m *analyzeMeta) BuildingTask(taskID, nodeID int64) error {
|
||||
func (m *analyzeMeta) BuildingTask(taskID int64) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
|
@ -135,9 +137,8 @@ func (m *analyzeMeta) BuildingTask(taskID, nodeID int64) error {
|
|||
}
|
||||
|
||||
cloneT := proto.Clone(t).(*indexpb.AnalyzeTask)
|
||||
cloneT.NodeID = nodeID
|
||||
cloneT.State = indexpb.JobState_JobStateInProgress
|
||||
log.Info("task will be building", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID))
|
||||
log.Info("task will be building", zap.Int64("taskID", taskID))
|
||||
|
||||
return m.saveTask(cloneT)
|
||||
}
|
||||
|
|
|
@ -141,13 +141,13 @@ func (s *AnalyzeMetaSuite) Test_AnalyzeMeta() {
|
|||
})
|
||||
|
||||
s.Run("UpdateVersion", func() {
|
||||
err := am.UpdateVersion(1)
|
||||
err := am.UpdateVersion(1, 1)
|
||||
s.NoError(err)
|
||||
s.Equal(int64(1), am.GetTask(1).Version)
|
||||
})
|
||||
|
||||
s.Run("BuildingTask", func() {
|
||||
err := am.BuildingTask(1, 1)
|
||||
err := am.BuildingTask(1)
|
||||
s.NoError(err)
|
||||
s.Equal(indexpb.JobState_JobStateInProgress, am.GetTask(1).State)
|
||||
})
|
||||
|
@ -217,19 +217,19 @@ func (s *AnalyzeMetaSuite) Test_failCase() {
|
|||
})
|
||||
|
||||
s.Run("UpdateVersion", func() {
|
||||
err := am.UpdateVersion(777)
|
||||
err := am.UpdateVersion(777, 1)
|
||||
s.Error(err)
|
||||
|
||||
err = am.UpdateVersion(1)
|
||||
err = am.UpdateVersion(1, 1)
|
||||
s.Error(err)
|
||||
s.Equal(int64(0), am.GetTask(1).Version)
|
||||
})
|
||||
|
||||
s.Run("BuildingTask", func() {
|
||||
err := am.BuildingTask(777, 1)
|
||||
err := am.BuildingTask(777)
|
||||
s.Error(err)
|
||||
|
||||
err = am.BuildingTask(1, 1)
|
||||
err = am.BuildingTask(1)
|
||||
s.Error(err)
|
||||
s.Equal(int64(0), am.GetTask(1).NodeID)
|
||||
s.Equal(indexpb.JobState_JobStateInit, am.GetTask(1).State)
|
||||
|
|
|
@ -674,11 +674,11 @@ func (m *indexMeta) IsIndexExist(collID, indexID UniqueID) bool {
|
|||
}
|
||||
|
||||
// UpdateVersion updates the version and nodeID of the index meta, whenever the task is built once, the version will be updated once.
|
||||
func (m *indexMeta) UpdateVersion(buildID UniqueID) error {
|
||||
func (m *indexMeta) UpdateVersion(buildID, nodeID UniqueID) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
log.Debug("IndexCoord metaTable UpdateVersion receive", zap.Int64("buildID", buildID))
|
||||
log.Debug("IndexCoord metaTable UpdateVersion receive", zap.Int64("buildID", buildID), zap.Int64("nodeID", nodeID))
|
||||
segIdx, ok := m.buildID2SegmentIndex[buildID]
|
||||
if !ok {
|
||||
return fmt.Errorf("there is no index with buildID: %d", buildID)
|
||||
|
@ -686,6 +686,7 @@ func (m *indexMeta) UpdateVersion(buildID UniqueID) error {
|
|||
|
||||
updateFunc := func(segIdx *model.SegmentIndex) error {
|
||||
segIdx.IndexVersion++
|
||||
segIdx.NodeID = nodeID
|
||||
return m.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
|
||||
}
|
||||
|
||||
|
@ -748,7 +749,7 @@ func (m *indexMeta) DeleteTask(buildID int64) error {
|
|||
}
|
||||
|
||||
// BuildIndex set the index state to be InProgress. It means IndexNode is building the index.
|
||||
func (m *indexMeta) BuildIndex(buildID, nodeID UniqueID) error {
|
||||
func (m *indexMeta) BuildIndex(buildID UniqueID) error {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
|
@ -758,7 +759,6 @@ func (m *indexMeta) BuildIndex(buildID, nodeID UniqueID) error {
|
|||
}
|
||||
|
||||
updateFunc := func(segIdx *model.SegmentIndex) error {
|
||||
segIdx.NodeID = nodeID
|
||||
segIdx.IndexState = commonpb.IndexState_InProgress
|
||||
|
||||
err := m.alterSegmentIndexes([]*model.SegmentIndex{segIdx})
|
||||
|
|
|
@ -1116,18 +1116,18 @@ func TestMeta_UpdateVersion(t *testing.T) {
|
|||
).Return(errors.New("fail"))
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
err := m.UpdateVersion(buildID)
|
||||
err := m.UpdateVersion(buildID, nodeID)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("fail", func(t *testing.T) {
|
||||
m.catalog = ec
|
||||
err := m.UpdateVersion(buildID)
|
||||
err := m.UpdateVersion(buildID, nodeID)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("not exist", func(t *testing.T) {
|
||||
err := m.UpdateVersion(buildID + 1)
|
||||
err := m.UpdateVersion(buildID+1, nodeID)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
@ -1184,18 +1184,18 @@ func TestMeta_BuildIndex(t *testing.T) {
|
|||
).Return(errors.New("fail"))
|
||||
|
||||
t.Run("success", func(t *testing.T) {
|
||||
err := m.BuildIndex(buildID, nodeID)
|
||||
err := m.BuildIndex(buildID)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("fail", func(t *testing.T) {
|
||||
m.catalog = ec
|
||||
err := m.BuildIndex(buildID, nodeID)
|
||||
err := m.BuildIndex(buildID)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
|
||||
t.Run("not exist", func(t *testing.T) {
|
||||
err := m.BuildIndex(buildID+1, nodeID)
|
||||
err := m.BuildIndex(buildID + 1)
|
||||
assert.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -107,18 +107,21 @@ func (at *analyzeTask) GetFailReason() string {
|
|||
return at.taskInfo.GetFailReason()
|
||||
}
|
||||
|
||||
func (at *analyzeTask) UpdateVersion(ctx context.Context, meta *meta) error {
|
||||
return meta.analyzeMeta.UpdateVersion(at.GetTaskID())
|
||||
}
|
||||
|
||||
func (at *analyzeTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error {
|
||||
if err := meta.analyzeMeta.BuildingTask(at.GetTaskID(), nodeID); err != nil {
|
||||
func (at *analyzeTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
|
||||
if err := meta.analyzeMeta.UpdateVersion(at.GetTaskID(), nodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
at.nodeID = nodeID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (at *analyzeTask) UpdateMetaBuildingState(meta *meta) error {
|
||||
if err := meta.analyzeMeta.BuildingTask(at.GetTaskID()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (at *analyzeTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
|
||||
t := dependency.meta.analyzeMeta.GetTask(at.GetTaskID())
|
||||
if t == nil {
|
||||
|
|
|
@ -109,13 +109,16 @@ func (it *indexBuildTask) GetFailReason() string {
|
|||
return it.taskInfo.FailReason
|
||||
}
|
||||
|
||||
func (it *indexBuildTask) UpdateVersion(ctx context.Context, meta *meta) error {
|
||||
return meta.indexMeta.UpdateVersion(it.taskID)
|
||||
func (it *indexBuildTask) UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error {
|
||||
if err := meta.indexMeta.UpdateVersion(it.taskID, nodeID); err != nil {
|
||||
return err
|
||||
}
|
||||
it.nodeID = nodeID
|
||||
return nil
|
||||
}
|
||||
|
||||
func (it *indexBuildTask) UpdateMetaBuildingState(nodeID int64, meta *meta) error {
|
||||
it.nodeID = nodeID
|
||||
return meta.indexMeta.BuildIndex(it.taskID, nodeID)
|
||||
func (it *indexBuildTask) UpdateMetaBuildingState(meta *meta) error {
|
||||
return meta.indexMeta.BuildIndex(it.taskID)
|
||||
}
|
||||
|
||||
func (it *indexBuildTask) PreCheck(ctx context.Context, dependency *taskScheduler) bool {
|
||||
|
|
|
@ -255,7 +255,7 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
|
|||
log.Ctx(s.ctx).Info("pick client success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID))
|
||||
|
||||
// 2. update version
|
||||
if err := task.UpdateVersion(s.ctx, s.meta); err != nil {
|
||||
if err := task.UpdateVersion(s.ctx, nodeID, s.meta); err != nil {
|
||||
log.Ctx(s.ctx).Warn("update task version failed", zap.Int64("taskID", taskID), zap.Error(err))
|
||||
return false
|
||||
}
|
||||
|
@ -273,7 +273,7 @@ func (s *taskScheduler) process(taskID UniqueID) bool {
|
|||
log.Ctx(s.ctx).Info("assign task to client success", zap.Int64("taskID", taskID), zap.Int64("nodeID", nodeID))
|
||||
|
||||
// 4. update meta state
|
||||
if err := task.UpdateMetaBuildingState(nodeID, s.meta); err != nil {
|
||||
if err := task.UpdateMetaBuildingState(s.meta); err != nil {
|
||||
log.Ctx(s.ctx).Warn("update meta building state failed", zap.Int64("taskID", taskID), zap.Error(err))
|
||||
task.SetState(indexpb.JobState_JobStateRetry, "update meta building state failed")
|
||||
return false
|
||||
|
|
|
@ -33,8 +33,8 @@ type Task interface {
|
|||
SetState(state indexpb.JobState, failReason string)
|
||||
GetState() indexpb.JobState
|
||||
GetFailReason() string
|
||||
UpdateVersion(ctx context.Context, meta *meta) error
|
||||
UpdateMetaBuildingState(nodeID int64, meta *meta) error
|
||||
UpdateVersion(ctx context.Context, nodeID int64, meta *meta) error
|
||||
UpdateMetaBuildingState(meta *meta) error
|
||||
AssignTask(ctx context.Context, client types.IndexNodeClient) bool
|
||||
QueryResult(ctx context.Context, client types.IndexNodeClient)
|
||||
DropTaskOnWorker(ctx context.Context, client types.IndexNodeClient) bool
|
||||
|
|
Loading…
Reference in New Issue