diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go index d43a3804e4..1206e7a9ee 100644 --- a/internal/datacoord/import_checker.go +++ b/internal/datacoord/import_checker.go @@ -242,6 +242,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { err = c.imeta.AddTask(context.TODO(), t) if err != nil { log.Warn("add new import task failed", WrapTaskLog(t, zap.Error(err))...) + updateErr := c.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) + if updateErr != nil { + log.Warn("failed to update job state to Failed", zap.Error(updateErr)) + } return } log.Info("add new import task", WrapTaskLog(t)...) diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go index d45ae1f679..f3d7428829 100644 --- a/internal/datacoord/import_checker_test.go +++ b/internal/datacoord/import_checker_test.go @@ -288,13 +288,17 @@ func (s *ImportCheckerSuite) TestCheckJob_Failed() { catalog.ExpectedCalls = nil catalog.EXPECT().SaveImportTask(mock.Anything, mock.Anything).Return(mockErr) + catalog.EXPECT().SaveImportJob(mock.Anything, mock.Anything).Return(nil) s.checker.checkPreImportingJob(job) importTasks := s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(0, len(importTasks)) - s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) + s.Equal(internalpb.ImportJobState_Failed, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) alloc.ExpectedCalls = nil alloc.EXPECT().AllocN(mock.Anything).Return(0, 0, mockErr) + err := s.imeta.UpdateJob(context.TODO(), job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) + s.NoError(err) + s.checker.checkPreImportingJob(job) importTasks = s.imeta.GetTaskBy(context.TODO(), WithJob(job.GetJobID()), WithType(ImportTaskType)) s.Equal(0, len(importTasks)) s.Equal(internalpb.ImportJobState_PreImporting, s.imeta.GetJob(context.TODO(), job.GetJobID()).GetState()) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 5e43ec7acc..00b3a8e2d4 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -278,8 +278,7 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { return } if resp.GetState() == datapb.ImportTaskStateV2_Failed { - err = s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), - UpdateJobReason(resp.GetReason())) + err = s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(resp.GetReason())) if err != nil { log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(err)) } @@ -324,7 +323,11 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { op2 := UpdateStatusOperator(info.GetSegmentID(), commonpb.SegmentState_Flushed) err = s.meta.UpdateSegmentsInfo(context.TODO(), op1, op2) if err != nil { - log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...) + updateErr := s.imeta.UpdateJob(context.TODO(), task.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) + if updateErr != nil { + log.Warn("failed to update job state to Failed", zap.Int64("jobID", task.GetJobID()), zap.Error(updateErr)) + } + log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.String("err", err.Error()))...) return } } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 000be0f6c0..c055124858 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1706,6 +1706,11 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("no binlog to import, input=%s", in.GetFiles()))) return resp, nil } + if len(files) > paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt() { + resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("The max number of import files should not exceed %d, but got %d", + paramtable.Get().DataCoordCfg.MaxFilesPerImportReq.GetAsInt(), len(files)))) + return resp, nil + } log.Info("list binlogs prefixes for import", zap.Any("binlog_prefixes", files)) }