mirror of https://github.com/milvus-io/milvus.git
parent
2efa11e830
commit
b6bbc9ae8b
|
@ -154,7 +154,7 @@ func (m *importManager) flipTaskStateLoop(wg *sync.WaitGroup) {
|
|||
log.Info("import manager context done, exit check flipTaskStateLoop")
|
||||
return
|
||||
case <-flipPersistedTicker.C:
|
||||
log.Debug("start trying to flip ImportPersisted task")
|
||||
// log.Debug("start trying to flip ImportPersisted task")
|
||||
if err := m.loadAndFlipPersistedTasks(m.ctx); err != nil {
|
||||
log.Error("failed to flip ImportPersisted task", zap.Error(err))
|
||||
}
|
||||
|
@ -786,7 +786,7 @@ func (m *importManager) getTaskState(tID int64) *milvuspb.GetImportStateResponse
|
|||
// other in-progress tasks as failed, when `load2Mem` is set to `true`.
|
||||
// loadFromTaskStore instead returns a list of all import tasks if `load2Mem` is set to `false`.
|
||||
func (m *importManager) loadFromTaskStore(load2Mem bool) ([]*datapb.ImportTaskInfo, error) {
|
||||
log.Info("import manager starts loading from Etcd")
|
||||
// log.Info("import manager starts loading from Etcd")
|
||||
_, v, err := m.taskStore.LoadWithPrefix(Params.RootCoordCfg.ImportTaskSubPath)
|
||||
if err != nil {
|
||||
log.Error("import manager failed to load from Etcd", zap.Error(err))
|
||||
|
|
|
@ -1765,7 +1765,7 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
|
|||
|
||||
// If setting ImportState_ImportCompleted, simply update the state and return directly.
|
||||
if ir.GetState() == commonpb.ImportState_ImportCompleted {
|
||||
log.Warn("this should not be called!")
|
||||
log.Warn("ReportImport() should not be called when import task is completed!")
|
||||
}
|
||||
// Upon receiving ReportImport request, update the related task's state in task store.
|
||||
ti, err := c.importManager.updateTaskInfo(ir)
|
||||
|
@ -1782,14 +1782,16 @@ func (c *Core) ReportImport(ctx context.Context, ir *rootcoordpb.ImportResult) (
|
|||
log.Info("an import task has failed, marking DataNode available and resending import task",
|
||||
zap.Int64("task ID", ir.GetTaskId()))
|
||||
resendTaskFunc()
|
||||
} else if ir.GetState() != commonpb.ImportState_ImportPersisted {
|
||||
log.Warn("unexpected import task state reported, return immediately (this should not happen)",
|
||||
zap.Any("task ID", ir.GetTaskId()),
|
||||
zap.Any("import state", ir.GetState()))
|
||||
} else if ir.GetState() == commonpb.ImportState_ImportCompleted {
|
||||
// When a DataNode completes importing, remove this DataNode from the busy node list and send out import tasks again.
|
||||
log.Info("an import task has completed, marking DataNode available and resending import task",
|
||||
zap.Int64("task ID", ir.GetTaskId()))
|
||||
resendTaskFunc()
|
||||
} else {
|
||||
} else if ir.GetState() == commonpb.ImportState_ImportPersisted {
|
||||
// Here ir.GetState() == commonpb.ImportState_ImportPersisted
|
||||
// Seal these import segments, so they can be auto-flushed later.
|
||||
log.Info("an import task turns to persisted state, flush segments to be sealed",
|
||||
zap.Any("task ID", ir.GetTaskId()), zap.Any("segments", ir.GetSegments()))
|
||||
if err := c.broker.Flush(ctx, ti.GetCollectionId(), ir.GetSegments()); err != nil {
|
||||
log.Error("failed to call Flush on bulk insert segments",
|
||||
zap.Int64("task ID", ir.GetTaskId()))
|
||||
|
|
|
@ -1184,24 +1184,7 @@ func TestCore_ReportImport(t *testing.T) {
|
|||
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||
})
|
||||
|
||||
t.Run("report import started state", func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
c := newTestCore(withHealthyCode())
|
||||
c.importManager = newImportManager(ctx, mockKv, idAlloc, callImportServiceFn, callMarkSegmentsDropped, callGetSegmentStates, nil, nil)
|
||||
c.importManager.loadFromTaskStore(true)
|
||||
c.importManager.sendOutTasks(ctx)
|
||||
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
|
||||
TaskId: 100,
|
||||
State: commonpb.ImportState_ImportStarted,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||
// Change the state back.
|
||||
err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending)
|
||||
assert.NoError(t, err)
|
||||
})
|
||||
|
||||
t.Run("report persisted import", func(t *testing.T) {
|
||||
testFunc := func(state commonpb.ImportState) {
|
||||
ctx := context.Background()
|
||||
c := newTestCore(
|
||||
withHealthyCode(),
|
||||
|
@ -1216,13 +1199,29 @@ func TestCore_ReportImport(t *testing.T) {
|
|||
|
||||
resp, err := c.ReportImport(ctx, &rootcoordpb.ImportResult{
|
||||
TaskId: 100,
|
||||
State: commonpb.ImportState_ImportPersisted,
|
||||
State: state,
|
||||
})
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetErrorCode())
|
||||
// Change the state back.
|
||||
err = c.importManager.setImportTaskState(100, commonpb.ImportState_ImportPending)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
t.Run("report import started state", func(t *testing.T) {
|
||||
testFunc(commonpb.ImportState_ImportStarted)
|
||||
})
|
||||
|
||||
t.Run("report import persisted state", func(t *testing.T) {
|
||||
testFunc(commonpb.ImportState_ImportPersisted)
|
||||
})
|
||||
|
||||
t.Run("report import completed state", func(t *testing.T) {
|
||||
testFunc(commonpb.ImportState_ImportCompleted)
|
||||
})
|
||||
|
||||
t.Run("report import failed state", func(t *testing.T) {
|
||||
testFunc(commonpb.ImportState_ImportFailed)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue