From a61668c77e043887d33d307b18e409a40ff6d287 Mon Sep 17 00:00:00 2001
From: "yihao.dai"
Date: Sun, 15 Sep 2024 15:17:08 +0800
Subject: [PATCH] feat: Introduce stats task for import (#35868)

This PR introduces a stats task for import:
1. Define the new `Stats` and `IndexBuilding` states for importJob
2. Add a new stats step to the import process: trigger the stats task and
   wait for its completion
3. Abort the stats task if the import job fails

issue: https://github.com/milvus-io/milvus/issues/33744

---------

Signed-off-by: bigsheeper
---
 Makefile                                     |   1 +
 internal/datacoord/import_checker.go         | 166 ++++++++++---
 internal/datacoord/import_checker_test.go    |  94 ++++---
 internal/datacoord/import_scheduler.go       |  10 +-
 internal/datacoord/import_scheduler_test.go  |  14 +-
 internal/datacoord/import_task.go            |   8 +
 internal/datacoord/import_util.go            | 128 ++++++----
 internal/datacoord/import_util_test.go       |  57 +++--
 internal/datacoord/job_manager.go            |  24 +-
 internal/datacoord/mock_compaction_meta.go   |  43 ----
 internal/datacoord/mock_job_manager.go       | 230 ++++++++++++++++++
 internal/datacoord/server.go                 |   4 +-
 internal/datacoord/services.go               |   4 +-
 internal/datacoord/stats_task_meta.go        |   8 +-
 internal/datacoord/task_scheduler_test.go    |  20 +-
 internal/proto/data_coord.proto              |   1 +
 internal/proto/internal.proto                |   2 +
 tests/integration/import/binlog_test.go      |   7 +-
 .../integration/import/dynamic_field_test.go |   2 +-
 tests/integration/import/import_test.go      |   2 +-
 .../integration/import/partition_key_test.go |   2 +-
 21 files changed, 599 insertions(+), 228 deletions(-)
 create mode 100644 internal/datacoord/mock_job_manager.go

diff --git a/Makefile b/Makefile
index 6443de2604..4fe53268a1 100644
--- a/Makefile
+++ b/Makefile
@@ -498,6 +498,7 @@ generate-mockery-datacoord: getdeps
 	$(INSTALL_PATH)/mockery --name=Broker --dir=internal/datacoord/broker --filename=mock_coordinator_broker.go --output=internal/datacoord/broker --structname=MockBroker --with-expecter --inpackage
 	$(INSTALL_PATH)/mockery --name=WorkerManager --dir=internal/datacoord/session --filename=mock_worker_manager.go --output=internal/datacoord/session --structname=MockWorkerManager --with-expecter --inpackage
 	$(INSTALL_PATH)/mockery --name=Manager --dir=internal/datacoord --filename=mock_segment_manager.go --output=internal/datacoord --structname=MockManager --with-expecter --inpackage
+	$(INSTALL_PATH)/mockery --name=StatsJobManager --dir=internal/datacoord --filename=mock_job_manager.go --output=internal/datacoord --structname=MockStatsJobManager --with-expecter --inpackage
 
 generate-mockery-datanode: getdeps
 	$(INSTALL_PATH)/mockery --name=Allocator --dir=$(PWD)/internal/datanode/allocator --output=$(PWD)/internal/datanode/allocator --filename=mock_allocator.go --with-expecter --structname=MockAllocator --outpkg=allocator --inpackage
diff --git a/internal/datacoord/import_checker.go b/internal/datacoord/import_checker.go
index 7f48e680bb..99bfd4e75a 100644
--- a/internal/datacoord/import_checker.go
+++ b/internal/datacoord/import_checker.go
@@ -28,6 +28,7 @@ import (
 	"github.com/milvus-io/milvus/internal/datacoord/allocator"
 	"github.com/milvus-io/milvus/internal/datacoord/broker"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -47,6 +48,7 @@ type importChecker struct {
 	alloc allocator.Allocator
 	sm    Manager
 	imeta ImportMeta
+	sjm   StatsJobManager
 
 	closeOnce sync.Once
closeChan chan struct{} @@ -58,6 +60,7 @@ func NewImportChecker(meta *meta, alloc allocator.Allocator, sm Manager, imeta ImportMeta, + sjm StatsJobManager, ) ImportChecker { return &importChecker{ meta: meta, @@ -66,6 +69,7 @@ func NewImportChecker(meta *meta, alloc: alloc, sm: sm, imeta: imeta, + sjm: sjm, closeChan: make(chan struct{}), } } @@ -93,8 +97,12 @@ func (c *importChecker) Start() { c.checkPreImportingJob(job) case internalpb.ImportJobState_Importing: c.checkImportingJob(job) + case internalpb.ImportJobState_Stats: + c.checkStatsJob(job) + case internalpb.ImportJobState_IndexBuilding: + c.checkIndexBuildingJob(job) case internalpb.ImportJobState_Failed: - c.tryFailingTasks(job) + c.checkFailedJob(job) } } case <-ticker2.C: @@ -178,6 +186,7 @@ func (c *importChecker) getLackFilesForImports(job ImportJob) []*datapb.ImportFi } func (c *importChecker) checkPendingJob(job ImportJob) { + logger := log.With(zap.Int64("jobID", job.GetJobID())) lacks := c.getLackFilesForPreImports(job) if len(lacks) == 0 { return @@ -186,7 +195,7 @@ func (c *importChecker) checkPendingJob(job ImportJob) { newTasks, err := NewPreImportTasks(fileGroups, job, c.alloc) if err != nil { - log.Warn("new preimport tasks failed", zap.Error(err)) + logger.Warn("new preimport tasks failed", zap.Error(err)) return } for _, t := range newTasks { @@ -199,11 +208,12 @@ func (c *importChecker) checkPendingJob(job ImportJob) { } err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting)) if err != nil { - log.Warn("failed to update job state to PreImporting", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + logger.Warn("failed to update job state to PreImporting", zap.Error(err)) } } func (c *importChecker) checkPreImportingJob(job ImportJob) { + logger := log.With(zap.Int64("jobID", job.GetJobID())) lacks := c.getLackFilesForImports(job) if len(lacks) == 0 { return @@ -211,10 +221,10 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { requestSize, err := CheckDiskQuota(job, c.meta, c.imeta) if err != nil { - log.Warn("import failed, disk quota exceeded", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + logger.Warn("import failed, disk quota exceeded", zap.Error(err)) err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(err.Error())) if err != nil { - log.Warn("failed to update job state to Failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + logger.Warn("failed to update job state to Failed", zap.Error(err)) } return } @@ -223,7 +233,7 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { groups := RegroupImportFiles(job, lacks, allDiskIndex) newTasks, err := NewImportTasks(groups, job, c.sm, c.alloc) if err != nil { - log.Warn("new import tasks failed", zap.Error(err)) + logger.Warn("new import tasks failed", zap.Error(err)) return } for _, t := range newTasks { @@ -236,49 +246,124 @@ func (c *importChecker) checkPreImportingJob(job ImportJob) { } err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing), UpdateRequestedDiskSize(requestSize)) if err != nil { - log.Warn("failed to update job state to Importing", zap.Int64("jobID", job.GetJobID()), zap.Error(err)) + logger.Warn("failed to update job state to Importing", zap.Error(err)) } } func (c *importChecker) checkImportingJob(job ImportJob) { - log := log.With(zap.Int64("jobID", job.GetJobID()), - zap.Int64("collectionID", job.GetCollectionID())) tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), 
WithJob(job.GetJobID()))
 	for _, t := range tasks {
 		if t.GetState() != datapb.ImportTaskStateV2_Completed {
 			return
 		}
 	}
+	err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
+	if err != nil {
+		log.Warn("failed to update job state to Stats", zap.Error(err))
+		return
+	}
+	log.Info("update import job state to Stats", zap.Int64("jobID", job.GetJobID()))
+}
 
-	segmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
-		return t.(*importTask).GetSegmentIDs()
-	})
+func (c *importChecker) checkStatsJob(job ImportJob) {
+	logger := log.With(zap.Int64("jobID", job.GetJobID()))
+	updateJobState := func(state internalpb.ImportJobState) {
+		err := c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(state))
+		if err != nil {
+			logger.Warn("failed to update job state", zap.Error(err))
+			return
+		}
+		logger.Info("update import job state", zap.String("state", state.String()))
+	}
 
-	// Verify completion of index building for imported segments.
-	unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), segmentIDs)
-	if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 && !importutilv2.IsL0Import(job.GetOptions()) {
-		log.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
+	// Skip the stats stage if stats is disabled or this is an L0 import.
+	if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() || importutilv2.IsL0Import(job.GetOptions()) {
+		updateJobState(internalpb.ImportJobState_IndexBuilding)
 		return
 	}
 
-	unfinished := lo.Filter(segmentIDs, func(segmentID int64, _ int) bool {
+	// Check and trigger stats tasks.
+	var (
+		taskCnt = 0
+		doneCnt = 0
+	)
+	tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
+	for _, task := range tasks {
+		originSegmentIDs := task.(*importTask).GetSegmentIDs()
+		statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs()
+		taskCnt += len(originSegmentIDs)
+		for i, originSegmentID := range originSegmentIDs {
+			state := c.sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort)
+			switch state {
+			case indexpb.JobState_JobStateNone:
+				err := c.sjm.SubmitStatsTask(originSegmentID, statsSegmentIDs[i], indexpb.StatsSubJob_Sort, false)
+				if err != nil {
+					logger.Warn("submit stats task failed", zap.Error(err))
+					continue
+				}
+				logger.Info("submit stats task done", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
+			case indexpb.JobState_JobStateInit, indexpb.JobState_JobStateRetry, indexpb.JobState_JobStateInProgress:
+				logger.Debug("waiting for stats task...", WrapTaskLog(task, zap.Int64("origin", originSegmentID), zap.Int64("stats", statsSegmentIDs[i]))...)
+			case indexpb.JobState_JobStateFailed:
+				updateJobState(internalpb.ImportJobState_Failed)
+				return
+			case indexpb.JobState_JobStateFinished:
+				doneCnt++
+			}
+		}
+	}
+
+	// All segments have completed stats. Update job state to `IndexBuilding`.
+	if taskCnt == doneCnt {
+		updateJobState(internalpb.ImportJobState_IndexBuilding)
+	}
+}
+
+func (c *importChecker) checkIndexBuildingJob(job ImportJob) {
+	logger := log.With(zap.Int64("jobID", job.GetJobID()))
+	tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
+	originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
+		return t.(*importTask).GetSegmentIDs()
+	})
+	statsSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
+		return t.(*importTask).GetStatsSegmentIDs()
+	})
+
+	targetSegmentIDs := statsSegmentIDs
+	if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() {
+		targetSegmentIDs = originSegmentIDs
+	}
+
+	unindexed := c.meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), targetSegmentIDs)
+	if Params.DataCoordCfg.WaitForIndex.GetAsBool() && len(unindexed) > 0 && !importutilv2.IsL0Import(job.GetOptions()) {
+		for _, segmentID := range unindexed {
+			select {
+			case getBuildIndexChSingleton() <- segmentID: // accelerate index building
+			default:
+			}
+		}
+		logger.Debug("waiting for import segments building index...", zap.Int64s("unindexed", unindexed))
+		return
+	}
+
+	// Here, all segment indexes have been successfully built; try to unset the isImporting flag for all segments.
+	isImportingSegments := lo.Filter(append(originSegmentIDs, statsSegmentIDs...), func(segmentID int64, _ int) bool {
 		segment := c.meta.GetSegment(segmentID)
 		if segment == nil {
-			log.Warn("cannot find segment, may be compacted", zap.Int64("segmentID", segmentID))
+			logger.Warn("cannot find segment", zap.Int64("segmentID", segmentID))
 			return false
 		}
 		return segment.GetIsImporting()
 	})
-
-	channels, err := c.meta.GetSegmentsChannels(unfinished)
+	channels, err := c.meta.GetSegmentsChannels(isImportingSegments)
 	if err != nil {
-		log.Warn("get segments channels failed", zap.Error(err))
+		logger.Warn("get segments channels failed", zap.Error(err))
 		return
 	}
-	for _, segmentID := range unfinished {
+	for _, segmentID := range isImportingSegments {
 		channelCP := c.meta.GetChannelCheckpoint(channels[segmentID])
 		if channelCP == nil {
-			log.Warn("nil channel checkpoint")
+			logger.Warn("nil channel checkpoint")
 			return
 		}
 		op1 := UpdateStartPosition([]*datapb.SegmentStartPosition{{StartPosition: channelCP, SegmentID: segmentID}})
@@ -286,18 +371,34 @@ func (c *importChecker) checkImportingJob(job ImportJob) {
 		op3 := UpdateIsImporting(segmentID, false)
 		err = c.meta.UpdateSegmentsInfo(op1, op2, op3)
 		if err != nil {
-			log.Warn("update import segment failed", zap.Error(err))
+			logger.Warn("update import segment failed", zap.Error(err))
 			return
 		}
 	}
 
+	// All finished, update the import job state to `Completed`.
	completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00")
 	err = c.imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed), UpdateJobCompleteTime(completeTime))
 	if err != nil {
-		log.Warn("failed to update job state to Completed", zap.Error(err))
+		logger.Warn("failed to update job state to Completed", zap.Error(err))
 		return
 	}
-	log.Info("import job completed")
+	logger.Info("import job completed")
+}
+
+func (c *importChecker) checkFailedJob(job ImportJob) {
+	tasks := c.imeta.GetTaskBy(WithType(ImportTaskType), WithJob(job.GetJobID()))
+	originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 {
+		return t.(*importTask).GetSegmentIDs()
+	})
+	for _, originSegmentID := range originSegmentIDs {
+		err := c.sjm.DropStatsTask(originSegmentID, indexpb.StatsSubJob_Sort)
+		if err != nil {
+			log.Warn("drop stats task failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
+			return
+		}
+	}
+	c.tryFailingTasks(job)
 }
 
 func (c *importChecker) tryFailingTasks(job ImportJob) {
@@ -306,8 +407,8 @@ func (c *importChecker) tryFailingTasks(job ImportJob) {
 	if len(tasks) == 0 {
 		return
 	}
-	log.Warn("Import job has failed, all tasks with the same jobID"+
-		" will be marked as failed", zap.Int64("jobID", job.GetJobID()))
+	log.Warn("Import job has failed, all tasks with the same jobID will be marked as failed",
+		zap.Int64("jobID", job.GetJobID()), zap.String("reason", job.GetReason()))
 	for _, task := range tasks {
 		err := c.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Failed),
 			UpdateReason(job.GetReason()))
@@ -364,14 +465,15 @@ func (c *importChecker) checkGC(job ImportJob) {
 	}
 	cleanupTime := tsoutil.PhysicalTime(job.GetCleanupTs())
 	if time.Now().After(cleanupTime) {
+		logger := log.With(zap.Int64("jobID", job.GetJobID()))
 		GCRetention := Params.DataCoordCfg.ImportTaskRetention.GetAsDuration(time.Second)
-		log.Info("job has reached the GC retention", zap.Int64("jobID", job.GetJobID()),
+		logger.Info("job has reached the GC retention",
 			zap.Time("cleanupTime", cleanupTime), zap.Duration("GCRetention", GCRetention))
 		tasks := c.imeta.GetTaskBy(WithJob(job.GetJobID()))
 		shouldRemoveJob := true
 		for _, task := range tasks {
 			if job.GetState() == internalpb.ImportJobState_Failed && task.GetType() == ImportTaskType {
-				if len(task.(*importTask).GetSegmentIDs()) != 0 {
+				if len(task.(*importTask).GetSegmentIDs()) != 0 || len(task.(*importTask).GetStatsSegmentIDs()) != 0 {
 					shouldRemoveJob = false
 					continue
 				}
@@ -393,9 +495,9 @@ func (c *importChecker) checkGC(job ImportJob) {
 		}
 		err := c.imeta.RemoveJob(job.GetJobID())
 		if err != nil {
-			log.Warn("remove import job failed", zap.Int64("jobID", job.GetJobID()), zap.Error(err))
+			logger.Warn("remove import job failed", zap.Error(err))
 			return
 		}
-		log.Info("import job removed", zap.Int64("jobID", job.GetJobID()))
+		logger.Info("import job removed")
 	}
 }
diff --git a/internal/datacoord/import_checker_test.go b/internal/datacoord/import_checker_test.go
index 43c3a2959f..714c6a2dc5 100644
--- a/internal/datacoord/import_checker_test.go
+++ b/internal/datacoord/import_checker_test.go
@@ -32,6 +32,7 @@ import (
 	broker2 "github.com/milvus-io/milvus/internal/datacoord/broker"
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -72,7 +73,9 @@ func (s *ImportCheckerSuite) SetupTest() {
 	broker :=
broker2.NewMockBroker(s.T()) sm := NewMockManager(s.T()) - checker := NewImportChecker(meta, broker, cluster, s.alloc, sm, imeta).(*importChecker) + sjm := NewMockStatsJobManager(s.T()) + + checker := NewImportChecker(meta, broker, cluster, s.alloc, sm, imeta, sjm).(*importChecker) s.checker = checker job := &importJob{ @@ -174,7 +177,7 @@ func (s *ImportCheckerSuite) TestCheckJob() { s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(job.GetJobID()).GetState()) // test checkImportingJob - s.checker.checkImportingJob(job) // not completed + s.checker.checkImportingJob(job) s.Equal(internalpb.ImportJobState_Importing, s.imeta.GetJob(job.GetJobID()).GetState()) for _, t := range importTasks { task := s.imeta.GetTask(t.GetTaskID()) @@ -198,12 +201,34 @@ func (s *ImportCheckerSuite) TestCheckJob() { err := s.checker.meta.AddSegment(context.Background(), segment) s.NoError(err) err = s.imeta.UpdateTask(t.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), - UpdateSegmentIDs([]int64{segment.GetID()})) + UpdateSegmentIDs([]int64{segment.GetID()}), UpdateStatsSegmentIDs([]int64{rand.Int63()})) s.NoError(err) err = s.checker.meta.UpdateChannelCheckpoint(segment.GetInsertChannel(), &msgpb.MsgPosition{MsgID: []byte{0}}) s.NoError(err) } s.checker.checkImportingJob(job) + s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(job.GetJobID()).GetState()) + + // test check stats job + alloc.EXPECT().AllocID(mock.Anything).Return(rand.Int63(), nil).Maybe() + sjm := s.checker.sjm.(*MockStatsJobManager) + sjm.EXPECT().SubmitStatsTask(mock.Anything, mock.Anything, mock.Anything, false).Return(nil) + sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateNone) + s.checker.checkStatsJob(job) + s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(job.GetJobID()).GetState()) + sjm = NewMockStatsJobManager(s.T()) + sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateInProgress) + s.checker.sjm = sjm + s.checker.checkStatsJob(job) + s.Equal(internalpb.ImportJobState_Stats, s.imeta.GetJob(job.GetJobID()).GetState()) + sjm = NewMockStatsJobManager(s.T()) + sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateFinished) + s.checker.sjm = sjm + s.checker.checkStatsJob(job) + s.Equal(internalpb.ImportJobState_IndexBuilding, s.imeta.GetJob(job.GetJobID()).GetState()) + + // test check IndexBuilding job + s.checker.checkIndexBuildingJob(job) for _, t := range importTasks { task := s.imeta.GetTask(t.GetTaskID()) for _, id := range task.(*importTask).GetSegmentIDs() { @@ -298,39 +323,40 @@ func (s *ImportCheckerSuite) TestCheckTimeout() { func (s *ImportCheckerSuite) TestCheckFailure() { catalog := s.imeta.(*importMeta).catalog.(*mocks.DataCoordCatalog) - catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil) + catalog.EXPECT().SaveImportTask(mock.Anything).Return(nil) - pit1 := &preImportTask{ - PreImportTask: &datapb.PreImportTask{ - JobID: s.jobID, - TaskID: 1, - State: datapb.ImportTaskStateV2_Pending, + it := &importTask{ + ImportTaskV2: &datapb.ImportTaskV2{ + JobID: s.jobID, + TaskID: 1, + State: datapb.ImportTaskStateV2_Pending, + SegmentIDs: []int64{2}, + StatsSegmentIDs: []int64{3}, }, } - err := s.imeta.AddTask(pit1) + err := s.imeta.AddTask(it) s.NoError(err) - pit2 := &preImportTask{ - PreImportTask: &datapb.PreImportTask{ - JobID: s.jobID, - TaskID: 2, - State: datapb.ImportTaskStateV2_Completed, - }, - } - err = s.imeta.AddTask(pit2) - s.NoError(err) + sjm := 
NewMockStatsJobManager(s.T()) + sjm.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(errors.New("mock err")) + s.checker.sjm = sjm + s.checker.checkFailedJob(s.imeta.GetJob(s.jobID)) + tasks := s.imeta.GetTaskBy(WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) + s.Equal(0, len(tasks)) + sjm.ExpectedCalls = nil + sjm.EXPECT().DropStatsTask(mock.Anything, mock.Anything).Return(nil) catalog.ExpectedCalls = nil - catalog.EXPECT().SavePreImportTask(mock.Anything).Return(errors.New("mock error")) - s.checker.tryFailingTasks(s.imeta.GetJob(s.jobID)) - tasks := s.imeta.GetTaskBy(WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) + catalog.EXPECT().SaveImportTask(mock.Anything).Return(errors.New("mock error")) + s.checker.checkFailedJob(s.imeta.GetJob(s.jobID)) + tasks = s.imeta.GetTaskBy(WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) s.Equal(0, len(tasks)) catalog.ExpectedCalls = nil - catalog.EXPECT().SavePreImportTask(mock.Anything).Return(nil) - s.checker.tryFailingTasks(s.imeta.GetJob(s.jobID)) + catalog.EXPECT().SaveImportTask(mock.Anything).Return(nil) + s.checker.checkFailedJob(s.imeta.GetJob(s.jobID)) tasks = s.imeta.GetTaskBy(WithJob(s.jobID), WithStates(datapb.ImportTaskStateV2_Failed)) - s.Equal(2, len(tasks)) + s.Equal(1, len(tasks)) } func (s *ImportCheckerSuite) TestCheckGC() { @@ -340,10 +366,11 @@ func (s *ImportCheckerSuite) TestCheckGC() { catalog.EXPECT().SaveImportTask(mock.Anything).Return(nil) var task ImportTask = &importTask{ ImportTaskV2: &datapb.ImportTaskV2{ - JobID: s.jobID, - TaskID: 1, - State: datapb.ImportTaskStateV2_Failed, - SegmentIDs: []int64{2}, + JobID: s.jobID, + TaskID: 1, + State: datapb.ImportTaskStateV2_Failed, + SegmentIDs: []int64{2}, + StatsSegmentIDs: []int64{3}, }, } err := s.imeta.AddTask(task) @@ -367,13 +394,20 @@ func (s *ImportCheckerSuite) TestCheckGC() { err = s.imeta.AddJob(job) s.NoError(err) - // segment not dropped + // origin segment not dropped s.checker.checkGC(s.imeta.GetJob(s.jobID)) s.Equal(1, len(s.imeta.GetTaskBy(WithJob(s.jobID)))) s.Equal(1, len(s.imeta.GetJobBy())) err = s.imeta.UpdateTask(task.GetTaskID(), UpdateSegmentIDs([]int64{})) s.NoError(err) + // stats segment not dropped + s.checker.checkGC(s.imeta.GetJob(s.jobID)) + s.Equal(1, len(s.imeta.GetTaskBy(WithJob(s.jobID)))) + s.Equal(1, len(s.imeta.GetJobBy())) + err = s.imeta.UpdateTask(task.GetTaskID(), UpdateStatsSegmentIDs([]int64{})) + s.NoError(err) + // task is not dropped s.checker.checkGC(s.imeta.GetJob(s.jobID)) s.Equal(1, len(s.imeta.GetTaskBy(WithJob(s.jobID)))) diff --git a/internal/datacoord/import_scheduler.go b/internal/datacoord/import_scheduler.go index 6f7e8f54e3..d0f91aa578 100644 --- a/internal/datacoord/import_scheduler.go +++ b/internal/datacoord/import_scheduler.go @@ -315,10 +315,6 @@ func (s *importScheduler) processInProgressImport(task ImportTask) { log.Warn("update import segment binlogs failed", WrapTaskLog(task, zap.Error(err))...) 
return } - select { - case getBuildIndexChSingleton() <- info.GetSegmentID(): // accelerate index building: - default: - } } completeTime := time.Now().Format("2006-01-02T15:04:05Z07:00") err = s.imeta.UpdateTask(task.GetTaskID(), UpdateState(datapb.ImportTaskStateV2_Completed), UpdateCompleteTime(completeTime)) @@ -340,7 +336,9 @@ func (s *importScheduler) processCompleted(task ImportTask) { func (s *importScheduler) processFailed(task ImportTask) { if task.GetType() == ImportTaskType { - segments := task.(*importTask).GetSegmentIDs() + originSegmentIDs := task.(*importTask).GetSegmentIDs() + statsSegmentIDs := task.(*importTask).GetStatsSegmentIDs() + segments := append(originSegmentIDs, statsSegmentIDs...) for _, segment := range segments { op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped) err := s.meta.UpdateSegmentsInfo(op) @@ -350,7 +348,7 @@ func (s *importScheduler) processFailed(task ImportTask) { } } if len(segments) > 0 { - err := s.imeta.UpdateTask(task.GetTaskID(), UpdateSegmentIDs(nil)) + err := s.imeta.UpdateTask(task.GetTaskID(), UpdateSegmentIDs(nil), UpdateStatsSegmentIDs(nil)) if err != nil { log.Warn("update import task segments failed", WrapTaskLog(task, zap.Error(err))...) } diff --git a/internal/datacoord/import_scheduler_test.go b/internal/datacoord/import_scheduler_test.go index ff6346d56b..753056d688 100644 --- a/internal/datacoord/import_scheduler_test.go +++ b/internal/datacoord/import_scheduler_test.go @@ -74,7 +74,6 @@ func (s *ImportSchedulerSuite) SetupTest() { }) s.imeta, err = NewImportMeta(s.catalog) s.NoError(err) - s.scheduler = NewImportScheduler(s.meta, s.cluster, s.alloc, s.imeta).(*importScheduler) } @@ -216,12 +215,13 @@ func (s *ImportSchedulerSuite) TestProcessFailed() { s.catalog.EXPECT().SaveImportTask(mock.Anything).Return(nil) var task ImportTask = &importTask{ ImportTaskV2: &datapb.ImportTaskV2{ - JobID: 0, - TaskID: 1, - CollectionID: s.collectionID, - NodeID: 6, - SegmentIDs: []int64{2, 3}, - State: datapb.ImportTaskStateV2_Failed, + JobID: 0, + TaskID: 1, + CollectionID: s.collectionID, + NodeID: 6, + SegmentIDs: []int64{2, 3}, + StatsSegmentIDs: []int64{4, 5}, + State: datapb.ImportTaskStateV2_Failed, }, } err := s.imeta.AddTask(task) diff --git a/internal/datacoord/import_task.go b/internal/datacoord/import_task.go index 1261cd5854..b4bb782eb2 100644 --- a/internal/datacoord/import_task.go +++ b/internal/datacoord/import_task.go @@ -122,6 +122,14 @@ func UpdateSegmentIDs(segmentIDs []UniqueID) UpdateAction { } } +func UpdateStatsSegmentIDs(segmentIDs []UniqueID) UpdateAction { + return func(t ImportTask) { + if task, ok := t.(*importTask); ok { + task.ImportTaskV2.StatsSegmentIDs = segmentIDs + } + } +} + type ImportTask interface { GetJobID() int64 GetTaskID() int64 diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index d72ae49110..84af224e51 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -23,6 +23,8 @@ import ( "sort" "time" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" @@ -103,6 +105,14 @@ func NewImportTasks(fileGroups [][]*datapb.ImportFileStats, return nil, err } task.SegmentIDs = segments + if paramtable.Get().DataCoordCfg.EnableStatsTask.GetAsBool() { + statsSegIDBegin, _, err := alloc.AllocN(int64(len(segments))) + if err != nil { + return nil, err + } + task.StatsSegmentIDs = lo.RangeFrom(statsSegIDBegin, len(segments)) + log.Info("preallocate stats 
segment ids", WrapTaskLog(task, zap.Int64s("segmentIDs", task.StatsSegmentIDs))...) + } tasks = append(tasks, task) } return tasks, nil @@ -355,11 +365,7 @@ func getPreImportingProgress(jobID int64, imeta ImportMeta) float32 { return float32(len(completedTasks)) / float32(len(tasks)) } -func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, int64, int64) { - var ( - importedRows int64 - totalRows int64 - ) +func getImportRowsInfo(jobID int64, imeta ImportMeta, meta *meta) (importedRows, totalRows int64) { tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType)) segmentIDs := make([]int64, 0) for _, task := range tasks { @@ -369,37 +375,71 @@ func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, i segmentIDs = append(segmentIDs, task.(*importTask).GetSegmentIDs()...) } importedRows = meta.GetSegmentsTotalCurrentRows(segmentIDs) - var importingProgress float32 = 1 - if totalRows != 0 { - importingProgress = float32(importedRows) / float32(totalRows) - } - - var ( - unsetIsImportingSegment int64 - totalSegment int64 - ) - for _, task := range tasks { - segmentIDs := task.(*importTask).GetSegmentIDs() - for _, segmentID := range segmentIDs { - segment := meta.GetSegment(segmentID) - if segment == nil { - log.Warn("cannot find segment, may be compacted", WrapTaskLog(task, zap.Int64("segmentID", segmentID))...) - continue - } - totalSegment++ - if !segment.GetIsImporting() { - unsetIsImportingSegment++ - } - } - } - var completedProgress float32 = 1 - if totalSegment != 0 { - completedProgress = float32(unsetIsImportingSegment) / float32(totalSegment) - } - return importingProgress*0.5 + completedProgress*0.5, importedRows, totalRows + return } -func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalpb.ImportJobState, int64, int64, string) { +func getImportingProgress(jobID int64, imeta ImportMeta, meta *meta) (float32, int64, int64) { + importedRows, totalRows := getImportRowsInfo(jobID, imeta, meta) + if totalRows == 0 { + return 1, importedRows, totalRows + } + return float32(importedRows) / float32(totalRows), importedRows, totalRows +} + +func getStatsProgress(jobID int64, imeta ImportMeta, sjm StatsJobManager) float32 { + if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() { + return 1 + } + tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType)) + originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { + return t.(*importTask).GetSegmentIDs() + }) + if len(originSegmentIDs) == 0 { + return 1 + } + doneCnt := 0 + for _, originSegmentID := range originSegmentIDs { + state := sjm.GetStatsTaskState(originSegmentID, indexpb.StatsSubJob_Sort) + if state == indexpb.JobState_JobStateFinished { + doneCnt++ + } + } + return float32(doneCnt) / float32(len(originSegmentIDs)) +} + +func getIndexBuildingProgress(jobID int64, imeta ImportMeta, meta *meta) float32 { + job := imeta.GetJob(jobID) + if !Params.DataCoordCfg.WaitForIndex.GetAsBool() { + return 1 + } + tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType)) + originSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { + return t.(*importTask).GetSegmentIDs() + }) + targetSegmentIDs := lo.FlatMap(tasks, func(t ImportTask, _ int) []int64 { + return t.(*importTask).GetStatsSegmentIDs() + }) + if len(originSegmentIDs) == 0 { + return 1 + } + if !Params.DataCoordCfg.EnableStatsTask.GetAsBool() { + targetSegmentIDs = originSegmentIDs + } + unindexed := meta.indexMeta.GetUnindexedSegments(job.GetCollectionID(), 
targetSegmentIDs)
+	return float32(len(targetSegmentIDs)-len(unindexed)) / float32(len(targetSegmentIDs))
+}
+
+// GetJobProgress calculates the import job's progress.
+// The weight of each state is as follows:
+// 10%: Pending
+// 30%: PreImporting
+// 30%: Importing
+// 10%: Stats
+// 10%: IndexBuilding
+// 10%: Completed
+// TODO: Add a helper that maps internal states to user-facing statuses.
+// TODO: Save this progress on the job instead of recalculating it.
+func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta, sjm StatsJobManager) (int64, internalpb.ImportJobState, int64, int64, string) {
 	job := imeta.GetJob(jobID)
 	if job == nil {
 		return 0, internalpb.ImportJobState_Failed, 0, 0, fmt.Sprintf("import job does not exist, jobID=%d", jobID)
@@ -415,16 +455,20 @@ func GetJobProgress(jobID int64, imeta ImportMeta, meta *meta) (int64, internalp
 
 	case internalpb.ImportJobState_Importing:
 		progress, importedRows, totalRows := getImportingProgress(jobID, imeta, meta)
-		return 10 + 30 + int64(progress*60), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
+		return 10 + 30 + int64(progress*30), internalpb.ImportJobState_Importing, importedRows, totalRows, ""
+
+	case internalpb.ImportJobState_Stats:
+		progress := getStatsProgress(jobID, imeta, sjm)
+		_, totalRows := getImportRowsInfo(jobID, imeta, meta)
+		return 10 + 30 + 30 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, ""
+
+	case internalpb.ImportJobState_IndexBuilding:
+		progress := getIndexBuildingProgress(jobID, imeta, meta)
+		_, totalRows := getImportRowsInfo(jobID, imeta, meta)
+		return 10 + 30 + 30 + 10 + int64(progress*10), internalpb.ImportJobState_Importing, totalRows, totalRows, ""
 
 	case internalpb.ImportJobState_Completed:
-		totalRows := int64(0)
-		tasks := imeta.GetTaskBy(WithJob(jobID), WithType(ImportTaskType))
-		for _, task := range tasks {
-			totalRows += lo.SumBy(task.GetFileStats(), func(file *datapb.ImportFileStats) int64 {
-				return file.GetTotalRows()
-			})
-		}
+		_, totalRows := getImportRowsInfo(jobID, imeta, meta)
 		return 100, internalpb.ImportJobState_Completed, totalRows, totalRows, ""
 
 	case internalpb.ImportJobState_Failed:
diff --git a/internal/datacoord/import_util_test.go b/internal/datacoord/import_util_test.go
index 49a4578b64..59e27e8f50 100644
--- a/internal/datacoord/import_util_test.go
+++ b/internal/datacoord/import_util_test.go
@@ -34,6 +34,7 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/mocks"
 	mocks2 "github.com/milvus-io/milvus/internal/mocks"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
+	"github.com/milvus-io/milvus/internal/proto/indexpb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
@@ -556,13 +557,14 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// failed state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Failed), UpdateJobReason(mockErr))
 	assert.NoError(t, err)
-	progress, state, _, _, reason := GetJobProgress(job.GetJobID(), imeta, meta)
+
+	progress, state, _, _, reason := GetJobProgress(job.GetJobID(), imeta, meta, nil)
 	assert.Equal(t, int64(0), progress)
 	assert.Equal(t, internalpb.ImportJobState_Failed, state)
 	assert.Equal(t, mockErr, reason)
 
 	// job does not exist
-	progress, state, _, _, reason = GetJobProgress(-1, imeta, meta)
+	progress, state, _, _, reason = GetJobProgress(-1, imeta, meta, nil)
 	assert.Equal(t, int64(0), progress)
 	assert.Equal(t, internalpb.ImportJobState_Failed,
state)
 	assert.NotEqual(t, "", reason)
 
@@ -570,7 +572,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// pending state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Pending))
 	assert.NoError(t, err)
-	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
 	assert.Equal(t, int64(10), progress)
 	assert.Equal(t, internalpb.ImportJobState_Pending, state)
 	assert.Equal(t, "", reason)
@@ -578,7 +580,7 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// preImporting state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_PreImporting))
 	assert.NoError(t, err)
-	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
 	assert.Equal(t, int64(10+30), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
@@ -586,19 +588,15 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	// importing state, segmentImportedRows/totalRows = 0.5
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Importing))
 	assert.NoError(t, err)
-	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
 	assert.Equal(t, int64(10+30+30*0.5), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
 
-	// importing state, segmentImportedRows/totalRows = 1, partial segments is in importing state
-	op1 := UpdateIsImporting(10, false)
-	op2 := UpdateImportedRows(10, 100)
-	err = meta.UpdateSegmentsInfo(op1, op2)
+	// importing state, segmentImportedRows/totalRows = 1
+	err = meta.UpdateSegmentsInfo(UpdateImportedRows(10, 100))
 	assert.NoError(t, err)
-	op1 = UpdateIsImporting(20, false)
-	op2 = UpdateImportedRows(20, 100)
-	err = meta.UpdateSegmentsInfo(op1, op2)
+	err = meta.UpdateSegmentsInfo(UpdateImportedRows(20, 100))
 	assert.NoError(t, err)
 	err = meta.UpdateSegmentsInfo(UpdateImportedRows(11, 100))
 	assert.NoError(t, err)
@@ -608,29 +606,38 @@ func TestImportUtil_GetImportProgress(t *testing.T) {
 	assert.NoError(t, err)
 	err = meta.UpdateSegmentsInfo(UpdateImportedRows(22, 100))
 	assert.NoError(t, err)
-	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
-	assert.Equal(t, int64(float32(10+30+30+30*2/6)), progress)
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, nil)
+	assert.Equal(t, int64(float32(10+30+30)), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
 
-	// importing state, no segment is in importing state
-	err = meta.UpdateSegmentsInfo(UpdateIsImporting(11, false))
+	// stats state, len(statsSegmentIDs) / len(originSegmentIDs) = 0.5
+	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Stats))
 	assert.NoError(t, err)
-	err = meta.UpdateSegmentsInfo(UpdateIsImporting(12, false))
-	assert.NoError(t, err)
-	err = meta.UpdateSegmentsInfo(UpdateIsImporting(21, false))
-	assert.NoError(t, err)
-	err = meta.UpdateSegmentsInfo(UpdateIsImporting(22, false))
-	assert.NoError(t, err)
-	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
-	assert.Equal(t, int64(10+40+40+10), progress)
+	sjm := NewMockStatsJobManager(t)
+	sjm.EXPECT().GetStatsTaskState(mock.Anything,
mock.Anything).RunAndReturn(func(segmentID int64, _ indexpb.StatsSubJob) indexpb.JobState {
+		if lo.Contains([]int64{10, 11, 12}, segmentID) {
+			return indexpb.JobState_JobStateFinished
+		}
+		return indexpb.JobState_JobStateInProgress
+	})
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
+	assert.Equal(t, int64(10+30+30+10*0.5), progress)
+	assert.Equal(t, internalpb.ImportJobState_Importing, state)
+	assert.Equal(t, "", reason)
+
+	// stats state, len(statsSegmentIDs) / len(originSegmentIDs) = 1
+	sjm = NewMockStatsJobManager(t)
+	sjm.EXPECT().GetStatsTaskState(mock.Anything, mock.Anything).Return(indexpb.JobState_JobStateFinished)
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
+	assert.Equal(t, int64(10+30+30+10), progress)
 	assert.Equal(t, internalpb.ImportJobState_Importing, state)
 	assert.Equal(t, "", reason)
 
 	// completed state
 	err = imeta.UpdateJob(job.GetJobID(), UpdateJobState(internalpb.ImportJobState_Completed))
 	assert.NoError(t, err)
-	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta)
+	progress, state, _, _, reason = GetJobProgress(job.GetJobID(), imeta, meta, sjm)
 	assert.Equal(t, int64(100), progress)
 	assert.Equal(t, internalpb.ImportJobState_Completed, state)
 	assert.Equal(t, "", reason)
diff --git a/internal/datacoord/job_manager.go b/internal/datacoord/job_manager.go
index 4bb5fa2e3e..44a7cc5901 100644
--- a/internal/datacoord/job_manager.go
+++ b/internal/datacoord/job_manager.go
@@ -16,7 +16,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
-type StatsTaskManager interface {
+type StatsJobManager interface {
 	Start()
 	Stop()
 	SubmitStatsTask(originSegmentID, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool) error
@@ -24,7 +24,7 @@ type StatsTaskManager interface {
 	DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error
 }
 
-var _ StatsTaskManager = (*statsJobManager)(nil)
+var _ StatsJobManager = (*statsJobManager)(nil)
 
 type statsJobManager struct {
 	ctx context.Context
@@ -90,15 +90,6 @@ func (jm *statsJobManager) triggerStatsTaskLoop() {
 			log.Warn("segment is not exist, no need to do stats task", zap.Int64("segmentID", segID))
 			continue
 		}
-		// TODO @xiaocai2333 @bigsheeper: remove code after allow create stats task for importing segment
-		if segment.GetIsImporting() {
-			log.Info("segment is importing, skip stats task", zap.Int64("segmentID", segID))
-			select {
-			case getBuildIndexChSingleton() <- segID:
-			default:
-			}
-			continue
-		}
 		jm.createSortStatsTaskForSegment(segment)
 	}
 }
@@ -106,17 +97,10 @@ func (jm *statsJobManager) triggerStatsTaskLoop() {
 
 func (jm *statsJobManager) triggerSortStatsTask() {
 	segments := jm.mt.SelectSegments(SegmentFilterFunc(func(seg *SegmentInfo) bool {
-		return isFlush(seg) && seg.GetLevel() != datapb.SegmentLevel_L0 && !seg.GetIsSorted()
+		return isFlush(seg) && seg.GetLevel() != datapb.SegmentLevel_L0 && !seg.GetIsSorted() && !seg.GetIsImporting()
 	}))
 	for _, segment := range segments {
-		if !segment.GetIsSorted() {
-			// TODO @xiaocai2333, @bigsheeper:
-			if segment.GetIsImporting() {
-				log.Warn("segment is importing, skip stats task, wait @bigsheeper support it")
-				continue
-			}
-			jm.createSortStatsTaskForSegment(segment)
-		}
+		jm.createSortStatsTaskForSegment(segment)
 	}
 }
diff --git a/internal/datacoord/mock_compaction_meta.go b/internal/datacoord/mock_compaction_meta.go
index 419f7c059b..ec90d4b216 100644
--- a/internal/datacoord/mock_compaction_meta.go
+++ 
b/internal/datacoord/mock_compaction_meta.go @@ -567,49 +567,6 @@ func (_c *MockCompactionMeta_GetSegment_Call) RunAndReturn(run func(int64) *Segm return _c } -// GetStatsTaskMeta provides a mock function with given fields: -func (_m *MockCompactionMeta) GetStatsTaskMeta() *statsTaskMeta { - ret := _m.Called() - - var r0 *statsTaskMeta - if rf, ok := ret.Get(0).(func() *statsTaskMeta); ok { - r0 = rf() - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*statsTaskMeta) - } - } - - return r0 -} - -// MockCompactionMeta_GetStatsTaskMeta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStatsTaskMeta' -type MockCompactionMeta_GetStatsTaskMeta_Call struct { - *mock.Call -} - -// GetStatsTaskMeta is a helper method to define mock.On call -func (_e *MockCompactionMeta_Expecter) GetStatsTaskMeta() *MockCompactionMeta_GetStatsTaskMeta_Call { - return &MockCompactionMeta_GetStatsTaskMeta_Call{Call: _e.mock.On("GetStatsTaskMeta")} -} - -func (_c *MockCompactionMeta_GetStatsTaskMeta_Call) Run(run func()) *MockCompactionMeta_GetStatsTaskMeta_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockCompactionMeta_GetStatsTaskMeta_Call) Return(_a0 *statsTaskMeta) *MockCompactionMeta_GetStatsTaskMeta_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockCompactionMeta_GetStatsTaskMeta_Call) RunAndReturn(run func() *statsTaskMeta) *MockCompactionMeta_GetStatsTaskMeta_Call { - _c.Call.Return(run) - return _c -} - // SaveCompactionTask provides a mock function with given fields: task func (_m *MockCompactionMeta) SaveCompactionTask(task *datapb.CompactionTask) error { ret := _m.Called(task) diff --git a/internal/datacoord/mock_job_manager.go b/internal/datacoord/mock_job_manager.go new file mode 100644 index 0000000000..de47574859 --- /dev/null +++ b/internal/datacoord/mock_job_manager.go @@ -0,0 +1,230 @@ +// Code generated by mockery v2.32.4. DO NOT EDIT. 
+ +package datacoord + +import ( + indexpb "github.com/milvus-io/milvus/internal/proto/indexpb" + mock "github.com/stretchr/testify/mock" +) + +// MockStatsJobManager is an autogenerated mock type for the StatsJobManager type +type MockStatsJobManager struct { + mock.Mock +} + +type MockStatsJobManager_Expecter struct { + mock *mock.Mock +} + +func (_m *MockStatsJobManager) EXPECT() *MockStatsJobManager_Expecter { + return &MockStatsJobManager_Expecter{mock: &_m.Mock} +} + +// DropStatsTask provides a mock function with given fields: originSegmentID, subJobType +func (_m *MockStatsJobManager) DropStatsTask(originSegmentID int64, subJobType indexpb.StatsSubJob) error { + ret := _m.Called(originSegmentID, subJobType) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, indexpb.StatsSubJob) error); ok { + r0 = rf(originSegmentID, subJobType) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockStatsJobManager_DropStatsTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DropStatsTask' +type MockStatsJobManager_DropStatsTask_Call struct { + *mock.Call +} + +// DropStatsTask is a helper method to define mock.On call +// - originSegmentID int64 +// - subJobType indexpb.StatsSubJob +func (_e *MockStatsJobManager_Expecter) DropStatsTask(originSegmentID interface{}, subJobType interface{}) *MockStatsJobManager_DropStatsTask_Call { + return &MockStatsJobManager_DropStatsTask_Call{Call: _e.mock.On("DropStatsTask", originSegmentID, subJobType)} +} + +func (_c *MockStatsJobManager_DropStatsTask_Call) Run(run func(originSegmentID int64, subJobType indexpb.StatsSubJob)) *MockStatsJobManager_DropStatsTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(indexpb.StatsSubJob)) + }) + return _c +} + +func (_c *MockStatsJobManager_DropStatsTask_Call) Return(_a0 error) *MockStatsJobManager_DropStatsTask_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStatsJobManager_DropStatsTask_Call) RunAndReturn(run func(int64, indexpb.StatsSubJob) error) *MockStatsJobManager_DropStatsTask_Call { + _c.Call.Return(run) + return _c +} + +// GetStatsTaskState provides a mock function with given fields: originSegmentID, subJobType +func (_m *MockStatsJobManager) GetStatsTaskState(originSegmentID int64, subJobType indexpb.StatsSubJob) indexpb.JobState { + ret := _m.Called(originSegmentID, subJobType) + + var r0 indexpb.JobState + if rf, ok := ret.Get(0).(func(int64, indexpb.StatsSubJob) indexpb.JobState); ok { + r0 = rf(originSegmentID, subJobType) + } else { + r0 = ret.Get(0).(indexpb.JobState) + } + + return r0 +} + +// MockStatsJobManager_GetStatsTaskState_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetStatsTaskState' +type MockStatsJobManager_GetStatsTaskState_Call struct { + *mock.Call +} + +// GetStatsTaskState is a helper method to define mock.On call +// - originSegmentID int64 +// - subJobType indexpb.StatsSubJob +func (_e *MockStatsJobManager_Expecter) GetStatsTaskState(originSegmentID interface{}, subJobType interface{}) *MockStatsJobManager_GetStatsTaskState_Call { + return &MockStatsJobManager_GetStatsTaskState_Call{Call: _e.mock.On("GetStatsTaskState", originSegmentID, subJobType)} +} + +func (_c *MockStatsJobManager_GetStatsTaskState_Call) Run(run func(originSegmentID int64, subJobType indexpb.StatsSubJob)) *MockStatsJobManager_GetStatsTaskState_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(indexpb.StatsSubJob)) + }) + return _c +} + 
+func (_c *MockStatsJobManager_GetStatsTaskState_Call) Return(_a0 indexpb.JobState) *MockStatsJobManager_GetStatsTaskState_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStatsJobManager_GetStatsTaskState_Call) RunAndReturn(run func(int64, indexpb.StatsSubJob) indexpb.JobState) *MockStatsJobManager_GetStatsTaskState_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: +func (_m *MockStatsJobManager) Start() { + _m.Called() +} + +// MockStatsJobManager_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type MockStatsJobManager_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +func (_e *MockStatsJobManager_Expecter) Start() *MockStatsJobManager_Start_Call { + return &MockStatsJobManager_Start_Call{Call: _e.mock.On("Start")} +} + +func (_c *MockStatsJobManager_Start_Call) Run(run func()) *MockStatsJobManager_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStatsJobManager_Start_Call) Return() *MockStatsJobManager_Start_Call { + _c.Call.Return() + return _c +} + +func (_c *MockStatsJobManager_Start_Call) RunAndReturn(run func()) *MockStatsJobManager_Start_Call { + _c.Call.Return(run) + return _c +} + +// Stop provides a mock function with given fields: +func (_m *MockStatsJobManager) Stop() { + _m.Called() +} + +// MockStatsJobManager_Stop_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Stop' +type MockStatsJobManager_Stop_Call struct { + *mock.Call +} + +// Stop is a helper method to define mock.On call +func (_e *MockStatsJobManager_Expecter) Stop() *MockStatsJobManager_Stop_Call { + return &MockStatsJobManager_Stop_Call{Call: _e.mock.On("Stop")} +} + +func (_c *MockStatsJobManager_Stop_Call) Run(run func()) *MockStatsJobManager_Stop_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockStatsJobManager_Stop_Call) Return() *MockStatsJobManager_Stop_Call { + _c.Call.Return() + return _c +} + +func (_c *MockStatsJobManager_Stop_Call) RunAndReturn(run func()) *MockStatsJobManager_Stop_Call { + _c.Call.Return(run) + return _c +} + +// SubmitStatsTask provides a mock function with given fields: originSegmentID, targetSegmentID, subJobType, canRecycle +func (_m *MockStatsJobManager) SubmitStatsTask(originSegmentID int64, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool) error { + ret := _m.Called(originSegmentID, targetSegmentID, subJobType, canRecycle) + + var r0 error + if rf, ok := ret.Get(0).(func(int64, int64, indexpb.StatsSubJob, bool) error); ok { + r0 = rf(originSegmentID, targetSegmentID, subJobType, canRecycle) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockStatsJobManager_SubmitStatsTask_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SubmitStatsTask' +type MockStatsJobManager_SubmitStatsTask_Call struct { + *mock.Call +} + +// SubmitStatsTask is a helper method to define mock.On call +// - originSegmentID int64 +// - targetSegmentID int64 +// - subJobType indexpb.StatsSubJob +// - canRecycle bool +func (_e *MockStatsJobManager_Expecter) SubmitStatsTask(originSegmentID interface{}, targetSegmentID interface{}, subJobType interface{}, canRecycle interface{}) *MockStatsJobManager_SubmitStatsTask_Call { + return &MockStatsJobManager_SubmitStatsTask_Call{Call: _e.mock.On("SubmitStatsTask", originSegmentID, targetSegmentID, 
subJobType, canRecycle)} +} + +func (_c *MockStatsJobManager_SubmitStatsTask_Call) Run(run func(originSegmentID int64, targetSegmentID int64, subJobType indexpb.StatsSubJob, canRecycle bool)) *MockStatsJobManager_SubmitStatsTask_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64), args[2].(indexpb.StatsSubJob), args[3].(bool)) + }) + return _c +} + +func (_c *MockStatsJobManager_SubmitStatsTask_Call) Return(_a0 error) *MockStatsJobManager_SubmitStatsTask_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockStatsJobManager_SubmitStatsTask_Call) RunAndReturn(run func(int64, int64, indexpb.StatsSubJob, bool) error) *MockStatsJobManager_SubmitStatsTask_Call { + _c.Call.Return(run) + return _c +} + +// NewMockStatsJobManager creates a new instance of MockStatsJobManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockStatsJobManager(t interface { + mock.TestingT + Cleanup(func()) +}) *MockStatsJobManager { + mock := &MockStatsJobManager{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 391053a903..46c72948e0 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -152,7 +152,7 @@ type Server struct { indexEngineVersionManager IndexEngineVersionManager taskScheduler *taskScheduler - jobManager *statsJobManager + jobManager StatsJobManager // manage ways that data coord access other coord broker broker.Broker @@ -396,7 +396,7 @@ func (s *Server) initDataCoord() error { return err } s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta) - s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta) + s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta, s.jobManager) s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 1f348aeca6..2e1c94c2a4 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1770,7 +1770,7 @@ func (s *Server) GetImportProgress(ctx context.Context, in *internalpb.GetImport resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprintf("import job does not exist, jobID=%d", jobID))) return resp, nil } - progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta) + progress, state, importedRows, totalRows, reason := GetJobProgress(jobID, s.importMeta, s.meta, s.jobManager) resp.State = state resp.Reason = reason resp.Progress = progress @@ -1807,7 +1807,7 @@ func (s *Server) ListImports(ctx context.Context, req *internalpb.ListImportsReq } for _, job := range jobs { - progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta) + progress, state, _, _, reason := GetJobProgress(job.GetJobID(), s.importMeta, s.meta, s.jobManager) resp.JobIDs = append(resp.JobIDs, fmt.Sprintf("%d", job.GetJobID())) resp.States = append(resp.States, state) resp.Reasons = append(resp.Reasons, reason) diff --git a/internal/datacoord/stats_task_meta.go b/internal/datacoord/stats_task_meta.go index b062422711..1ad3bcffb5 100644 --- a/internal/datacoord/stats_task_meta.go +++ b/internal/datacoord/stats_task_meta.go @@ -108,8 +108,8 @@ func (stm 
*statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error { } } - log.Info("add stats task", zap.Int64("taskID", t.GetTaskID()), - zap.Int64("segmentID", t.GetSegmentID()), zap.String("subJobType", t.GetSubJobType().String())) + log.Info("add stats task", zap.Int64("taskID", t.GetTaskID()), zap.Int64("originSegmentID", t.GetSegmentID()), + zap.Int64("targetSegmentID", t.GetTargetSegmentID()), zap.String("subJobType", t.GetSubJobType().String())) t.State = indexpb.JobState_JobStateInit if err := stm.catalog.SaveStatsTask(stm.ctx, t); err != nil { @@ -124,8 +124,8 @@ func (stm *statsTaskMeta) AddStatsTask(t *indexpb.StatsTask) error { stm.tasks[t.GetTaskID()] = t stm.updateMetrics() - log.Info("add stats task success", zap.Int64("taskID", t.GetTaskID()), - zap.Int64("segmentID", t.GetSegmentID()), zap.String("subJobType", t.GetSubJobType().String())) + log.Info("add stats task success", zap.Int64("taskID", t.GetTaskID()), zap.Int64("originSegmentID", t.GetSegmentID()), + zap.Int64("targetSegmentID", t.GetTargetSegmentID()), zap.String("subJobType", t.GetSubJobType().String())) return nil } diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index b7ae52da30..7ac63191ef 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -860,7 +860,7 @@ func (s *taskSchedulerSuite) scheduler(handler Handler) { scheduler.collectMetricsDuration = time.Millisecond * 200 scheduler.Start() - s.Run("enqueue", func() { + s.Run("Submit", func() { taskID := int64(6) newTask := &indexpb.AnalyzeTask{ CollectionID: s.collectionID, @@ -1654,7 +1654,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { resetMetaFunc() }) - s.Run("enqueue valid", func() { + s.Run("Submit valid", func() { for _, dataType := range []schemapb.DataType{ schemapb.DataType_Int8, schemapb.DataType_Int16, @@ -1685,7 +1685,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { }) // should still be able to build vec index when opt field is not set - s.Run("enqueue returns empty optional field when cfg disable", func() { + s.Run("Submit returns empty optional field when cfg disable", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { @@ -1706,7 +1706,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { resetMetaFunc() }) - s.Run("enqueue returns empty when vector type is not dense vector", func() { + s.Run("Submit returns empty when vector type is not dense vector", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") for _, dataType := range []schemapb.DataType{ schemapb.DataType_SparseFloatVector, @@ -1732,7 +1732,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { } }) - s.Run("enqueue returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() { + s.Run("Submit returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") for _, dataType := range []schemapb.DataType{ schemapb.DataType_Bool, @@ -1762,7 +1762,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { } }) - s.Run("enqueue returns empty optional field when no partition key", func() { + s.Run("Submit 
returns empty optional field when no partition key", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") mt.collections[collID].Schema.Fields[1].IsPartitionKey = false in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( @@ -1784,7 +1784,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { resetMetaFunc() }) - s.Run("enqueue partitionKeyIsolation is false when schema is not set", func() { + s.Run("Submit partitionKeyIsolation is false when schema is not set", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { @@ -1823,7 +1823,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { scheduler_isolation := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler_isolation, nil) scheduler_isolation.Start() - s.Run("enqueue partitionKeyIsolation is false when MV not enabled", func() { + s.Run("Submit partitionKeyIsolation is false when MV not enabled", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( func(ctx context.Context, in *workerpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { @@ -1844,7 +1844,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { resetMetaFunc() }) - s.Run("enqueue partitionKeyIsolation is true when MV enabled", func() { + s.Run("Submit partitionKeyIsolation is true when MV enabled", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "true" @@ -1867,7 +1867,7 @@ func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { resetMetaFunc() }) - s.Run("enqueue partitionKeyIsolation is invalid when MV is enabled", func() { + s.Run("Submit partitionKeyIsolation is invalid when MV is enabled", func() { paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") isoCollInfo.Properties[common.PartitionKeyIsolationKey] = "invalid" diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index cd96e74afc..1cd55e9c13 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -905,6 +905,7 @@ message ImportTaskV2 { string reason = 7; string complete_time = 8; repeated ImportFileStats file_stats = 9; + repeated int64 stats_segmentIDs = 10; } enum GcCommand { diff --git a/internal/proto/internal.proto b/internal/proto/internal.proto index b50ea4004c..6b92c59665 100644 --- a/internal/proto/internal.proto +++ b/internal/proto/internal.proto @@ -304,6 +304,8 @@ enum ImportJobState { Importing = 3; Failed = 4; Completed = 5; + Stats = 6; + IndexBuilding = 7; } message ImportFile { diff --git a/tests/integration/import/binlog_test.go b/tests/integration/import/binlog_test.go index 99c4c871ab..f1a32c4502 100644 --- a/tests/integration/import/binlog_test.go +++ b/tests/integration/import/binlog_test.go @@ -279,8 +279,11 @@ func (s *BulkInsertSuite) TestBinlogImport() { return segment.GetCollectionID() == newCollectionID }) log.Info("Show segments", zap.Any("segments", segments)) - s.Equal(1, len(segments)) 
- segment := segments[0] + s.Equal(2, len(segments)) + segment, ok := lo.Find(segments, func(segment *datapb.SegmentInfo) bool { + return segment.GetState() == commonpb.SegmentState_Flushed + }) + s.True(ok) s.Equal(commonpb.SegmentState_Flushed, segment.GetState()) s.True(len(segment.GetBinlogs()) > 0) s.NoError(CheckLogID(segment.GetBinlogs())) diff --git a/tests/integration/import/dynamic_field_test.go b/tests/integration/import/dynamic_field_test.go index 4c928df891..93b5228144 100644 --- a/tests/integration/import/dynamic_field_test.go +++ b/tests/integration/import/dynamic_field_test.go @@ -47,7 +47,7 @@ func (s *BulkInsertSuite) testImportDynamicField() { ) c := s.Cluster - ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second) + ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second) defer cancel() collectionName := "TestBulkInsert_B_" + funcutil.GenRandomStr() diff --git a/tests/integration/import/import_test.go b/tests/integration/import/import_test.go index 0eb43b55f8..85797dbd5b 100644 --- a/tests/integration/import/import_test.go +++ b/tests/integration/import/import_test.go @@ -76,7 +76,7 @@ func (s *BulkInsertSuite) run() { ) c := s.Cluster - ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second) + ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second) defer cancel() collectionName := "TestBulkInsert" + funcutil.GenRandomStr() diff --git a/tests/integration/import/partition_key_test.go b/tests/integration/import/partition_key_test.go index 9a6d6db930..de1b71dc11 100644 --- a/tests/integration/import/partition_key_test.go +++ b/tests/integration/import/partition_key_test.go @@ -46,7 +46,7 @@ func (s *BulkInsertSuite) TestImportWithPartitionKey() { ) c := s.Cluster - ctx, cancel := context.WithTimeout(c.GetContext(), 60*time.Second) + ctx, cancel := context.WithTimeout(c.GetContext(), 120*time.Second) defer cancel() collectionName := "TestBulkInsert_WithPartitionKey_" + funcutil.GenRandomStr()
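
---

The import flow added by this patch moves a job through Pending -> PreImporting
-> Importing -> Stats -> IndexBuilding -> Completed, with the per-state
progress weights documented on GetJobProgress. The following minimal,
self-contained Go sketch (not part of the patch) shows how those weights
compose; the jobState and progressPercent names are illustrative stand-ins,
not the internalpb definitions used above.

package main

import "fmt"

// jobState mirrors the extended internalpb.ImportJobState sequence from this
// patch; the names here are hypothetical stand-ins, not the generated enum.
type jobState int

const (
	statePending jobState = iota
	statePreImporting
	stateImporting
	stateStats
	stateIndexBuilding
	stateCompleted
)

// progressPercent composes the per-state weights documented on GetJobProgress
// (10% Pending, 30% PreImporting, 30% Importing, 10% Stats, 10% IndexBuilding,
// 10% Completed). inState is the fraction finished within the current state,
// e.g. doneCnt/taskCnt while the job is in the Stats state.
func progressPercent(s jobState, inState float32) int64 {
	switch s {
	case statePending:
		return int64(inState * 10)
	case statePreImporting:
		return 10 + int64(inState*30)
	case stateImporting:
		return 10 + 30 + int64(inState*30)
	case stateStats:
		return 10 + 30 + 30 + int64(inState*10)
	case stateIndexBuilding:
		return 10 + 30 + 30 + 10 + int64(inState*10)
	case stateCompleted:
		return 100
	default:
		return 0
	}
}

func main() {
	// A job halfway through its stats tasks reports 75%, matching the
	// int64(10+30+30+10*0.5) expectation in TestImportUtil_GetImportProgress.
	fmt.Println(progressPercent(stateStats, 0.5))
}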