mirror of https://github.com/milvus-io/milvus.git
enhance: Limit number of segments restored and promptly terminate the job (#39344)
1. Limit the maximum number of restored segments to 1024. 2. Fail the import job if saving binlog fails. 3. Fail the import job if saving the import task fails to prevent repeatedly generating dirty importing segments. issue: https://github.com/milvus-io/milvus/issues/39331 Signed-off-by: bigsheeper <yihao.dai@zilliz.com>pull/39489/head
parent
7261128df0
commit
89eaf92984
|
@ -242,6 +242,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
|
|||
err = c.imeta.AddTask(context.TODO(), t)
|
||||
if err != nil {
|
||||
log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
|
||||
updateErr := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
|
||||
if updateErr != nil {
|
||||
log.Warn("failed to update job state to Failed", zap.Error(updateErr))
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("add new import task", WrapTaskLog(t)...)
|
||||
|
|
|
@ -288,13 +288,17 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
|
|||
|
||||
catalog.ExpectedCalls = nil
|
||||
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(mockErr)
|
||||
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
|
||||
s.checker.checkPreImportingJob(job)
|
||||
importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
|
||||
s.Equal(0, len(importTasks))
|
||||
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
|
||||
s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
|
||||
|
||||
alloc.ExpectedCalls = nil
|
||||
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr)
|
||||
err := s.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
|
||||
s.NoError(err)
|
||||
s.checker.checkPreImportingJob(job)
|
||||
importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
|
||||
s.Equal(0, len(importTasks))
|
||||
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
|
||||
|
|
|
@ -278,8 +278,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
|
|||
return
|
||||
}
|
||||
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
|
||||
err = s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
|
||||
UpdateJobReason(resp.GetReason()))
|
||||
err = s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
|
||||
if err != nil {
|
||||
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
|
||||
}
|
||||
|
@ -324,7 +323,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
|
|||
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
|
||||
err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
|
||||
if err != nil {
|
||||
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
|
||||
updateErr := s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
|
||||
if updateErr != nil {
|
||||
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(updateErr))
|
||||
}
|
||||
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.String("err", err.Error()))...)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1715,6 +1715,11 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
|||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
|
||||
return resp, nil
|
||||
}
|
||||
if len(files) > paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt() {
|
||||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d",
|
||||
paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files))))
|
||||
return resp, nil
|
||||
}
|
||||
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue