From fe1c0ffe56e46e226ddafdd2737a88a469e4d1d9 Mon Sep 17 00:00:00 2001 From: "cai.zhang" Date: Tue, 25 Jul 2023 10:09:01 +0800 Subject: [PATCH] Check segment exist before check import task state (#25809) (#25817) Signed-off-by: cai.zhang --- internal/rootcoord/import_manager.go | 42 ++++++++++++++++++++++++---- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index f4c8605cfa..2a7a84fb8c 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -44,6 +44,8 @@ const ( delimiter = "/" ) +var errSegmentNotExist = errors.New("segment not exist") + // checkPendingTasksInterval is the default interval to check and send out pending tasks, // default 60*1000 milliseconds (1 minute). var checkPendingTasksInterval = 60 * 1000 @@ -264,6 +266,25 @@ func (m *importManager) sendOutTasks(ctx context.Context) error { return nil } +func (m *importManager) markTaskFailed(task *datapb.ImportTaskInfo) { + if err := m.setImportTaskStateAndReason(task.GetId(), commonpb.ImportState_ImportFailed, + "the import task failed"); err != nil { + log.Warn("failed to set import task state", + zap.Int64("task ID", task.GetId()), + zap.Any("target state", commonpb.ImportState_ImportFailed), + zap.Error(err)) + return + } + // Remove DataNode from busy node list, so it can serve other tasks again. + // Remove it only after the task state has been set to failed; removing it earlier could lead to a second removal later that deletes the node ID of another task. + m.busyNodesLock.Lock() + delete(m.busyNodes, task.GetDatanodeId()) + m.busyNodesLock.Unlock() + m.workingLock.Lock() + delete(m.workingTasks, task.GetId()) + m.workingLock.Unlock() +} + // loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to // `ImportCompleted` if eligible. 
func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error { @@ -286,6 +307,9 @@ func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error { log.Error("failed to flip task flushed state", zap.Int64("task ID", task.GetId()), zap.Error(err)) + if errors.Is(err, errSegmentNotExist) { + m.markTaskFailed(task) + } } } } @@ -355,14 +379,19 @@ func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) ( } log.Debug("checking import segment states", zap.Strings("segment states", lo.Map(resp.GetStates(), getSegmentStates))) + flushed := true for _, states := range resp.GetStates() { // Flushed segment could get compacted, so only returns false if there are still importing segments. + if states.GetState() == commonpb.SegmentState_Dropped || + states.GetState() == commonpb.SegmentState_NotExist { + return false, errSegmentNotExist + } if states.GetState() == commonpb.SegmentState_Importing || states.GetState() == commonpb.SegmentState_Sealed { - return false, nil + flushed = false } } - return true, nil + return flushed, nil } func (m *importManager) isRowbased(files []string) (bool, error) { @@ -890,10 +919,6 @@ func (m *importManager) expireOldTasksFromMem() { zap.Float64("ImportTaskExpiration", Params.RootCoordCfg.ImportTaskExpiration.GetAsFloat())) taskID := v.GetId() m.workingLock.Unlock() - // Remove DataNode from busy node list, so it can serve other tasks again. - m.busyNodesLock.Lock() - delete(m.busyNodes, v.GetDatanodeId()) - m.busyNodesLock.Unlock() if err := m.setImportTaskStateAndReason(taskID, commonpb.ImportState_ImportFailed, "the import task has timed out"); err != nil { @@ -902,6 +927,11 @@ func (m *importManager) expireOldTasksFromMem() { zap.Any("target state", commonpb.ImportState_ImportFailed)) } else { taskExpiredAndStateUpdated = true + // Remove DataNode from busy node list, so it can serve other tasks again. 
+ // Remove it only after the task state has been set to failed; removing it earlier could lead to a second removal later that deletes the node ID of another task. + m.busyNodesLock.Lock() + delete(m.busyNodes, v.GetDatanodeId()) + m.busyNodesLock.Unlock() } m.workingLock.Lock() if taskExpiredAndStateUpdated {