mirror of https://github.com/milvus-io/milvus.git
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/25884/head
parent
5437fcce8e
commit
fe1c0ffe56
|
@ -44,6 +44,8 @@ const (
|
||||||
delimiter = "/"
|
delimiter = "/"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errSegmentNotExist = errors.New("segment not exist")
|
||||||
|
|
||||||
// checkPendingTasksInterval is the default interval to check and send out pending tasks,
|
// checkPendingTasksInterval is the default interval to check and send out pending tasks,
|
||||||
// default 60*1000 milliseconds (1 minute).
|
// default 60*1000 milliseconds (1 minute).
|
||||||
var checkPendingTasksInterval = 60 * 1000
|
var checkPendingTasksInterval = 60 * 1000
|
||||||
|
@ -264,6 +266,25 @@ func (m *importManager) sendOutTasks(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *importManager) markTaskFailed(task *datapb.ImportTaskInfo) {
|
||||||
|
if err := m.setImportTaskStateAndReason(task.GetId(), commonpb.ImportState_ImportFailed,
|
||||||
|
"the import task failed"); err != nil {
|
||||||
|
log.Warn("failed to set import task state",
|
||||||
|
zap.Int64("task ID", task.GetId()),
|
||||||
|
zap.Any("target state", commonpb.ImportState_ImportFailed),
|
||||||
|
zap.Error(err))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Remove DataNode from busy node list, so it can serve other tasks again.
|
||||||
|
// remove after set state failed, prevent double remove, remove the nodeID of another task.
|
||||||
|
m.busyNodesLock.Lock()
|
||||||
|
delete(m.busyNodes, task.GetDatanodeId())
|
||||||
|
m.busyNodesLock.Unlock()
|
||||||
|
m.workingLock.Lock()
|
||||||
|
delete(m.workingTasks, task.GetId())
|
||||||
|
m.workingLock.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
// loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to
|
// loadAndFlipPersistedTasks checks every import task in `ImportPersisted` state and flips their import state to
|
||||||
// `ImportCompleted` if eligible.
|
// `ImportCompleted` if eligible.
|
||||||
func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
|
func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
|
||||||
|
@ -286,6 +307,9 @@ func (m *importManager) loadAndFlipPersistedTasks(ctx context.Context) error {
|
||||||
log.Error("failed to flip task flushed state",
|
log.Error("failed to flip task flushed state",
|
||||||
zap.Int64("task ID", task.GetId()),
|
zap.Int64("task ID", task.GetId()),
|
||||||
zap.Error(err))
|
zap.Error(err))
|
||||||
|
if errors.Is(err, errSegmentNotExist) {
|
||||||
|
m.markTaskFailed(task)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -355,14 +379,19 @@ func (m *importManager) checkFlushDone(ctx context.Context, segIDs []UniqueID) (
|
||||||
}
|
}
|
||||||
log.Debug("checking import segment states",
|
log.Debug("checking import segment states",
|
||||||
zap.Strings("segment states", lo.Map(resp.GetStates(), getSegmentStates)))
|
zap.Strings("segment states", lo.Map(resp.GetStates(), getSegmentStates)))
|
||||||
|
flushed := true
|
||||||
for _, states := range resp.GetStates() {
|
for _, states := range resp.GetStates() {
|
||||||
// Flushed segment could get compacted, so only returns false if there are still importing segments.
|
// Flushed segment could get compacted, so only returns false if there are still importing segments.
|
||||||
|
if states.GetState() == commonpb.SegmentState_Dropped ||
|
||||||
|
states.GetState() == commonpb.SegmentState_NotExist {
|
||||||
|
return false, errSegmentNotExist
|
||||||
|
}
|
||||||
if states.GetState() == commonpb.SegmentState_Importing ||
|
if states.GetState() == commonpb.SegmentState_Importing ||
|
||||||
states.GetState() == commonpb.SegmentState_Sealed {
|
states.GetState() == commonpb.SegmentState_Sealed {
|
||||||
return false, nil
|
flushed = false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return true, nil
|
return flushed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *importManager) isRowbased(files []string) (bool, error) {
|
func (m *importManager) isRowbased(files []string) (bool, error) {
|
||||||
|
@ -890,10 +919,6 @@ func (m *importManager) expireOldTasksFromMem() {
|
||||||
zap.Float64("ImportTaskExpiration", Params.RootCoordCfg.ImportTaskExpiration.GetAsFloat()))
|
zap.Float64("ImportTaskExpiration", Params.RootCoordCfg.ImportTaskExpiration.GetAsFloat()))
|
||||||
taskID := v.GetId()
|
taskID := v.GetId()
|
||||||
m.workingLock.Unlock()
|
m.workingLock.Unlock()
|
||||||
// Remove DataNode from busy node list, so it can serve other tasks again.
|
|
||||||
m.busyNodesLock.Lock()
|
|
||||||
delete(m.busyNodes, v.GetDatanodeId())
|
|
||||||
m.busyNodesLock.Unlock()
|
|
||||||
|
|
||||||
if err := m.setImportTaskStateAndReason(taskID, commonpb.ImportState_ImportFailed,
|
if err := m.setImportTaskStateAndReason(taskID, commonpb.ImportState_ImportFailed,
|
||||||
"the import task has timed out"); err != nil {
|
"the import task has timed out"); err != nil {
|
||||||
|
@ -902,6 +927,11 @@ func (m *importManager) expireOldTasksFromMem() {
|
||||||
zap.Any("target state", commonpb.ImportState_ImportFailed))
|
zap.Any("target state", commonpb.ImportState_ImportFailed))
|
||||||
} else {
|
} else {
|
||||||
taskExpiredAndStateUpdated = true
|
taskExpiredAndStateUpdated = true
|
||||||
|
// Remove DataNode from busy node list, so it can serve other tasks again.
|
||||||
|
// remove after set state failed, prevent double remove, remove the nodeID of another task.
|
||||||
|
m.busyNodesLock.Lock()
|
||||||
|
delete(m.busyNodes, v.GetDatanodeId())
|
||||||
|
m.busyNodesLock.Unlock()
|
||||||
}
|
}
|
||||||
m.workingLock.Lock()
|
m.workingLock.Lock()
|
||||||
if taskExpiredAndStateUpdated {
|
if taskExpiredAndStateUpdated {
|
||||||
|
|
Loading…
Reference in New Issue