From 6b2d3c3ec1557d68a0b63ee651ec50d1c80e3035 Mon Sep 17 00:00:00 2001 From: Ten Thousand Leaves <69466447+soothing-rain@users.noreply.github.com> Date: Sat, 7 May 2022 14:05:52 +0800 Subject: [PATCH] Make each dataNode only accept one bulk load task simultaneously (#16820) issue: #16674 /kind improvement Signed-off-by: Yuchen Gao --- internal/datacoord/server_test.go | 2 +- internal/datacoord/services.go | 23 +++-- internal/rootcoord/import_manager.go | 86 +++++++++++------ internal/rootcoord/import_manager_test.go | 111 +++++++++++++++++++++- internal/rootcoord/root_coord.go | 9 +- 5 files changed, 176 insertions(+), 55 deletions(-) diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 06b8b285ed..cf71cc9a53 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -2412,7 +2412,7 @@ func TestImport(t *testing.T) { WorkingNodes: []int64{0}, }) assert.Nil(t, err) - assert.EqualValues(t, commonpb.ErrorCode_Success, resp.Status.GetErrorCode()) + assert.EqualValues(t, commonpb.ErrorCode_UnexpectedError, resp.Status.GetErrorCode()) etcd.StopEtcdServer() }) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b5f318b870..fb0a032a7a 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -998,9 +998,10 @@ func (s *Server) GetFlushState(ctx context.Context, req *milvuspb.GetFlushStateR return resp, nil } -// Import data files(json, numpy, etc.) on MinIO/S3 storage, read and parse them into sealed segments +// Import distributes the import tasks to dataNodes. +// It returns a failed status if no dataNode is available or if any error occurs. func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*datapb.ImportTaskResponse, error) { - log.Info("receive import request", zap.Any("import task request", itr)) + log.Info("received import request", zap.Any("import task request", itr)) resp := &datapb.ImportTaskResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_UnexpectedError, @@ -1008,18 +1009,17 @@ func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*da } if s.isClosed() { - log.Warn("failed to import because of closed server", zap.Any("import task request", itr)) + log.Error("failed to import for closed dataCoord service") resp.Status.Reason = msgDataCoordIsUnhealthy(Params.DataCoordCfg.GetNodeID()) return resp, nil } - workingNodes := itr.WorkingNodes nodes := s.channelManager.store.GetNodes() if len(nodes) == 0 { - log.Error("import failed as all dataNodes are offline", zap.Any("import task request", itr)) + log.Error("import failed as all dataNodes are offline") return resp, nil } - avaNodes := getDiff(nodes, workingNodes) + avaNodes := getDiff(nodes, itr.GetWorkingNodes()) if len(avaNodes) > 0 { // If there exists available DataNodes, pick one at random. dnID := avaNodes[rand.Intn(len(avaNodes))] @@ -1028,12 +1028,11 @@ func (s *Server) Import(ctx context.Context, itr *datapb.ImportTaskRequest) (*da zap.Int64("picking free dataNode with ID", dnID)) s.cluster.Import(s.ctx, dnID, itr) } else { - // No DataNodes are available, choose a still working DataNode randomly. - dnID := nodes[rand.Intn(len(nodes))] - log.Info("all dataNodes are busy, picking a random dataNode still", - zap.Any("all dataNodes", nodes), - zap.Int64("picking dataNode with ID", dnID)) - s.cluster.Import(s.ctx, dnID, itr) + // No dataNode is available, reject the import request. + errMsg := "all dataNodes are busy working on data import, please try again later or add new dataNode instances" + log.Error(errMsg, zap.Int64("task ID", itr.GetImportTask().GetTaskId())) + resp.Status.Reason = errMsg + return resp, nil } resp.Status.ErrorCode = commonpb.ErrorCode_Success diff --git a/internal/rootcoord/import_manager.go b/internal/rootcoord/import_manager.go index 50c971ecea..6405ff33a8 100644 --- a/internal/rootcoord/import_manager.go +++ b/internal/rootcoord/import_manager.go @@ -45,6 +45,14 @@ const ( taskExpiredMsgPrefix = "task has expired after " ) +// CheckPendingTasksInterval is the default interval to check and send out pending tasks, +// default 60*1000 milliseconds (1 minute). +var checkPendingTasksInterval = 60 * 1000 + +// ExpireOldTasksInterval is the default interval to loop through all in memory tasks and expire old ones. +// default 10*60*1000 milliseconds (10 minutes) +var expireOldTasksInterval = 10 * 60 * 1000 + // import task state type importTaskState struct { stateCode commonpb.ImportState // state code @@ -103,6 +111,41 @@ func (m *importManager) init(ctx context.Context) { }) } +// sendOutTasksLoop periodically calls `sendOutTasks` to process left over pending tasks. +func (m *importManager) sendOutTasksLoop(wg *sync.WaitGroup) { + defer wg.Done() + ticker := time.NewTicker(time.Duration(checkPendingTasksInterval) * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + log.Debug("import manager context done, exit check sendOutTasksLoop") + return + case <-ticker.C: + log.Debug("sending out tasks") + m.sendOutTasks(m.ctx) + } + } +} + +// expireOldTasksLoop starts a loop that checks and expires old tasks every `ImportTaskExpiration` seconds. +func (m *importManager) expireOldTasksLoop(wg *sync.WaitGroup) { + defer wg.Done() + ticker := time.NewTicker(time.Duration(expireOldTasksInterval) * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + log.Info("(in loop) import manager context done, exit expireOldTasksLoop") + return + case <-ticker.C: + log.Debug("(in loop) starting expiring old tasks...", + zap.Duration("cleaning up interval", time.Duration(expireOldTasksInterval)*time.Millisecond)) + m.expireOldTasks() + } + } +} + // sendOutTasks pushes all pending tasks to DataCoord, gets DataCoord response and re-add these tasks as working tasks. func (m *importManager) sendOutTasks(ctx context.Context) error { m.pendingLock.Lock() @@ -113,10 +156,6 @@ func (m *importManager) sendOutTasks(ctx context.Context) error { // Trigger Import() action to DataCoord. for len(m.pendingTasks) > 0 { task := m.pendingTasks[0] - // Skip failed (mostly like expired) tasks. - if task.GetState().GetStateCode() == commonpb.ImportState_ImportFailed { - continue - } // TODO: Use ImportTaskInfo directly. it := &datapb.ImportTask{ CollectionId: task.GetCollectionId(), @@ -133,32 +172,33 @@ func (m *importManager) sendOutTasks(ctx context.Context) error { }, } - log.Debug("sending import task to DataCoord", zap.Int64("taskID", task.GetId())) // Get all busy dataNodes for reference. var busyNodeList []int64 for k := range m.busyNodes { busyNodeList = append(busyNodeList, k) } - // Call DataCoord.Import(). + // Send import task to dataCoord, which will then distribute the import task to dataNode. resp := m.callImportService(ctx, &datapb.ImportTaskRequest{ ImportTask: it, WorkingNodes: busyNodeList, }) - if resp.Status.ErrorCode == commonpb.ErrorCode_UnexpectedError { - log.Debug("import task is rejected", zap.Int64("task ID", it.GetTaskId())) + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + log.Warn("import task is rejected", + zap.Int64("task ID", it.GetTaskId()), + zap.Any("error code", resp.GetStatus().GetErrorCode()), + zap.String("cause", resp.GetStatus().GetReason())) break } + + // Successfully assigned dataNode for the import task. Add task to working task list and update task store. task.DatanodeId = resp.GetDatanodeId() - log.Debug("import task successfully assigned to DataNode", + log.Debug("import task successfully assigned to dataNode", zap.Int64("task ID", it.GetTaskId()), zap.Int64("dataNode ID", task.GetDatanodeId())) // Add new working dataNode to busyNodes. m.busyNodes[resp.GetDatanodeId()] = true - // erase this task from head of pending list if the callImportService succeed - m.pendingTasks = append(m.pendingTasks[:0], m.pendingTasks[1:]...) - func() { m.workingLock.Lock() defer m.workingLock.Unlock() @@ -168,6 +208,9 @@ func (m *importManager) sendOutTasks(ctx context.Context) error { m.workingTasks[task.GetId()] = task m.updateImportTaskStore(task) }() + + // Erase this task from head of pending list. + m.pendingTasks = append(m.pendingTasks[:0], m.pendingTasks[1:]...) } return nil @@ -503,25 +546,6 @@ func (m *importManager) updateImportTaskStore(ti *datapb.ImportTaskInfo) error { return nil } -// expireOldTasksLoop starts a loop that checks and expires old tasks every `ImportTaskExpiration` seconds. -func (m *importManager) expireOldTasksLoop(wg *sync.WaitGroup) { - defer wg.Done() - ticker := time.NewTicker(time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000) * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-m.ctx.Done(): - log.Info("(in loop) import manager context done, exit expireOldTasksLoop") - return - case <-ticker.C: - log.Info("(in loop) starting expiring old tasks...", - zap.Duration("cleaning up interval", - time.Duration(Params.RootCoordCfg.ImportTaskExpiration*1000)*time.Millisecond)) - m.expireOldTasks() - } - } -} - // expireOldTasks marks expires tasks as failed. func (m *importManager) expireOldTasks() { // Expire old pending tasks, if any. diff --git a/internal/rootcoord/import_manager_test.go b/internal/rootcoord/import_manager_test.go index e24f88a7d0..9211479b26 100644 --- a/internal/rootcoord/import_manager_test.go +++ b/internal/rootcoord/import_manager_test.go @@ -48,7 +48,9 @@ func TestImportManager_NewImportManager(t *testing.T) { return globalCount, 0, nil } Params.RootCoordCfg.ImportTaskSubPath = "test_import_task" - Params.RootCoordCfg.ImportTaskExpiration = 1 + Params.RootCoordCfg.ImportTaskExpiration = 100 + checkPendingTasksInterval = 100 + expireOldTasksInterval = 100 mockKv := &kv.MockMetaKV{} mockKv.InMemKv = make(map[string]string) ti1 := &datapb.ImportTaskInfo{ @@ -90,8 +92,9 @@ func TestImportManager_NewImportManager(t *testing.T) { assert.NotNil(t, mgr) mgr.init(ctx) var wgLoop sync.WaitGroup - wgLoop.Add(1) + wgLoop.Add(2) mgr.expireOldTasksLoop(&wgLoop) + mgr.sendOutTasksLoop(&wgLoop) wgLoop.Wait() }) @@ -104,8 +107,9 @@ func TestImportManager_NewImportManager(t *testing.T) { assert.NotNil(t, mgr) mgr.init(context.TODO()) var wgLoop sync.WaitGroup - wgLoop.Add(1) + wgLoop.Add(2) mgr.expireOldTasksLoop(&wgLoop) + mgr.sendOutTasksLoop(&wgLoop) wgLoop.Wait() }) @@ -125,8 +129,25 @@ func TestImportManager_NewImportManager(t *testing.T) { }) mgr.loadFromTaskStore() var wgLoop sync.WaitGroup - wgLoop.Add(1) + wgLoop.Add(2) mgr.expireOldTasksLoop(&wgLoop) + mgr.sendOutTasksLoop(&wgLoop) + wgLoop.Wait() + }) + + wg.Add(1) + t.Run("check init", func(t *testing.T) { + defer wg.Done() + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + mgr := newImportManager(ctx, mockKv, idAlloc, fn) + assert.NotNil(t, mgr) + mgr.init(ctx) + var wgLoop sync.WaitGroup + wgLoop.Add(2) + mgr.expireOldTasksLoop(&wgLoop) + mgr.sendOutTasksLoop(&wgLoop) + time.Sleep(500 * time.Millisecond) wgLoop.Wait() }) @@ -233,6 +254,88 @@ func TestImportManager_ImportJob(t *testing.T) { assert.Equal(t, 2, len(mgr.workingTasks)) } +func TestImportManager_AllDataNodesBusy(t *testing.T) { + var countLock sync.RWMutex + var globalCount = typeutil.UniqueID(0) + + var idAlloc = func(count uint32) (typeutil.UniqueID, typeutil.UniqueID, error) { + countLock.Lock() + defer countLock.Unlock() + globalCount++ + return globalCount, 0, nil + } + Params.RootCoordCfg.ImportTaskSubPath = "test_import_task" + colID := int64(100) + mockKv := &kv.MockMetaKV{} + mockKv.InMemKv = make(map[string]string) + rowReq := &milvuspb.ImportRequest{ + CollectionName: "c1", + PartitionName: "p1", + RowBased: true, + Files: []string{"f1", "f2", "f3"}, + } + colReq := &milvuspb.ImportRequest{ + CollectionName: "c1", + PartitionName: "p1", + RowBased: false, + Files: []string{"f1", "f2"}, + Options: []*commonpb.KeyValuePair{ + { + Key: Bucket, + Value: "mybucket", + }, + }, + } + + dnList := []int64{1, 2, 3} + count := 0 + fn := func(ctx context.Context, req *datapb.ImportTaskRequest) *datapb.ImportTaskResponse { + if count < len(dnList) { + count++ + return &datapb.ImportTaskResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + }, + DatanodeId: dnList[count-1], + } + } + return &datapb.ImportTaskResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + }, + } + } + + mgr := newImportManager(context.TODO(), mockKv, idAlloc, fn) + mgr.importJob(context.TODO(), rowReq, colID, 0) + assert.Equal(t, 0, len(mgr.pendingTasks)) + assert.Equal(t, len(rowReq.Files), len(mgr.workingTasks)) + + mgr = newImportManager(context.TODO(), mockKv, idAlloc, fn) + mgr.importJob(context.TODO(), rowReq, colID, 0) + assert.Equal(t, len(rowReq.Files), len(mgr.pendingTasks)) + assert.Equal(t, 0, len(mgr.workingTasks)) + + // Reset count. + count = 0 + mgr = newImportManager(context.TODO(), mockKv, idAlloc, fn) + mgr.importJob(context.TODO(), colReq, colID, 0) + assert.Equal(t, 0, len(mgr.pendingTasks)) + assert.Equal(t, 1, len(mgr.workingTasks)) + + mgr.importJob(context.TODO(), colReq, colID, 0) + assert.Equal(t, 0, len(mgr.pendingTasks)) + assert.Equal(t, 2, len(mgr.workingTasks)) + + mgr.importJob(context.TODO(), colReq, colID, 0) + assert.Equal(t, 0, len(mgr.pendingTasks)) + assert.Equal(t, 3, len(mgr.workingTasks)) + + mgr.importJob(context.TODO(), colReq, colID, 0) + assert.Equal(t, 1, len(mgr.pendingTasks)) + assert.Equal(t, 3, len(mgr.workingTasks)) +} + func TestImportManager_TaskState(t *testing.T) { var countLock sync.RWMutex var globalCount = typeutil.UniqueID(0) diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 84e21a959a..0e60805e2e 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -707,19 +707,13 @@ func (c *Core) SetDataCoord(ctx context.Context, s types.DataCoord) error { ErrorCode: commonpb.ErrorCode_Success, }, } - defer func() { if err := recover(); err != nil { resp.Status.ErrorCode = commonpb.ErrorCode_UnexpectedError resp.Status.Reason = "assign import task to data coord panic" } }() - resp, _ = s.Import(ctx, req) - if resp.Status.ErrorCode != commonpb.ErrorCode_Success { - return resp - } - return resp } @@ -1296,12 +1290,13 @@ func (c *Core) Start() error { log.Fatal("RootCoord Start reSendDdMsg failed", zap.Error(err)) panic(err) } - c.wg.Add(5) + c.wg.Add(6) go c.startTimeTickLoop() go c.tsLoop() go c.chanTimeTick.startWatch(&c.wg) go c.checkFlushedSegmentsLoop() go c.importManager.expireOldTasksLoop(&c.wg) + go c.importManager.sendOutTasksLoop(&c.wg) Params.RootCoordCfg.CreatedTime = time.Now() Params.RootCoordCfg.UpdatedTime = time.Now() })