mirror of https://github.com/milvus-io/milvus.git
Make the performance able to scale out

Signed-off-by: yah01 <yang.cen@zilliz.com>

Branch: pull/20910/head
parent 225ced2183
commit c5f215da67
@@ -38,7 +38,7 @@ ignore:
  - "**/*.pb.go"
  - "**/*.proto"
  - "internal/metastore/db/dbmodel/mocks/.*"
  - "internal/mocks"
  - "**/mock_*.go"

@@ -181,6 +181,7 @@ queryCoord:
  loadTimeoutSeconds: 600
  checkHandoffInterval: 5000
  taskMergeCap: 16
  taskExecutionCap: 256
  enableActiveStandby: false # Enable active-standby
  refreshTargetsIntervalSeconds: 300

@@ -313,6 +313,7 @@ func (s *Server) Start() error {
    }
    for _, node := range sessions {
        s.nodeMgr.Add(session.NewNodeInfo(node.ServerID, node.Address))
        s.taskScheduler.AddExecutor(node.ServerID)
    }
    s.checkReplicas()
    for _, node := range sessions {

@@ -589,6 +590,7 @@ func (s *Server) watchNodes(revision int64) {

func (s *Server) handleNodeUp(node int64) {
    log := log.With(zap.Int64("nodeID", node))
    s.taskScheduler.AddExecutor(node)
    s.distController.StartDistInstance(s.ctx, node)

    for _, collection := range s.meta.CollectionManager.GetAll() {

@@ -616,6 +618,7 @@ func (s *Server) handleNodeUp(node int64) {

func (s *Server) handleNodeDown(node int64) {
    log := log.With(zap.Int64("nodeID", node))
    s.taskScheduler.RemoveExecutor(node)
    s.distController.Remove(node)

    // Refresh the targets, to avoid consuming messages too early from channel

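The hunks above wire per-node executors into the QueryCoord server lifecycle: every QueryNode session found at startup or on node-up gets an executor, and node-down tears it down. A minimal sketch of that wiring, using stand-in types (Scheduler is reduced to the two calls the diff adds; logScheduler and server are illustrative, not the Milvus definitions):

package main

import "fmt"

// Scheduler is narrowed here to the two calls added by the diff.
type Scheduler interface {
    AddExecutor(nodeID int64)
    RemoveExecutor(nodeID int64)
}

// logScheduler stands in for the real task scheduler.
type logScheduler struct{}

func (logScheduler) AddExecutor(nodeID int64)    { fmt.Println("add executor for node", nodeID) }
func (logScheduler) RemoveExecutor(nodeID int64) { fmt.Println("remove executor for node", nodeID) }

// server mirrors the handleNodeUp/handleNodeDown wiring above.
type server struct{ taskScheduler Scheduler }

func (s *server) handleNodeUp(node int64)   { s.taskScheduler.AddExecutor(node) }
func (s *server) handleNodeDown(node int64) { s.taskScheduler.RemoveExecutor(node) }

func main() {
    s := &server{taskScheduler: logScheduler{}}
    s.handleNodeUp(7)   // QueryNode 7 came online: register an executor for it
    s.handleNodeDown(7) // QueryNode 7 went offline: drop its executor
}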
@@ -25,9 +25,11 @@ import (
    "github.com/milvus-io/milvus/internal/log"
    "github.com/milvus-io/milvus/internal/proto/querypb"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    . "github.com/milvus-io/milvus/internal/querycoordv2/params"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/utils"
    "github.com/milvus-io/milvus/internal/util/tsoutil"
    "go.uber.org/atomic"
    "go.uber.org/zap"
)

@@ -44,7 +46,8 @@ type Executor struct {
    // Merge load segment requests
    merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]

    executingTasks   sync.Map
    executingTaskNum atomic.Int32
}

func NewExecutor(meta *meta.Meta,

@@ -81,10 +84,14 @@ func (ex *Executor) Stop() {
// does nothing and returns false if the action is already committed,
// returns true otherwise.
func (ex *Executor) Execute(task Task, step int) bool {
    if ex.executingTaskNum.Load() > Params.QueryCoordCfg.TaskExecutionCap {
        return false
    }
    _, exist := ex.executingTasks.LoadOrStore(task.ID(), struct{}{})
    if exist {
        return false
    }
    ex.executingTaskNum.Inc()

    log := log.With(
        zap.Int64("taskID", task.ID()),

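The new Execute guard admits a task step only if the executor is below the configured execution cap and the task is not already in flight. A self-contained sketch of the same pattern using the standard library's sync.Map and sync/atomic (miniExecutor and its cap field are illustrative; the real code uses Params.QueryCoordCfg.TaskExecutionCap and go.uber.org/atomic):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type miniExecutor struct {
    executing sync.Map     // taskID -> struct{}, records in-flight tasks
    inFlight  atomic.Int32 // number of tasks currently executing
    cap       int32        // soft execution cap
}

// tryExecute returns false if the task is already running or the cap is exceeded.
func (e *miniExecutor) tryExecute(taskID int64) bool {
    if e.inFlight.Load() > e.cap {
        return false
    }
    if _, exists := e.executing.LoadOrStore(taskID, struct{}{}); exists {
        return false
    }
    e.inFlight.Add(1)
    return true
}

// done releases the slot once the task's step finished, mirroring removeTask above.
func (e *miniExecutor) done(taskID int64) {
    e.executing.Delete(taskID)
    e.inFlight.Add(-1)
}

func main() {
    ex := &miniExecutor{cap: 2}
    fmt.Println(ex.tryExecute(1)) // true
    fmt.Println(ex.tryExecute(1)) // false, already in flight
    ex.done(1)
    fmt.Println(ex.tryExecute(1)) // true again
}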
@@ -136,7 +143,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
    defer func() {
        for i := range mergeTask.tasks {
            mergeTask.tasks[i].SetErr(task.Err())
            ex.removeAction(mergeTask.tasks[i], mergeTask.steps[i])
            ex.removeTask(mergeTask.tasks[i], mergeTask.steps[i])
        }
    }()

@@ -179,7 +186,7 @@ func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
    log.Info("load segments done", zap.Int64("taskID", task.ID()), zap.Duration("timeTaken", elapsed))
}

func (ex *Executor) removeAction(task Task, step int) {
func (ex *Executor) removeTask(task Task, step int) {
    if task.Err() != nil {
        log.Info("excute action done, remove it",
            zap.Int64("taskID", task.ID()),

@@ -188,6 +195,7 @@ func (ex *Executor) removeAction(task Task, step int) {
    }

    ex.executingTasks.Delete(task.ID())
    ex.executingTaskNum.Dec()
}

func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {

@@ -217,7 +225,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
        if err != nil {
            task.SetErr(err)
            task.Cancel()
            ex.removeAction(task, step)
            ex.removeTask(task, step)
        }
    }()

@@ -269,7 +277,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
}

func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
    defer ex.removeAction(task, step)
    defer ex.removeTask(task, step)
    startTs := time.Now()
    action := task.Actions()[step].(*SegmentAction)
    defer action.isReleaseCommitted.Store(true)

@@ -342,7 +350,7 @@ func (ex *Executor) executeDmChannelAction(task *ChannelTask, step int) {
}

func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
    defer ex.removeAction(task, step)
    defer ex.removeTask(task, step)
    startTs := time.Now()
    action := task.Actions()[step].(*ChannelAction)
    log := log.With(

@@ -409,7 +417,7 @@ func (ex *Executor) subDmChannel(task *ChannelTask, step int) error {
}

func (ex *Executor) unsubDmChannel(task *ChannelTask, step int) error {
    defer ex.removeAction(task, step)
    defer ex.removeTask(task, step)
    startTs := time.Now()
    action := task.Actions()[step].(*ChannelAction)
    log := log.With(

@@ -1,4 +1,4 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
// Code generated by mockery v2.14.1. DO NOT EDIT.

package task

@@ -58,6 +58,34 @@ func (_c *MockScheduler_Add_Call) Return(_a0 error) *MockScheduler_Add_Call {
    return _c
}

// AddExecutor provides a mock function with given fields: nodeID
func (_m *MockScheduler) AddExecutor(nodeID int64) {
    _m.Called(nodeID)
}

// MockScheduler_AddExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'AddExecutor'
type MockScheduler_AddExecutor_Call struct {
    *mock.Call
}

// AddExecutor is a helper method to define mock.On call
// - nodeID int64
func (_e *MockScheduler_Expecter) AddExecutor(nodeID interface{}) *MockScheduler_AddExecutor_Call {
    return &MockScheduler_AddExecutor_Call{Call: _e.mock.On("AddExecutor", nodeID)}
}

func (_c *MockScheduler_AddExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_AddExecutor_Call {
    _c.Call.Run(func(args mock.Arguments) {
        run(args[0].(int64))
    })
    return _c
}

func (_c *MockScheduler_AddExecutor_Call) Return() *MockScheduler_AddExecutor_Call {
    _c.Call.Return()
    return _c
}

// Dispatch provides a mock function with given fields: node
func (_m *MockScheduler) Dispatch(node int64) {
    _m.Called(node)

@@ -188,6 +216,34 @@ func (_c *MockScheduler_RemoveByNode_Call) Return() *MockScheduler_RemoveByNode_
    return _c
}

// RemoveExecutor provides a mock function with given fields: nodeID
func (_m *MockScheduler) RemoveExecutor(nodeID int64) {
    _m.Called(nodeID)
}

// MockScheduler_RemoveExecutor_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RemoveExecutor'
type MockScheduler_RemoveExecutor_Call struct {
    *mock.Call
}

// RemoveExecutor is a helper method to define mock.On call
// - nodeID int64
func (_e *MockScheduler_Expecter) RemoveExecutor(nodeID interface{}) *MockScheduler_RemoveExecutor_Call {
    return &MockScheduler_RemoveExecutor_Call{Call: _e.mock.On("RemoveExecutor", nodeID)}
}

func (_c *MockScheduler_RemoveExecutor_Call) Run(run func(nodeID int64)) *MockScheduler_RemoveExecutor_Call {
    _c.Call.Run(func(args mock.Arguments) {
        run(args[0].(int64))
    })
    return _c
}

func (_c *MockScheduler_RemoveExecutor_Call) Return() *MockScheduler_RemoveExecutor_Call {
    _c.Call.Return()
    return _c
}

// Start provides a mock function with given fields: ctx
func (_m *MockScheduler) Start(ctx context.Context) {
    _m.Called(ctx)

@@ -36,8 +36,6 @@ const (
    TaskTypeGrow Type = iota + 1
    TaskTypeReduce
    TaskTypeMove

    taskPoolSize = 256
)

var (

@@ -54,8 +52,6 @@ var (
    // No enough memory to load segment
    ErrResourceNotEnough = errors.New("ResourceNotEnough")

    ErrTaskQueueFull = errors.New("TaskQueueFull")

    ErrFailedResponse  = errors.New("RpcFailed")
    ErrTaskAlreadyDone = errors.New("TaskAlreadyDone")
)

@@ -83,17 +79,17 @@ type replicaChannelIndex struct {
}

type taskQueue struct {
    // TaskPriority -> Tasks
    buckets [][]Task

    cap int
    // TaskPriority -> TaskID -> Task
    buckets []map[int64]Task
}

func newTaskQueue(cap int) *taskQueue {
func newTaskQueue() *taskQueue {
    buckets := make([]map[int64]Task, len(TaskPriorities))
    for i := range buckets {
        buckets[i] = make(map[int64]Task)
    }
    return &taskQueue{
        buckets: make([][]Task, len(TaskPriorities)),

        cap: cap,
        buckets: buckets,
    }
}

@@ -106,35 +102,21 @@ func (queue *taskQueue) Len() int {
    return taskNum
}

func (queue *taskQueue) Cap() int {
    return queue.cap
}

func (queue *taskQueue) Add(task Task) bool {
    if queue.Len() >= queue.Cap() {
        return false
    }

    queue.buckets[task.Priority()] = append(queue.buckets[task.Priority()], task)
    return true
func (queue *taskQueue) Add(task Task) {
    bucket := queue.buckets[task.Priority()]
    bucket[task.ID()] = task
}

func (queue *taskQueue) Remove(task Task) {
    bucket := &queue.buckets[task.Priority()]

    for i := range *bucket {
        if (*bucket)[i].ID() == task.ID() {
            *bucket = append((*bucket)[:i], (*bucket)[i+1:]...)
            break
        }
    }
    bucket := queue.buckets[task.Priority()]
    delete(bucket, task.ID())
}

// Range iterates all tasks in the queue ordered by priority from high to low
func (queue *taskQueue) Range(fn func(task Task) bool) {
    for priority := len(queue.buckets) - 1; priority >= 0; priority-- {
        for i := range queue.buckets[priority] {
            if !fn(queue.buckets[priority][i]) {
        for _, task := range queue.buckets[priority] {
            if !fn(task) {
                return
            }
        }

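The queue change above replaces capped slices with per-priority maps keyed by task ID, so Add never fails and Remove is a single delete instead of a slice scan. A minimal sketch of that bucket layout, with illustrative names (miniTask, miniQueue) rather than the Milvus types:

package main

import "fmt"

type miniTask struct {
    id       int64
    priority int // index into buckets, higher is more urgent
}

type miniQueue struct {
    // priority -> taskID -> task
    buckets []map[int64]*miniTask
}

func newMiniQueue(priorities int) *miniQueue {
    buckets := make([]map[int64]*miniTask, priorities)
    for i := range buckets {
        buckets[i] = make(map[int64]*miniTask)
    }
    return &miniQueue{buckets: buckets}
}

func (q *miniQueue) Add(t *miniTask)    { q.buckets[t.priority][t.id] = t }
func (q *miniQueue) Remove(t *miniTask) { delete(q.buckets[t.priority], t.id) }

// Range visits tasks from the highest-priority bucket down, stopping when fn returns false.
func (q *miniQueue) Range(fn func(t *miniTask) bool) {
    for p := len(q.buckets) - 1; p >= 0; p-- {
        for _, t := range q.buckets[p] {
            if !fn(t) {
                return
            }
        }
    }
}

func main() {
    q := newMiniQueue(3)
    q.Add(&miniTask{id: 1, priority: 0})
    q.Add(&miniTask{id: 2, priority: 2})
    q.Range(func(t *miniTask) bool {
        fmt.Println("task", t.id, "priority", t.priority)
        return true
    })
}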
@@ -144,6 +126,8 @@ func (queue *taskQueue) Range(fn func(task Task) bool) {
type Scheduler interface {
    Start(ctx context.Context)
    Stop()
    AddExecutor(nodeID int64)
    RemoveExecutor(nodeID int64)
    Add(task Task) error
    Dispatch(node int64)
    RemoveByNode(node int64)

@@ -154,13 +138,14 @@ type Scheduler interface {
type taskScheduler struct {
    rwmutex     sync.RWMutex
    ctx         context.Context
    executor    *Executor
    executors   map[int64]*Executor // NodeID -> Executor
    idAllocator func() UniqueID

    distMgr   *meta.DistributionManager
    meta      *meta.Meta
    targetMgr *meta.TargetManager
    broker    meta.Broker
    cluster   session.Cluster
    nodeMgr   *session.NodeManager

    tasks UniqueSet

@@ -179,8 +164,8 @@ func NewScheduler(ctx context.Context,
    nodeMgr *session.NodeManager) *taskScheduler {
    id := int64(0)
    return &taskScheduler{
        ctx:      ctx,
        executor: NewExecutor(meta, distMgr, broker, targetMgr, cluster, nodeMgr),
        ctx:       ctx,
        executors: make(map[int64]*Executor),
        idAllocator: func() UniqueID {
            id++
            return id

@@ -190,22 +175,59 @@ func NewScheduler(ctx context.Context,
        meta:      meta,
        targetMgr: targetMgr,
        broker:    broker,
        cluster:   cluster,
        nodeMgr:   nodeMgr,

        tasks:        make(UniqueSet),
        segmentTasks: make(map[replicaSegmentIndex]Task),
        channelTasks: make(map[replicaChannelIndex]Task),
        processQueue: newTaskQueue(taskPoolSize),
        waitQueue:    newTaskQueue(taskPoolSize * 10),
        processQueue: newTaskQueue(),
        waitQueue:    newTaskQueue(),
    }
}

func (scheduler *taskScheduler) Start(ctx context.Context) {
    scheduler.executor.Start(ctx)
}
func (scheduler *taskScheduler) Start(ctx context.Context) {}

func (scheduler *taskScheduler) Stop() {
    scheduler.executor.Stop()
    scheduler.rwmutex.Lock()
    defer scheduler.rwmutex.Unlock()

    for nodeID, executor := range scheduler.executors {
        executor.Stop()
        delete(scheduler.executors, nodeID)
    }
}

func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
    scheduler.rwmutex.Lock()
    defer scheduler.rwmutex.Unlock()

    if _, exist := scheduler.executors[nodeID]; exist {
        return
    }

    executor := NewExecutor(scheduler.meta,
        scheduler.distMgr,
        scheduler.broker,
        scheduler.targetMgr,
        scheduler.cluster,
        scheduler.nodeMgr)

    scheduler.executors[nodeID] = executor
    executor.Start(scheduler.ctx)
    log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
}

func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
    scheduler.rwmutex.Lock()
    defer scheduler.rwmutex.Unlock()

    executor, ok := scheduler.executors[nodeID]
    if ok {
        executor.Stop()
        delete(scheduler.executors, nodeID)
        log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
    }
}

func (scheduler *taskScheduler) Add(task Task) error {

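AddExecutor and RemoveExecutor above maintain a NodeID -> Executor map under the scheduler's RWMutex: adding is idempotent, and removal stops the executor before dropping it. A compact sketch of the same lifecycle with a stubbed Executor (names simplified from the diff, not the real constructor signature):

package main

import (
    "fmt"
    "sync"
)

type Executor struct{ nodeID int64 }

func (e *Executor) Start() { fmt.Println("executor started for node", e.nodeID) }
func (e *Executor) Stop()  { fmt.Println("executor stopped for node", e.nodeID) }

type scheduler struct {
    mu        sync.RWMutex
    executors map[int64]*Executor // NodeID -> Executor
}

// AddExecutor registers an executor for a node; it is a no-op if one already exists.
func (s *scheduler) AddExecutor(nodeID int64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, exist := s.executors[nodeID]; exist {
        return
    }
    ex := &Executor{nodeID: nodeID}
    s.executors[nodeID] = ex
    ex.Start()
}

// RemoveExecutor stops and drops the executor of an offline node, if present.
func (s *scheduler) RemoveExecutor(nodeID int64) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if ex, ok := s.executors[nodeID]; ok {
        ex.Stop()
        delete(s.executors, nodeID)
    }
}

func main() {
    s := &scheduler{executors: make(map[int64]*Executor)}
    s.AddExecutor(1)
    s.AddExecutor(1) // no-op, already registered
    s.RemoveExecutor(1)
}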
@@ -217,12 +239,8 @@ func (scheduler *taskScheduler) Add(task Task) error {
        return err
    }

    if !scheduler.waitQueue.Add(task) {
        log.Warn("failed to add task", zap.String("task", task.String()))
        return ErrTaskQueueFull
    }

    task.SetID(scheduler.idAllocator())
    scheduler.waitQueue.Add(task)
    scheduler.tasks.Insert(task.ID())
    switch task := task.(type) {
    case *SegmentTask:

@@ -315,33 +333,35 @@ func (scheduler *taskScheduler) promote(task Task) error {
        return err
    }

    if scheduler.processQueue.Add(task) {
        task.SetStatus(TaskStatusStarted)
        return nil
    }

    return ErrTaskQueueFull
    scheduler.processQueue.Add(task)
    task.SetStatus(TaskStatusStarted)
    return nil
}

func (scheduler *taskScheduler) tryPromoteAll() {
    // Promote waiting tasks
    toPromote := make([]Task, 0, scheduler.processQueue.Cap()-scheduler.processQueue.Len())
    toPromote := make([]Task, 0, scheduler.waitQueue.Len())
    toRemove := make([]Task, 0)
    scheduler.waitQueue.Range(func(task Task) bool {
        err := scheduler.promote(task)
        if errors.Is(err, ErrTaskStale) { // Task canceled or stale
            task.SetStatus(TaskStatusStale)
            task.SetErr(err)
            toRemove = append(toRemove, task)
        } else if errors.Is(err, ErrTaskCanceled) {

        if err != nil {
            task.SetStatus(TaskStatusCanceled)
            if errors.Is(err, ErrTaskStale) { // Task canceled or stale
                task.SetStatus(TaskStatusStale)
            }

            log.Warn("failed to promote task",
                zap.Int64("taskID", task.ID()),
                zap.Error(err),
            )
            task.SetErr(err)
            toRemove = append(toRemove, task)
        } else if err == nil {
        } else {
            toPromote = append(toPromote, task)
        }

        return !errors.Is(err, ErrTaskQueueFull)
        return true
    })

    for _, task := range toPromote {

@@ -519,7 +539,28 @@ func (scheduler *taskScheduler) process(task Task) bool {
        zap.Int64("source", task.SourceID()),
    )

    if !scheduler.executor.Exist(task.ID()) && task.IsFinished(scheduler.distMgr) {
    actions, step := task.Actions(), task.Step()
    for step < len(actions) && actions[step].IsFinished(scheduler.distMgr) {
        task.StepUp()
        step++
    }

    if step == len(actions) {
        step--
    }

    executor, ok := scheduler.executors[actions[step].Node()]
    if !ok {
        log.Warn("no executor for QueryNode",
            zap.Int("step", step),
            zap.Int64("nodeID", actions[step].Node()))
        return false
    }

    if task.IsFinished(scheduler.distMgr) {
        if executor.Exist(task.ID()) {
            return false
        }
        task.SetStatus(TaskStatusSucceeded)
    } else if scheduler.checkCanceled(task) {
        task.SetStatus(TaskStatusCanceled)

@@ -531,11 +572,10 @@ func (scheduler *taskScheduler) process(task Task) bool {
        task.SetErr(ErrTaskStale)
    }

    step := task.Step()
    log = log.With(zap.Int("step", step))
    switch task.Status() {
    case TaskStatusStarted:
        if scheduler.executor.Execute(task, step) {
        if executor.Execute(task, step) {
            return true
        }

|
|||
Wait() error
|
||||
Actions() []Action
|
||||
Step() int
|
||||
StepUp() int
|
||||
IsFinished(dist *meta.DistributionManager) bool
|
||||
String() string
|
||||
}
|
||||
|
@ -188,18 +189,16 @@ func (task *baseTask) Step() int {
|
|||
return task.step
|
||||
}
|
||||
|
||||
func (task *baseTask) StepUp() int {
|
||||
task.step++
|
||||
return task.step
|
||||
}
|
||||
|
||||
func (task *baseTask) IsFinished(distMgr *meta.DistributionManager) bool {
|
||||
if task.Status() != TaskStatusStarted {
|
||||
return false
|
||||
}
|
||||
|
||||
actions, step := task.Actions(), task.Step()
|
||||
for step < len(actions) && actions[step].IsFinished(distMgr) {
|
||||
task.step++
|
||||
step++
|
||||
}
|
||||
|
||||
return task.Step() >= len(actions)
|
||||
return task.Step() >= len(task.Actions())
|
||||
}
|
||||
|
||||
func (task *baseTask) String() string {
|
||||
|
|
|
@@ -132,6 +132,9 @@ func (suite *TaskSuite) SetupTest() {

    suite.scheduler = suite.newScheduler()
    suite.scheduler.Start(context.Background())
    suite.scheduler.AddExecutor(1)
    suite.scheduler.AddExecutor(2)
    suite.scheduler.AddExecutor(3)
}

func (suite *TaskSuite) BeforeTest(suiteName, testName string) {

@@ -147,7 +150,8 @@ func (suite *TaskSuite) BeforeTest(suiteName, testName string) {
        "TestTaskCanceled",
        "TestMoveSegmentTask",
        "TestSubmitDuplicateLoadSegmentTask",
        "TestSubmitDuplicateSubscribeChannelTask":
        "TestSubmitDuplicateSubscribeChannelTask",
        "TestNoExecutor":
        suite.meta.PutCollection(&meta.Collection{
            CollectionLoadInfo: &querypb.CollectionLoadInfo{
                CollectionID: suite.collection,

@@ -360,7 +364,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {

    // Expect
    suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
        Name: "TestSubscribeChannelTask",
        Name: "TestLoadSegmentTask",
    }, nil)
    suite.broker.EXPECT().GetPartitions(mock.Anything, suite.collection).Return([]int64{100, 101}, nil)
    for _, segment := range suite.loadSegments {

@@ -488,7 +492,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {

    // Expect
    suite.broker.EXPECT().GetCollectionSchema(mock.Anything, suite.collection).Return(&schemapb.CollectionSchema{
        Name: "TestSubscribeChannelTask",
        Name: "TestLoadSegmentTask",
    }, nil)
    suite.broker.EXPECT().GetPartitions(mock.Anything, suite.collection).Return([]int64{100, 101}, nil)
    for _, segment := range suite.loadSegments {

@@ -1103,6 +1107,75 @@ func (suite *TaskSuite) TestSegmentTaskReplace() {
    suite.AssertTaskNum(0, segmentNum, 0, segmentNum)
}

func (suite *TaskSuite) TestNoExecutor() {
    ctx := context.Background()
    timeout := 10 * time.Second
    targetNode := int64(-1)
    channel := &datapb.VchannelInfo{
        CollectionID: suite.collection,
        ChannelName:  Params.CommonCfg.RootCoordDml + "-test",
    }
    suite.nodeMgr.Add(session.NewNodeInfo(targetNode, "localhost"))
    suite.meta.ReplicaManager.Put(
        utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3, -1}))

    // Test load segment task
    suite.dist.ChannelDistManager.Update(targetNode, meta.DmChannelFromVChannel(&datapb.VchannelInfo{
        CollectionID: suite.collection,
        ChannelName:  channel.ChannelName,
    }))
    tasks := []Task{}
    segments := make([]*datapb.SegmentInfo, 0)
    for _, segment := range suite.loadSegments {
        segments = append(segments, &datapb.SegmentInfo{
            ID:            segment,
            InsertChannel: channel.ChannelName,
        })
        task, err := NewSegmentTask(
            ctx,
            timeout,
            0,
            suite.collection,
            suite.replica,
            NewSegmentAction(targetNode, ActionTypeGrow, channel.GetChannelName(), segment),
        )
        suite.NoError(err)
        tasks = append(tasks, task)
        err = suite.scheduler.Add(task)
        suite.NoError(err)
    }
    suite.target.AddSegment(segments...)
    segmentsNum := len(suite.loadSegments)
    suite.AssertTaskNum(0, segmentsNum, 0, segmentsNum)

    // Process tasks
    suite.dispatchAndWait(targetNode)
    suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)

    // Other nodes' HB can't trigger the procedure of tasks
    suite.dispatchAndWait(targetNode + 1)
    suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)

    // Process tasks done
    // Dist contains channels
    view := &meta.LeaderView{
        ID:           targetNode,
        CollectionID: suite.collection,
        Segments:     map[int64]*querypb.SegmentDist{},
    }
    for _, segment := range suite.loadSegments {
        view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
    }
    suite.dist.LeaderViewManager.Update(targetNode, view)
    suite.dispatchAndWait(targetNode)
    suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)

    for _, task := range tasks {
        suite.Equal(TaskStatusStarted, task.Status())
        suite.NoError(task.Err())
    }
}

func (suite *TaskSuite) AssertTaskNum(process, wait, channel, segment int) {
    scheduler := suite.scheduler

@@ -1122,11 +1195,15 @@ func (suite *TaskSuite) dispatchAndWait(node int64) {
    for start := time.Now(); time.Since(start) < timeout; {
        count = 0
        keys = make([]any, 0)
        suite.scheduler.executor.executingTasks.Range(func(key, value any) bool {
            keys = append(keys, key)
            count++
            return true
        })

        for _, executor := range suite.scheduler.executors {
            executor.executingTasks.Range(func(key, value any) bool {
                keys = append(keys, key)
                count++
                return true
            })
        }

        if count == 0 {
            return
        }

@@ -701,9 +701,10 @@ type queryCoordConfig struct {
    UpdatedTime time.Time

    //---- Task ---
    RetryNum         int32
    RetryInterval    int64
    TaskMergeCap     int32
    TaskExecutionCap int32

    //---- Handoff ---
    AutoHandoff bool

@@ -731,6 +732,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
    p.initTaskRetryNum()
    p.initTaskRetryInterval()
    p.initTaskMergeCap()
    p.initTaskExecutionCap()

    //---- Handoff ---
    p.initAutoHandoff()

@@ -762,6 +764,10 @@ func (p *queryCoordConfig) initTaskMergeCap() {
    p.TaskMergeCap = p.Base.ParseInt32WithDefault("queryCoord.taskMergeCap", 16)
}

func (p *queryCoordConfig) initTaskExecutionCap() {
    p.TaskExecutionCap = p.Base.ParseInt32WithDefault("queryCoord.taskExecutionCap", 256)
}

func (p *queryCoordConfig) initAutoHandoff() {
    handoff, err := p.Base.Load("queryCoord.autoHandoff")
    if err != nil {