mirror of https://github.com/milvus-io/milvus.git
enhance: [2.5] Limit the maximum number of segments restored and fail the job if saving the binlog fails (#39359)
1. Limit the maximum number of restored segments to 1024. 2. Fail the import job if saving binlog fails. 3. Fail the import job if saving the import task fails, to prevent repeatedly generating dirty importing segments. issue: https://github.com/milvus-io/milvus/issues/39331 pr: https://github.com/milvus-io/milvus/pull/39344 Signed-off-by: bigsheeper <yihao.dai@zilliz.com> pull/39380/head
parent
e7520599df
commit
b69994272f
|
@ -242,6 +242,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) {
|
|||
err = c.imeta.AddTask(context.TODO(), t)
|
||||
if err != nil {
|
||||
log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...)
|
||||
updateErr := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
|
||||
if updateErr != nil {
|
||||
log.Warn("failed to update job state to Failed", zap.Error(updateErr))
|
||||
}
|
||||
return
|
||||
}
|
||||
log.Info("add new import task", WrapTaskLog(t)...)
|
||||
|
|
|
@ -288,13 +288,17 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() {
|
|||
|
||||
catalog.ExpectedCalls = nil
|
||||
catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(mockErr)
|
||||
catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil)
|
||||
s.checker.checkPreImportingJob(job)
|
||||
importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
|
||||
s.Equal(0, len(importTasks))
|
||||
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
|
||||
s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
|
||||
|
||||
alloc.ExpectedCalls = nil
|
||||
alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr)
|
||||
err := s.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
|
||||
s.NoError(err)
|
||||
s.checker.checkPreImportingJob(job)
|
||||
importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType))
|
||||
s.Equal(0, len(importTasks))
|
||||
s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState())
|
||||
|
|
|
@ -278,8 +278,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
|
|||
return
|
||||
}
|
||||
if resp.GetState() == datapb.ImportTaskStateV2_Failed {
|
||||
err = s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed),
|
||||
UpdateJobReason(resp.GetReason()))
|
||||
err = s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason()))
|
||||
if err != nil {
|
||||
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err))
|
||||
}
|
||||
|
@ -324,7 +323,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) {
|
|||
op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed)
|
||||
err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2)
|
||||
if err != nil {
|
||||
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...)
|
||||
updateErr := s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error()))
|
||||
if updateErr != nil {
|
||||
log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(updateErr))
|
||||
}
|
||||
log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.String("err", err.Error()))...)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1706,6 +1706,11 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter
|
|||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles())))
|
||||
return resp, nil
|
||||
}
|
||||
if len(files) > paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt() {
|
||||
resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d",
|
||||
paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files))))
|
||||
return resp, nil
|
||||
}
|
||||
log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files))
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue