diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 7210e96b11..e8d56e7e38 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -57,6 +57,7 @@ type compactionTaskState int8 const ( executing compactionTaskState = iota + 1 + pipelining completed failed timeout @@ -153,6 +154,12 @@ func (c *compactionPlanHandler) stop() { c.wg.Wait() } +func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) { + c.mu.Lock() + defer c.mu.Unlock() + c.plans[planID] = c.plans[planID].shadowClone(opts...) +} + // execCompactionPlan start to execute plan and return immediately func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { c.mu.Lock() @@ -171,7 +178,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla task := &compactionTask{ triggerInfo: signal, plan: plan, - state: executing, + state: pipelining, dataNodeID: nodeID, } c.plans[plan.PlanID] = task @@ -185,21 +192,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla if err != nil { log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID())) // update plan ts to TIMEOUT ts - c.mu.Lock() - c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) { - task.plan.StartTime = tsTimeout - }) - c.mu.Unlock() + c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout)) return } - - c.mu.Lock() - c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) { - task.plan.StartTime = ts - }) - c.mu.Unlock() - + c.updateTask(plan.PlanID, setStartTime(ts)) err = c.sessions.Compaction(nodeID, plan) + c.updateTask(plan.PlanID, setState(executing)) if err != nil { log.Warn("try to Compaction but DataNode rejected", zap.Int64("targetNodeID", nodeID), @@ -209,7 +207,6 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla // release queue will be done in `updateCompaction` return } - log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID())) }() return nil @@ -311,12 +308,6 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { stateResult, ok := planStates[task.plan.PlanID] state := stateResult.GetState() planID := task.plan.PlanID - startTime := task.plan.GetStartTime() - - // start time is 0 means this task have not started, skip checker - if startTime == 0 { - continue - } // check wether the state of CompactionPlan is working if ok { if state == commonpb.CompactionState_Completed { @@ -422,6 +413,12 @@ func setState(state compactionTaskState) compactionTaskOpt { } } +func setStartTime(startTime uint64) compactionTaskOpt { + return func(task *compactionTask) { + task.plan.StartTime = startTime + } +} + func setResult(result *datapb.CompactionResult) compactionTaskOpt { return func(task *compactionTask) { task.result = result diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 3ca88a5bed..f0f6805e5e 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -40,7 +40,6 @@ import ( ) func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { - ch := make(chan interface{}, 1) type fields struct { plans map[int64]*compactionTask sessions *SessionManager @@ -68,7 +67,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { data map[int64]*Session }{ data: map[int64]*Session{ - 1: {client: &mockDataNodeClient{ch: ch}}, + 1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1)}}, }, }, }, @@ -98,7 +97,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { data map[int64]*Session }{ data: map[int64]*Session{ - 1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}}, + 1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}}, }, }, }, @@ -128,7 +127,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) { data map[int64]*Session }{ data: map[int64]*Session{ - 1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}}, + 1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}}, }, }, }, diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 0c512f4cc3..4469779380 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1065,6 +1065,8 @@ func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, failedCnt, timeoutCnt int) { for _, t := range tasks { switch t.state { + case pipelining: + executingCnt++ case executing: executingCnt++ case completed: diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index da03c53728..224fc01adc 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -46,10 +46,10 @@ func newCompactionExecutor() *compactionExecutor { func (c *compactionExecutor) execute(task compactor) { c.taskCh <- task + c.toExecutingState(task) } func (c *compactionExecutor) toExecutingState(task compactor) { - task.start() c.executing.Store(task.getPlanID(), task) } @@ -60,7 +60,7 @@ func (c *compactionExecutor) toCompleteState(task compactor) { // These two func are bounded for waitGroup func (c *compactionExecutor) executeWithState(task compactor) { - c.toExecutingState(task) + task.start() go c.executeTask(task) }