Update datacoord compaction plan after datanode update plan to ensure consistency (#22143) (#22329)

Signed-off-by: aoiasd <zhicheng.yue@zilliz.com>
pull/22341/head
aoiasd 2023-02-22 18:19:45 +08:00 committed by GitHub
parent dc6c7ddbee
commit 40878a8656
4 changed files with 24 additions and 26 deletions

internal/datacoord/compaction.go

@@ -59,6 +59,7 @@ type compactionTaskState int8
 const (
 	executing compactionTaskState = iota + 1
+	pipelining
 	completed
 	failed
 	timeout
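
The new pipelining state marks a plan the coordinator has created but that no DataNode has acknowledged yet; executing is now reserved for plans a DataNode has actually accepted. A reconstruction of the resulting const block (comments are editorial; renumbering completed/failed/timeout is harmless because these values never leave datacoord — the user-facing state is commonpb.CompactionState):

    package main

    import "fmt"

    type compactionTaskState int8

    const (
    	executing  compactionTaskState = iota + 1 // a DataNode accepted the plan and is running it
    	pipelining                                // plan created, not yet acknowledged by a DataNode
    	completed
    	failed
    	timeout
    )

    func main() {
    	fmt.Println(executing, pipelining, completed, failed, timeout) // 1 2 3 4 5
    }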
@@ -155,6 +156,12 @@ func (c *compactionPlanHandler) stop() {
 	c.wg.Wait()
 }
 
+func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskOpt) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.plans[planID] = c.plans[planID].shadowClone(opts...)
+}
+
 // execCompactionPlan start to execute plan and return immediately
 func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
 	c.mu.Lock()
@@ -170,7 +177,7 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
 	task := &compactionTask{
 		triggerInfo: signal,
 		plan:        plan,
-		state:       executing,
+		state:       pipelining,
 		dataNodeID:  nodeID,
 	}
 	c.plans[plan.PlanID] = task
@@ -184,21 +191,12 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
 		if err != nil {
 			log.Warn("Alloc start time for CompactionPlan failed", zap.Int64("planID", plan.GetPlanID()))
 			// update plan ts to TIMEOUT ts
-			c.mu.Lock()
-			c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
-				task.plan.StartTime = tsTimeout
-			})
-			c.mu.Unlock()
+			c.updateTask(plan.PlanID, setState(executing), setStartTime(tsTimeout))
 			return
 		}
-
-		c.mu.Lock()
-		c.plans[plan.PlanID] = c.plans[plan.PlanID].shadowClone(func(task *compactionTask) {
-			task.plan.StartTime = ts
-		})
-		c.mu.Unlock()
-
+		c.updateTask(plan.PlanID, setStartTime(ts))
 		err = c.sessions.Compaction(nodeID, plan)
+		c.updateTask(plan.PlanID, setState(executing))
 		if err != nil {
 			log.Warn("try to Compaction but DataNode rejected",
 				zap.Int64("targetNodeID", nodeID),
@@ -208,7 +206,6 @@ func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
 			// release queue will be done in `updateCompaction`
 			return
 		}
-		log.Info("start compaction", zap.Int64("nodeID", nodeID), zap.Int64("planID", plan.GetPlanID()))
 	}()
 
 	return nil
@@ -318,12 +315,6 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
 		stateResult, ok := planStates[task.plan.PlanID]
 		state := stateResult.GetState()
 		planID := task.plan.PlanID
-		startTime := task.plan.GetStartTime()
-
-		// start time is 0 means this task have not started, skip checker
-		if startTime == 0 {
-			continue
-		}
 		// check wether the state of CompactionPlan is working
 		if ok {
 			if state == commonpb.CompactionState_Completed {
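
Why the sentinel could be deleted: a task now begins life as pipelining and is only flipped to executing after the DataNode has the plan and a start time has been recorded, so any task this loop subjects to the timeout check already carries a non-zero start time. A hypothetical reduction of that invariant (simplified types; it assumes, as the hunk suggests, that only executing tasks reach the timeout check):

    package main

    import "fmt"

    type taskState int

    const (
    	executing taskState = iota + 1
    	pipelining
    )

    type task struct {
    	state     taskState
    	startTime uint64
    }

    // expired mimics the guard this hunk deletes: with the pipelining state
    // in place, the startTime == 0 case can no longer reach the timeout check.
    func expired(tasks []task, now, limit uint64) (out []task) {
    	for _, t := range tasks {
    		if t.state != executing {
    			continue // still pipelining: never dispatched, nothing to time out
    		}
    		if now-t.startTime > limit {
    			out = append(out, t)
    		}
    	}
    	return
    }

    func main() {
    	tasks := []task{
    		{state: pipelining},               // no start time yet, and that is fine
    		{state: executing, startTime: 10}, // dispatched at ts 10
    	}
    	fmt.Println(expired(tasks, 100, 50)) // only the executing task can expire
    }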
@@ -429,6 +420,12 @@ func setState(state compactionTaskState) compactionTaskOpt {
 	}
 }
 
+func setStartTime(startTime uint64) compactionTaskOpt {
+	return func(task *compactionTask) {
+		task.plan.StartTime = startTime
+	}
+}
+
 func setResult(result *datapb.CompactionResult) compactionTaskOpt {
 	return func(task *compactionTask) {
 		task.result = result
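
Taken together, the new helpers give the handler a single copy-on-write mutation path: shadowClone copies the task and applies small functional options such as setState and setStartTime, and updateTask serializes every transition under the handler's lock. A minimal self-contained sketch of the pattern (simplified task struct; names mirror the diff):

    package main

    import (
    	"fmt"
    	"sync"
    )

    type compactionTask struct {
    	state     int
    	startTime uint64
    }

    type compactionTaskOpt func(*compactionTask)

    func setState(s int) compactionTaskOpt {
    	return func(t *compactionTask) { t.state = s }
    }

    func setStartTime(ts uint64) compactionTaskOpt {
    	return func(t *compactionTask) { t.startTime = ts }
    }

    // shadowClone copies the task, applies the options to the copy, and
    // leaves the original untouched.
    func (t *compactionTask) shadowClone(opts ...compactionTaskOpt) *compactionTask {
    	clone := *t
    	for _, opt := range opts {
    		opt(&clone)
    	}
    	return &clone
    }

    type handler struct {
    	mu    sync.RWMutex
    	plans map[int64]*compactionTask
    }

    // updateTask mirrors the helper added in this commit: one lock-guarded
    // place that every state transition goes through.
    func (h *handler) updateTask(planID int64, opts ...compactionTaskOpt) {
    	h.mu.Lock()
    	defer h.mu.Unlock()
    	h.plans[planID] = h.plans[planID].shadowClone(opts...)
    }

    func main() {
    	h := &handler{plans: map[int64]*compactionTask{7: {state: 1}}}
    	h.updateTask(7, setState(2), setStartTime(42))
    	fmt.Printf("%+v\n", *h.plans[7]) // {state:2 startTime:42}
    }

Because shadowClone replaces the map entry instead of mutating in place, a reader that grabbed the old pointer under an RLock never observes a half-applied update.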

internal/datacoord/compaction_test.go

@@ -39,7 +39,6 @@ import (
 )
 
 func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
-	ch := make(chan interface{}, 1)
 	type fields struct {
 		plans    map[int64]*compactionTask
 		sessions *SessionManager
@@ -67,7 +66,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 				data map[int64]*Session
 			}{
 				data: map[int64]*Session{
-					1: {client: &mockDataNodeClient{ch: ch}},
+					1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1)}},
 				},
 			},
 		},
@@ -97,7 +96,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 				data map[int64]*Session
 			}{
 				data: map[int64]*Session{
-					1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
+					1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
 				},
 			},
 		},
@@ -127,7 +126,7 @@ func Test_compactionPlanHandler_execCompactionPlan(t *testing.T) {
 				data map[int64]*Session
 			}{
 				data: map[int64]*Session{
-					1: {client: &mockDataNodeClient{ch: ch, compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
+					1: {client: &mockDataNodeClient{ch: make(chan interface{}, 1), compactionResp: &commonpb.Status{ErrorCode: commonpb.ErrorCode_CacheFailed}}},
 				},
 			},
 		},
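
All three test cases previously shared one buffered channel (ch) declared at the top of the test; the diff gives every case its own make(chan interface{}, 1), so a message left over from one case can neither satisfy nor block a later one. A generic illustration of the idea, with mockClient standing in for mockDataNodeClient (a hypothetical reduction, not the real test helper):

    package main

    import "fmt"

    // mockClient is a hypothetical stand-in for the test's mockDataNodeClient:
    // its Compaction call signals on a buffered channel.
    type mockClient struct{ ch chan interface{} }

    func (m *mockClient) Compaction(plan string) {
    	m.ch <- plan // with a shared channel, a later case could block here
    }

    func main() {
    	for _, plan := range []string{"case-1", "case-2", "case-3"} {
    		c := &mockClient{ch: make(chan interface{}, 1)} // fresh channel per case
    		c.Compaction(plan)
    		fmt.Println(<-c.ch) // each case drains only its own signal
    	}
    }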

internal/datacoord/services.go

@@ -1042,6 +1042,8 @@ func getCompactionMergeInfo(task *compactionTask) *milvuspb.CompactionMergeInfo {
 func getCompactionState(tasks []*compactionTask) (state commonpb.CompactionState, executingCnt, completedCnt, failedCnt, timeoutCnt int) {
 	for _, t := range tasks {
 		switch t.state {
+		case pipelining:
+			executingCnt++
 		case executing:
 			executingCnt++
 		case completed:
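
The user-facing aggregation has to keep reporting a compaction as in progress while its plans sit in the new pipelining state, so pipelining is counted exactly like executing. A simplified reconstruction of the counting logic in this hunk (surrounding state-selection logic omitted):

    package main

    import "fmt"

    type taskState int

    const (
    	executing taskState = iota + 1
    	pipelining
    	completed
    	failed
    	timeout
    )

    // countStates mirrors the switch in getCompactionState.
    func countStates(states []taskState) (executingCnt, completedCnt, failedCnt, timeoutCnt int) {
    	for _, s := range states {
    		switch s {
    		case pipelining:
    			executingCnt++ // queued but not yet accepted still reads as "executing"
    		case executing:
    			executingCnt++
    		case completed:
    			completedCnt++
    		case failed:
    			failedCnt++
    		case timeout:
    			timeoutCnt++
    		}
    	}
    	return
    }

    func main() {
    	fmt.Println(countStates([]taskState{pipelining, executing, completed})) // 2 1 0 0
    }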

internal/datanode/compaction_executor.go

@@ -46,10 +46,10 @@ func newCompactionExecutor() *compactionExecutor {
 func (c *compactionExecutor) execute(task compactor) {
 	c.taskCh <- task
+	c.toExecutingState(task)
 }
 
 func (c *compactionExecutor) toExecutingState(task compactor) {
-	task.start()
 	c.executing.Store(task.getPlanID(), task)
 }
@@ -60,7 +60,7 @@ func (c *compactionExecutor) toCompleteState(task compactor) {
 // These two func are bounded for waitGroup
 func (c *compactionExecutor) executeWithState(task compactor) {
-	c.toExecutingState(task)
+	task.start()
 	go c.executeTask(task)
 }
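
On the DataNode side the two changes split registration from execution: execute now stores the plan in the executing map the moment it is queued, so the node's compaction-state reports already include plans still waiting in taskCh, while the waitGroup-bound task.start() moves to executeWithState and fires only when a worker actually picks the task up. A minimal sketch with a reduced compactor interface (a hypothetical simplification of the real one):

    package main

    import (
    	"fmt"
    	"sync"
    )

    // compactor is a reduced, hypothetical version of the DataNode interface.
    type compactor interface {
    	getPlanID() int64
    	start()
    }

    type executor struct {
    	taskCh    chan compactor
    	executing sync.Map // planID -> compactor
    }

    // execute queues the task and registers it immediately, so state queries
    // already see the plan while it waits in the channel.
    func (e *executor) execute(t compactor) {
    	e.taskCh <- t
    	e.executing.Store(t.getPlanID(), t)
    }

    type plan struct{ id int64 }

    func (p *plan) getPlanID() int64 { return p.id }
    func (p *plan) start()           { fmt.Println("worker started plan", p.id) }

    func main() {
    	e := &executor{taskCh: make(chan compactor, 4)}
    	e.execute(&plan{id: 1})

    	// The plan is already visible as executing before any worker ran it.
    	e.executing.Range(func(k, v interface{}) bool {
    		fmt.Println("tracking plan", k)
    		return true
    	})

    	// Drain one task the way executeWithState's worker loop would.
    	t := <-e.taskCh
    	t.start()
    }

This mirrors the coordinator-side fix: with the startTime == 0 skip gone from updateCompaction, the DataNode must report a plan as known as soon as it accepts it, or the coordinator could mistake a queued plan for a vanished one.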