Refactor task scheduler and executor (#20828)

Make task scheduling and execution performance scale out with the number of QueryNodes

Signed-off-by: yah01 <yang.cen@zilliz.com>

pull/20891/head
yah01 2022-11-30 13:57:15 +08:00 committed by GitHub
parent 18762f82aa
commit 060649b8aa
9 changed files with 283 additions and 92 deletions
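
In short: instead of funnelling every task through one shared Executor, the scheduler now keeps one Executor per QueryNode (created when the node comes online, stopped when it goes offline), and each executor admits at most queryCoord.taskExecutionCap tasks at a time, so task execution scales out with the number of nodes. The stand-alone sketch below (simplified names and a placeholder executor type, not the code in this diff) illustrates the per-node lifecycle:

// sketch (not part of this PR's diff): one executor per QueryNode
package main

import (
	"context"
	"fmt"
	"sync"
)

type executor struct{ nodeID int64 } // placeholder for the real Executor

func (e *executor) Start(ctx context.Context) { fmt.Println("start executor for node", e.nodeID) }
func (e *executor) Stop()                     { fmt.Println("stop executor for node", e.nodeID) }

type scheduler struct {
	mu        sync.Mutex
	executors map[int64]*executor // NodeID -> executor
}

func (s *scheduler) AddExecutor(ctx context.Context, nodeID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.executors[nodeID]; ok {
		return // already registered for this node
	}
	ex := &executor{nodeID: nodeID}
	s.executors[nodeID] = ex
	ex.Start(ctx)
}

func (s *scheduler) RemoveExecutor(nodeID int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if ex, ok := s.executors[nodeID]; ok {
		ex.Stop()
		delete(s.executors, nodeID)
	}
}

func main() {
	s := &scheduler{executors: map[int64]*executor{}}
	s.AddExecutor(context.Background(), 1) // node 1 comes online
	s.AddExecutor(context.Background(), 2) // node 2 comes online
	s.RemoveExecutor(1)                    // node 1 goes offline
}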

View File

@ -38,7 +38,7 @@ ignore:
- "**/*.pb.go"
- "**/*.proto"
- "internal/metastore/db/dbmodel/mocks/.*"
- "internal/mocks"
- "**/mock_*.go"

View File

@ -184,6 +184,7 @@ queryCoord:
loadTimeoutSeconds: 600
checkHandoffInterval: 5000
taskMergeCap: 16
taskExecutionCap: 256
enableActiveStandby: false # Enable active-standby
# Related configuration of queryNode, used to run hybrid search between vector and scalar data.

View File

@ -311,6 +311,7 @@ func (s *Server) Start() error {
}
for _, node := range sessions {
s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))
s.taskScheduler.AddExecutor(node.ServerID)
}
s.checkReplicas()
for _, node := range sessions {
@ -571,6 +572,7 @@ func (s *Server) watchNodes(revision int64) {
func (s *Server) handleNodeUp(node int64) {
log := log.With(zap.Int64("nodeID", node))
s.taskScheduler.AddExecutor(node)
s.distController.StartDistInstance(s.ctx, node)
for _, collection := range s.meta.CollectionManager.GetAll() {
@ -598,6 +600,7 @@ func (s *Server) handleNodeUp(node int64) {
func (s *Server) handleNodeDown(node int64) {
log := log.With(zap.Int64("nodeID", node))
s.taskScheduler.RemoveExecutor(node)
s.distController.Remove(node)
// Refresh the targets, to avoid consuming messages too early from channel
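
Two entry points keep the executor set in sync with cluster membership: Start() registers an executor for every QueryNode already present in the session list, and watchNodes() handles later joins and leaves through handleNodeUp/handleNodeDown. A tiny stand-alone sketch of that pattern (hypothetical event type, not the actual session API):

// sketch (not part of this PR's diff): executor registration driven by node membership
package main

import "fmt"

// registry stands in for the scheduler's per-node executor map.
type registry map[int64]bool

func (r registry) AddExecutor(nodeID int64)    { r[nodeID] = true; fmt.Println("add executor", nodeID) }
func (r registry) RemoveExecutor(nodeID int64) { delete(r, nodeID); fmt.Println("remove executor", nodeID) }

// nodeEvent is a hypothetical stand-in for a session event.
type nodeEvent struct {
	nodeID int64
	up     bool
}

func main() {
	sched := registry{}
	// At Start(): register executors for QueryNodes already in the session list.
	for _, nodeID := range []int64{1, 2} {
		sched.AddExecutor(nodeID)
	}
	// Afterwards: watchNodes() turns session events into handleNodeUp/handleNodeDown.
	for _, ev := range []nodeEvent{{nodeID: 3, up: true}, {nodeID: 2, up: false}} {
		if ev.up {
			sched.AddExecutor(ev.nodeID)
		} else {
			sched.RemoveExecutor(ev.nodeID)
		}
	}
}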

View File

@ -26,9 +26,11 @@ import (
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/internal/util/tsoutil"
"go.uber.org/atomic"
"go.uber.org/zap"
)
@ -45,7 +47,8 @@ type Executor struct {
// Merge load segment requests
merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]
executingTasks sync.Map
executingTasks sync.Map
executingTaskNum atomic.Int32
}
func NewExecutor(meta *meta.Meta,
@ -82,10 +85,14 @@ func (ex *Executor) Stop() {
// does nothing and returns false if the action is already committed,
// returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool {
if ex.executingTaskNum.Load() > Params.QueryCoordCfg.TaskExecutionCap {
return false
}
_, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{})
if exist {
return false
}
ex.executingTaskNum.Inc()
log := log.With(
zap.Int64("taskID", task.ID()),
@ -137,7 +144,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
defer func() {
for i := range mergeTask.tasks {
mergeTask.tasks[i].SetErr(task.Err())
ex.removeAction(mergeTask.tasks[i], mergeTask.steps[i])
ex.removeTask(mergeTask.tasks[i], mergeTask.steps[i])
}
}()
@ -180,7 +187,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
log.Info("load segments done", zap.Int64("taskID", task.ID()), zap.Duration("timeTaken", elapsed))
}
func (ex *Executor) removeAction(task Task, step int) {
func (ex *Executor) removeTask(task Task, step int) {
if task.Err() != nil {
log.Info("execute action done, remove it",
zap.Int64("taskID", task.ID()),
@ -189,6 +196,7 @@ func (ex *Executor) removeAction(task Task, step int) {
}
ex.executingTasks.Delete(task.ID())
ex.executingTaskNum.Dec()
}
func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
@ -218,7 +226,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
if err != nil {
task.SetErr(err)
task.Cancel()
ex.removeAction(task, step)
ex.removeTask(task, step)
}
}()
@ -270,7 +278,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
}
func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
defer ex.removeAction(task, step)
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*SegmentAction)
defer action.isReleaseCommitted.Store(true)
@ -343,7 +351,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
}
func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
log := log.With(
@ -415,7 +423,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
}
func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
defer ex.removeAction(task, step)
defer ex.removeTask(task, step)
startTs := time.Now()
action := task.Actions()[step].(*ChannelAction)
log := log.With(
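
The per-node Executor now bounds and deduplicates what it runs: Execute refuses new work once TaskExecutionCap tasks (default 256, from queryCoord.taskExecutionCap) are in flight, rejects a task whose ID is already executing via sync.Map.LoadOrStore, and removeTask (renamed from removeAction) decrements the in-flight counter when an action completes. A minimal, self-contained sketch of that admission pattern (generic names; the real code uses go.uber.org/atomic and reads the cap from Params):

// sketch (not part of this PR's diff): bounded, deduplicated task admission
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type executor struct {
	cap       int32
	executing sync.Map     // taskID -> struct{}{}
	inFlight  atomic.Int32 // number of tasks currently executing
}

// Execute admits a task unless the executor is at capacity or the task
// is already being executed; it reports whether the task was admitted.
func (ex *executor) Execute(taskID int64) bool {
	if ex.inFlight.Load() >= ex.cap {
		return false // over the per-node execution cap, retried on a later dispatch
	}
	if _, exist := ex.executing.LoadOrStore(taskID, struct{}{}); exist {
		return false // this task is already in flight
	}
	ex.inFlight.Add(1)
	// ... issue the RPC / run the action asynchronously here ...
	return true
}

// removeTask is called when the action completes, successfully or not.
func (ex *executor) removeTask(taskID int64) {
	ex.executing.Delete(taskID)
	ex.inFlight.Add(-1)
}

func main() {
	ex := &executor{cap: 2}
	fmt.Println(ex.Execute(1)) // true
	fmt.Println(ex.Execute(1)) // false: duplicate task ID
	fmt.Println(ex.Execute(2)) // true
	fmt.Println(ex.Execute(3)) // false: at capacity
	ex.removeTask(1)
	fmt.Println(ex.Execute(3)) // true again
}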

View File

@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.14.1. DO NOT EDIT.
package task
@ -58,6 +58,34 @@ func (_c *MockScheduler_Add_Call) Return(_a0 error) *MockScheduler_Add_Call {
return _c
}
// AddExecutor provides a mock function with given fields: nodeID
func (_m *MockScheduler) AddExecutor(nodeID int64) {
_m.Called(nodeID)
}
// MockScheduler_AddExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExecutor'
type MockScheduler_AddExecutor_Call struct {
*mock.Call
}
// AddExecutor is a helper method to define mock.On call
// - nodeID int64
func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call {
return &MockScheduler_AddExecutor_Call{Call: _e.mock.On("AddExecutor", nodeID)}
}
func (_c *MockScheduler_AddExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_AddExecutor_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockScheduler_AddExecutor_Call) Return() *MockScheduler_AddExecutor_Call {
_c.Call.Return()
return _c
}
// Dispatch provides a mock function with given fields: node
func (_m *MockScheduler) Dispatch(node int64) {
_m.Called(node)
@ -188,6 +216,34 @@ func (_c *MockScheduler_RemoveByNode_Call) Return() *MockScheduler_RemoveByNode_
return _c
}
// RemoveExecutor provides a mock function with given fields: nodeID
func (_m *MockScheduler) RemoveExecutor(nodeID int64) {
_m.Called(nodeID)
}
// MockScheduler_RemoveExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveExecutor'
type MockScheduler_RemoveExecutor_Call struct {
*mock.Call
}
// RemoveExecutor is a helper method to define mock.On call
// - nodeID int64
func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call {
return &MockScheduler_RemoveExecutor_Call{Call: _e.mock.On("RemoveExecutor", nodeID)}
}
func (_c *MockScheduler_RemoveExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_RemoveExecutor_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(int64))
})
return _c
}
func (_c *MockScheduler_RemoveExecutor_Call) Return() *MockScheduler_RemoveExecutor_Call {
_c.Call.Return()
return _c
}
// Start provides a mock function with given fields: ctx
func (_m *MockScheduler) Start(ctx context.Context) {
_m.Called(ctx)
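
The scheduler mock was regenerated with mockery v2.14.1 to cover the new AddExecutor and RemoveExecutor methods. A hypothetical snippet (not part of this PR) showing how a test in the task package could set an expectation on the new method with plain testify, assuming the usual testing and mock imports and that MockScheduler embeds mock.Mock as mockery-generated mocks do:

// sketch (not part of this PR's diff): using the regenerated mock in a test
func TestHandleNodeUpRegistersExecutor(t *testing.T) {
	sched := &MockScheduler{}
	// Expect the code under test to register an executor for node 1.
	sched.On("AddExecutor", int64(1))
	sched.AddExecutor(1) // in a real test, the code under test would make this call
	sched.AssertExpectations(t)
}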

View File

@ -38,8 +38,6 @@ const (
TaskTypeGrow Type = iota + 1
TaskTypeReduce
TaskTypeMove
taskPoolSize = 256
)
var (
@ -56,8 +54,6 @@ var (
// No enough memory to load segment
ErrResourceNotEnough = errors.New("ResourceNotEnough")
ErrTaskQueueFull = errors.New("TaskQueueFull")
ErrFailedResponse = errors.New("RpcFailed")
ErrTaskAlreadyDone = errors.New("TaskAlreadyDone")
)
@ -85,17 +81,17 @@ type replicaChannelIndex struct {
}
type taskQueue struct {
// TaskPriority -> Tasks
buckets [][]Task
cap int
// TaskPriority -> TaskID -> Task
buckets []map[int64]Task
}
func newTaskQueue(cap int) *taskQueue {
func newTaskQueue() *taskQueue {
buckets := make([]map[int64]Task, len(TaskPriorities))
for i := range buckets {
buckets[i] = make(map[int64]Task)
}
return &taskQueue{
buckets: make([][]Task, len(TaskPriorities)),
cap: cap,
buckets: buckets,
}
}
@ -108,35 +104,21 @@ func (queue *taskQueue) Len() int {
return taskNum
}
func (queue *taskQueue) Cap() int {
return queue.cap
}
func (queue *taskQueue) Add(task Task) bool {
if queue.Len() >= queue.Cap() {
return false
}
queue.buckets[task.Priority()] = append(queue.buckets[task.Priority()], task)
return true
func (queue *taskQueue) Add(task Task) {
bucket := queue.buckets[task.Priority()]
bucket[task.ID()] = task
}
func (queue *taskQueue) Remove(task Task) {
bucket := &queue.buckets[task.Priority()]
for i := range *bucket {
if (*bucket)[i].ID() == task.ID() {
*bucket = append((*bucket)[:i], (*bucket)[i+1:]...)
break
}
}
bucket := queue.buckets[task.Priority()]
delete(bucket, task.ID())
}
// Range iterates all tasks in the queue ordered by priority from high to low
func (queue *taskQueue) Range(fn func(task Task) bool) {
for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
for i := range queue.buckets[priority] {
if !fn(queue.buckets[priority][i]) {
for _, task := range queue.buckets[priority] {
if !fn(task) {
return
}
}
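
The queue also loses its fixed capacity: tasks now live in one map per priority keyed by task ID, so Add never fails (back-pressure moves to the executor-side cap instead of ErrTaskQueueFull) and Remove becomes an O(1) delete instead of a slice splice. One consequence is that iteration order within a single priority is no longer insertion order, since Go map iteration is unordered. A small stand-alone model of the new shape (simplified types, not the code above):

// sketch (not part of this PR's diff): map-bucket priority queue, simplified
package main

import "fmt"

type task struct {
	id       int64
	priority int // higher value = higher priority
}

type taskQueue struct {
	buckets []map[int64]task // one bucket per priority, keyed by task ID
}

func newTaskQueue(priorities int) *taskQueue {
	buckets := make([]map[int64]task, priorities)
	for i := range buckets {
		buckets[i] = make(map[int64]task)
	}
	return &taskQueue{buckets: buckets}
}

func (q *taskQueue) Add(t task)    { q.buckets[t.priority][t.id] = t }     // never "full"
func (q *taskQueue) Remove(t task) { delete(q.buckets[t.priority], t.id) } // O(1)

// Range visits higher priorities first; order within one priority is unspecified.
func (q *taskQueue) Range(fn func(task) bool) {
	for p := len(q.buckets) - 1; p >= 0; p-- {
		for _, t := range q.buckets[p] {
			if !fn(t) {
				return
			}
		}
	}
}

func main() {
	q := newTaskQueue(2)
	q.Add(task{id: 1, priority: 0})
	q.Add(task{id: 2, priority: 1})
	q.Range(func(t task) bool { fmt.Println("task", t.id); return true })
	q.Remove(task{id: 2, priority: 1})
}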
@ -146,6 +128,8 @@ func (queue *taskQueue) Range(fn func(task Task) bool) {
type Scheduler interface {
Start(ctx context.Context)
Stop()
AddExecutor(nodeID int64)
RemoveExecutor(nodeID int64)
Add(task Task) error
Dispatch(node int64)
RemoveByNode(node int64)
@ -156,13 +140,14 @@ type Scheduler interface {
type taskScheduler struct {
rwmutex sync.RWMutex
ctx context.Context
executor *Executor
executors map[int64]*Executor // NodeID -> Executor
idAllocator func() UniqueID
distMgr *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
broker meta.Broker
cluster session.Cluster
nodeMgr *session.NodeManager
tasks UniqueSet
@ -181,8 +166,8 @@ func NewScheduler(ctx context.Context,
nodeMgr *session.NodeManager) *taskScheduler {
id := int64(0)
return &taskScheduler{
ctx: ctx,
executor: NewExecutor(meta, distMgr, broker, targetMgr, cluster, nodeMgr),
ctx: ctx,
executors: make(map[int64]*Executor),
idAllocator: func() UniqueID {
id++
return id
@ -192,22 +177,59 @@ func NewScheduler(ctx context.Context,
meta: meta,
targetMgr: targetMgr,
broker: broker,
cluster: cluster,
nodeMgr: nodeMgr,
tasks: make(UniqueSet),
segmentTasks: make(map[replicaSegmentIndex]Task),
channelTasks: make(map[replicaChannelIndex]Task),
processQueue: newTaskQueue(taskPoolSize),
waitQueue: newTaskQueue(taskPoolSize * 10),
processQueue: newTaskQueue(),
waitQueue: newTaskQueue(),
}
}
func (scheduler *taskScheduler) Start(ctx context.Context) {
scheduler.executor.Start(ctx)
}
func (scheduler *taskScheduler) Start(ctx context.Context) {}
func (scheduler *taskScheduler) Stop() {
scheduler.executor.Stop()
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
for nodeID, executor := range scheduler.executors {
executor.Stop()
delete(scheduler.executors, nodeID)
}
}
func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
if _, exist := scheduler.executors[nodeID]; exist {
return
}
executor := NewExecutor(scheduler.meta,
scheduler.distMgr,
scheduler.broker,
scheduler.targetMgr,
scheduler.cluster,
scheduler.nodeMgr)
scheduler.executors[nodeID] = executor
executor.Start(scheduler.ctx)
log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
}
func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
scheduler.rwmutex.Lock()
defer scheduler.rwmutex.Unlock()
executor, ok := scheduler.executors[nodeID]
if ok {
executor.Stop()
delete(scheduler.executors, nodeID)
log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
}
}
func (scheduler *taskScheduler) Add(task Task) error {
@ -219,12 +241,8 @@ func (scheduler *taskScheduler) Add(task Task) error {
return err
}
if !scheduler.waitQueue.Add(task) {
log.Warn("failed to add task", zap.String("task", task.String()))
return ErrTaskQueueFull
}
task.SetID(scheduler.idAllocator())
scheduler.waitQueue.Add(task)
scheduler.tasks.Insert(task.ID())
switch task := task.(type) {
case *SegmentTask:
@ -317,33 +335,35 @@ func (scheduler *taskScheduler) promote(task Task) error {
return err
}
if scheduler.processQueue.Add(task) {
task.SetStatus(TaskStatusStarted)
return nil
}
return ErrTaskQueueFull
scheduler.processQueue.Add(task)
task.SetStatus(TaskStatusStarted)
return nil
}
func (scheduler *taskScheduler) tryPromoteAll() {
// Promote waiting tasks
toPromote := make([]Task, 0, scheduler.processQueue.Cap()-scheduler.processQueue.Len())
toPromote := make([]Task, 0, scheduler.waitQueue.Len())
toRemove := make([]Task, 0)
scheduler.waitQueue.Range(func(task Task) bool {
err := scheduler.promote(task)
if errors.Is(err, ErrTaskStale) { // Task canceled or stale
task.SetStatus(TaskStatusStale)
task.SetErr(err)
toRemove = append(toRemove, task)
} else if errors.Is(err, ErrTaskCanceled) {
if err != nil {
task.SetStatus(TaskStatusCanceled)
if errors.Is(err, ErrTaskStale) { // Task canceled or stale
task.SetStatus(TaskStatusStale)
}
log.Warn("failed to promote task",
zap.Int64("taskID", task.ID()),
zap.Error(err),
)
task.SetErr(err)
toRemove = append(toRemove, task)
} else if err == nil {
} else {
toPromote = append(toPromote, task)
}
return !errors.Is(err, ErrTaskQueueFull)
return true
})
for _, task := range toPromote {
@ -527,7 +547,28 @@ func (scheduler *taskScheduler) process(task Task) bool {
zap.Int64("source", task.SourceID()),
)
if !scheduler.executor.Exist(task.ID()) && task.IsFinished(scheduler.distMgr) {
actions, step := task.Actions(), task.Step()
for step < len(actions) && actions[step].IsFinished(scheduler.distMgr) {
task.StepUp()
step++
}
if step == len(actions) {
step--
}
executor, ok := scheduler.executors[actions[step].Node()]
if !ok {
log.Warn("no executor for QueryNode",
zap.Int("step", step),
zap.Int64("nodeID", actions[step].Node()))
return false
}
if task.IsFinished(scheduler.distMgr) {
if executor.Exist(task.ID()) {
return false
}
task.SetStatus(TaskStatusSucceeded)
} else if scheduler.checkCanceled(task) {
task.SetStatus(TaskStatusCanceled)
@ -539,11 +580,10 @@ func (scheduler *taskScheduler) process(task Task) bool {
task.SetErr(ErrTaskStale)
}
step := task.Step()
log = log.With(zap.Int("step", step))
switch task.Status() {
case TaskStatusStarted:
if scheduler.executor.Execute(task, step) {
if executor.Execute(task, step) {
return true
}
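
process() changes accordingly: the scheduler first advances the task past any actions the distribution already reports as finished (via the new StepUp), then routes the current action to the executor that owns that action's target node; if that node has no executor (it never came online, or was removed), the task stays queued and is retried on a later dispatch, which is what the new TestNoExecutor case exercises. A tiny stand-alone sketch of the step-selection logic, including the clamp to the last action (simplified, not the code above):

// sketch (not part of this PR's diff): how the scheduler picks the current step
package main

import "fmt"

// currentStep skips actions already finished according to the distribution;
// if every action is finished it clamps to the last one, so the task can
// still be resolved to a node (and an executor) for its final bookkeeping.
func currentStep(finished []bool) int {
	step := 0
	for step < len(finished) && finished[step] {
		step++
	}
	if step == len(finished) {
		step--
	}
	return step
}

func main() {
	fmt.Println(currentStep([]bool{true, false, false})) // 1: first unfinished action
	fmt.Println(currentStep([]bool{true, true, true}))   // 2: all finished, clamped to last
}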

View File

@ -74,6 +74,7 @@ type Task interface {
Wait() error
Actions() []Action
Step() int
StepUp() int
IsFinished(dist *meta.DistributionManager) bool
String() string
}
@ -188,18 +189,16 @@ func (task *baseTask) Step() int {
return task.step
}
func (task *baseTask) StepUp() int {
task.step++
return task.step
}
func (task *baseTask) IsFinished(distMgr *meta.DistributionManager) bool {
if task.Status() != TaskStatusStarted {
return false
}
actions, step := task.Actions(), task.Step()
for step < len(actions) && actions[step].IsFinished(distMgr) {
task.step++
step++
}
return task.Step() >= len(actions)
return task.Step() >= len(task.Actions())
}
func (task *baseTask) String() string {

View File

@ -132,6 +132,9 @@ func (suite *TaskSuite) SetupTest() {
suite.scheduler = suite.newScheduler()
suite.scheduler.Start(context.Background())
suite.scheduler.AddExecutor(1)
suite.scheduler.AddExecutor(2)
suite.scheduler.AddExecutor(3)
}
func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
@ -147,7 +150,8 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
"TestTaskCanceled",
"TestMoveSegmentTask",
"TestSubmitDuplicateLoadSegmentTask",
"TestSubmitDuplicateSubscribeChannelTask":
"TestSubmitDuplicateSubscribeChannelTask",
"TestNoExecutor":
suite.meta.PutCollection(&meta.Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: suite.collection,
@ -367,7 +371,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
// Expect
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
Name: "TestSubscribeChannelTask",
Name: "TestLoadSegmentTask",
}, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, suite.collection).Return([]int64{100, 101}, nil)
for _, segment := range suite.loadSegments {
@ -496,7 +500,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
// Expect
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
Name: "TestSubscribeChannelTask",
Name: "TestLoadSegmentTask",
}, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, suite.collection).Return([]int64{100, 101}, nil)
for _, segment := range suite.loadSegments {
@ -1129,6 +1133,76 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
suite.AssertTaskNum(0, segmentNum, 0, segmentNum)
}
func (suite *TaskSuite) TestNoExecutor() {
ctx := context.Background()
timeout := 10 * time.Second
targetNode := int64(-1)
channel := &datapb.VchannelInfo{
CollectionID: suite.collection,
ChannelName: Params.CommonCfg.RootCoordDml + "-test",
}
suite.nodeMgr.Add(session.NewNodeInfo(targetNode, "localhost"))
suite.meta.ReplicaManager.Put(
utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3, -1}))
// Test load segment task
suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
tasks := []Task{}
segments := make([]*datapb.SegmentBinlogs, 0)
for _, segment := range suite.loadSegments {
segments = append(segments, &datapb.SegmentBinlogs{
SegmentID: segment,
InsertChannel: channel.ChannelName,
})
task, err := NewSegmentTask(
ctx,
timeout,
0,
suite.collection,
suite.replica,
NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
)
suite.NoError(err)
tasks = append(tasks, task)
err = suite.scheduler.Add(task)
suite.NoError(err)
}
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, suite.collection, int64(1)).Return(nil, segments, nil)
suite.target.UpdateCollectionNextTargetWithPartitions(suite.collection, int64(1))
segmentsNum := len(suite.loadSegments)
suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)
// Process tasks
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
// Heartbeats from other nodes can't trigger processing of these tasks
suite.dispatchAndWait(targetNode + 1)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
// Process tasks done
// Dist contains channels
view := &meta.LeaderView{
ID: targetNode,
CollectionID: suite.collection,
Segments: map[int64]*querypb.SegmentDist{},
}
for _, segment := range suite.loadSegments {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
suite.dist.LeaderViewManager.Update(targetNode, view)
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
for _, task := range tasks {
suite.Equal(TaskStatusStarted, task.Status())
suite.NoError(task.Err())
}
}
func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
scheduler := suite.scheduler
@ -1148,11 +1222,15 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
for start := time.Now(); time.Since(start) < timeout; {
count = 0
keys = make([]any, 0)
suite.scheduler.executor.executingTasks.Range(func(key, value any) bool {
keys = append(keys, key)
count++
return true
})
for _, executor := range suite.scheduler.executors {
executor.executingTasks.Range(func(key, value any) bool {
keys = append(keys, key)
count++
return true
})
}
if count == 0 {
return
}

View File

@ -706,9 +706,10 @@ type queryCoordConfig struct {
UpdatedTime time.Time
//---- Task ---
RetryNum int32
RetryInterval int64
TaskMergeCap int32
RetryNum int32
RetryInterval int64
TaskMergeCap int32
TaskExecutionCap int32
//---- Handoff ---
AutoHandoff bool
@ -736,6 +737,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
p.initTaskRetryNum()
p.initTaskRetryInterval()
p.initTaskMergeCap()
p.initTaskExecutionCap()
//---- Handoff ---
p.initAutoHandoff()
@ -767,6 +769,10 @@ func (p *queryCoordConfig) initTaskMergeCap() {
p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 16)
}
func (p *queryCoordConfig) initTaskExecutionCap() {
p.TaskExecutionCap = p.Base.ParseInt32WithDefault("queryCoord.taskExecutionCap", 256)
}
func (p *queryCoordConfig) initAutoHandoff() {
handoff, err := p.Base.Load("queryCoord.autoHandoff")
if err != nil {