mirror of https://github.com/milvus-io/milvus.git
Update datacoord compaction plan after datanode update plan to ensure consistency (#22143)
Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/22114/head
parent
e20d79a8a1
commit
1894766235
|
@ -57,6 +57,7 @@ type compactionTaskState int8
|
|||
|
||||
const (
|
||||
executing compactionTaskState = iota + 1
|
||||
pipelining
|
||||
completed
|
||||
failed
|
||||
timeout
|
||||
|
@ -153,6 +154,12 @@ func (c *compactionPlanHandler) stop() {
|
|||
c.wg.Wait()
|
||||
}
|
||||
|
||||
// updateTask replaces the task stored under planID with the result of calling
// shadowClone(opts...) on the current entry. c.mu is held for the whole update.
// NOTE(review): presumably shadowClone copies the task and applies opts to the
// copy — confirm against its definition.
// NOTE(review): no existence check is done for planID; verify that callers only
// pass IDs already present in c.plans.
func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.plans[planID] = c.plans[planID].shadowClone(opts...)
|
||||
}
|
||||
|
||||
// execCompactionPlan starts executing the plan and returns immediately
|
||||
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
|
||||
c.mu.Lock()
|
||||
|
@ -171,7 +178,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
|
|||
task := &compactionTask{
|
||||
triggerInfo: signal,
|
||||
plan: plan,
|
||||
state: executing,
|
||||
state: pipelining,
|
||||
dataNodeID: nodeID,
|
||||
}
|
||||
c.plans[plan.PlanID] = task
|
||||
|
@ -185,21 +192,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
|
|||
if err != nil {
|
||||
log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID()))
|
||||
// update plan ts to TIMEOUT ts
|
||||
c.mu.Lock()
|
||||
c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
|
||||
task.plan.StartTime = tsTimeout
|
||||
})
|
||||
c.mu.Unlock()
|
||||
c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
|
||||
return
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
|
||||
task.plan.StartTime = ts
|
||||
})
|
||||
c.mu.Unlock()
|
||||
|
||||
c.updateTask(plan.PlanID, setStartTime(ts))
|
||||
err = c.sessions.Compaction(nodeID, plan)
|
||||
c.updateTask(plan.PlanID, setState(executing))
|
||||
if err != nil {
|
||||
log.Warn("try to Compaction but DataNode rejected",
|
||||
zap.Int64("targetNodeID", nodeID),
|
||||
|
@ -209,7 +207,6 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, pla
|
|||
// release queue will be done in `updateCompaction`
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
|
||||
}()
|
||||
return nil
|
||||
|
@ -311,12 +308,6 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
|
|||
stateResult, ok := planStates[task.plan.PlanID]
|
||||
state := stateResult.GetState()
|
||||
planID := task.plan.PlanID
|
||||
startTime := task.plan.GetStartTime()
|
||||
|
||||
// a start time of 0 means this task has not started yet, so skip the checker
|
||||
if startTime == 0 {
|
||||
continue
|
||||
}
|
||||
// check whether the state of CompactionPlan is working
|
||||
if ok {
|
||||
if state == commonpb.CompactionState_Completed {
|
||||
|
@ -422,6 +413,12 @@ func setState(state compactionTaskState) compactionTaskOpt {
|
|||
}
|
||||
}
|
||||
|
||||
// setStartTime returns a compactionTaskOpt that assigns startTime to the
// StartTime field of the task's plan.
// NOTE(review): the option writes through task.plan; whether that plan is
// shared with other task copies depends on how the option is applied — confirm.
func setStartTime(startTime uint64) compactionTaskOpt {
|
||||
return func(task *compactionTask) {
|
||||
task.plan.StartTime = startTime
|
||||
}
|
||||
}
|
||||
|
||||
func setResult(result *datapb.CompactionResult) compactionTaskOpt {
|
||||
return func(task *compactionTask) {
|
||||
task.result = result
|
||||
|
|
|
@ -40,7 +40,6 @@ import (
|
|||
)
|
||||
|
||||
func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
|
||||
ch := make(chan interface{}, 1)
|
||||
type fields struct {
|
||||
plans map[int64]*compactionTask
|
||||
sessions *SessionManager
|
||||
|
@ -68,7 +67,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
|
|||
data map[int64]*Session
|
||||
}{
|
||||
data: map[int64]*Session{
|
||||
1: {client: &mockDataNodeClient{ch: ch}},
|
||||
1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1)}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -98,7 +97,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
|
|||
data map[int64]*Session
|
||||
}{
|
||||
data: map[int64]*Session{
|
||||
1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
|
||||
1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
@ -128,7 +127,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
|
|||
data map[int64]*Session
|
||||
}{
|
||||
data: map[int64]*Session{
|
||||
1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
|
||||
1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -1065,6 +1065,8 @@ func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo
|
|||
func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, failedCnt, timeoutCnt int) {
|
||||
for _, t := range tasks {
|
||||
switch t.state {
|
||||
case pipelining:
|
||||
executingCnt++
|
||||
case executing:
|
||||
executingCnt++
|
||||
case completed:
|
||||
|
|
|
@ -46,10 +46,10 @@ func newCompactionExecutor() *compactionExecutor {
|
|||
|
||||
func (c *compactionExecutor) execute(task compactor) {
|
||||
c.taskCh <- task
|
||||
c.toExecutingState(task)
|
||||
}
|
||||
|
||||
func (c *compactionExecutor) toExecutingState(task compactor) {
|
||||
task.start()
|
||||
c.executing.Store(task.getPlanID(), task)
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
|
|||
|
||||
// These two func are bounded for waitGroup
|
||||
func (c *compactionExecutor) executeWithState(task compactor) {
|
||||
c.toExecutingState(task)
|
||||
task.start()
|
||||
go c.executeTask(task)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue