diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index e8c05e342c..f9857d4274 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -154,7 +154,7 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) { log.Info("import manager context done, exit check flipTaskStateLoop") return case <-flipPersistedTicker.C: - log.Debug("start trying to flip ImportPersisted task") + // log.Debug("start trying to flip ImportPersisted task") if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil { log.Error("failed to flip ImportPersisted task", zap.Error(err)) } @@ -786,7 +786,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse // other in-progress tasks as failed, when `load2Mem` is set to `true`. // loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`. func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) { - log.Info("import manager starts loading from Etcd") + // log.Info("import manager starts loading from Etcd") _, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath) if err != nil { log.Error("import manager failed to load from Etcd", zap.Error(err)) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 9389b6d110..89a864b1ab 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -1765,7 +1765,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( // If setting ImportState_ImportCompleted, simply update the state and return directly. if ir.GetState() == commonpb.ImportState_ImportCompleted { - log.Warn("this should not be called!") + log.Warn("ReportImport() should not be called when import task is completed!") } // Upon receiving ReportImport request, update the related task's state in task store. ti, err := c.importManager.updateTaskInfo(ir) @@ -1782,14 +1782,16 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) ( log.Info("an import task has failed, marking DataNode available and resending import task", zap.Int64("task ID", ir.GetTaskId())) resendTaskFunc() - } else if ir.GetState() != commonpb.ImportState_ImportPersisted { - log.Warn("unexpected import task state reported, return immediately (this should not happen)", - zap.Any("task ID", ir.GetTaskId()), - zap.Any("import state", ir.GetState())) + } else if ir.GetState() == commonpb.ImportState_ImportCompleted { + // When a DataNode completes importing, remove this DataNode from the busy node list and send out import tasks again. + log.Info("an import task has completed, marking DataNode available and resending import task", + zap.Int64("task ID", ir.GetTaskId())) resendTaskFunc() - } else { + } else if ir.GetState() == commonpb.ImportState_ImportPersisted { // Here ir.GetState() == commonpb.ImportState_ImportPersisted // Seal these import segments, so they can be auto-flushed later. + log.Info("an import task turns to persisted state, flush segments to be sealed", + zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments())) if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil { log.Error("failed to call Flush on bulk insert segments", zap.Int64("task ID", ir.GetTaskId())) diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index b95ace38ca..38b924bb02 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -1184,24 +1184,7 @@ func TestCore_ReportImport(t *testing.T) { assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) }) - t.Run("report import started state", func(t *testing.T) { - ctx := context.Background() - c := newTestCore(withHealthyCode()) - c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil) - c.importManager.loadFromTaskStore(true) - c.importManager.sendOutTasks(ctx) - resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ - TaskId: 100, - State: commonpb.ImportState_ImportStarted, - }) - assert.NoError(t, err) - assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) - // Change the state back. - err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) - assert.NoError(t, err) - }) - - t.Run("report persisted import", func(t *testing.T) { + testFunc := func(state commonpb.ImportState) { ctx := context.Background() c := newTestCore( withHealthyCode(), @@ -1216,13 +1199,29 @@ func TestCore_ReportImport(t *testing.T) { resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{ TaskId: 100, - State: commonpb.ImportState_ImportPersisted, + State: state, }) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode()) // Change the state back. err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending) assert.NoError(t, err) + } + + t.Run("report import started state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportStarted) + }) + + t.Run("report import persisted state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportPersisted) + }) + + t.Run("report import completed state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportCompleted) + }) + + t.Run("report import failed state", func(t *testing.T) { + testFunc(commonpb.ImportState_ImportFailed) }) }