Support to set max task number of task queue (#7974)

Signed-off-by: dragondriver <jiquan.long@zilliz.com>
pull/7975/head
dragondriver 2021-09-15 19:09:50 +08:00 committed by GitHub
parent ff8fef6ad0
commit 7f048ea8c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 76 additions and 18 deletions

View File

@ -30,3 +30,5 @@ proxy:
maxFieldNum: 64
maxDimension: 32768
maxShardNum: 256
maxTaskNum: 1024

View File

@ -61,6 +61,8 @@ type ParamTable struct {
DefaultPartitionName string
DefaultIndexName string
MaxTaskNum int64
PulsarMaxMessageSize int
Log log.Config
RoleName string
@ -99,6 +101,8 @@ func (pt *ParamTable) initParams() {
pt.initPulsarMaxMessageSize()
pt.initRoleName()
pt.initMaxTaskNum()
}
func (pt *ParamTable) InitAlias(alias string) {
@ -303,3 +307,15 @@ func (pt *ParamTable) initMetaRootPath() {
}
pt.MetaRootPath = path.Join(rootPath, subPath)
}
func (pt *ParamTable) initMaxTaskNum() {
str, err := pt.Load("proxy.maxTaskNum")
if err != nil {
panic(err)
}
maxTaskNum, err := strconv.ParseInt(str, 10, 64)
if err != nil {
panic(err)
}
pt.MaxTaskNum = maxTaskNum
}

View File

@ -81,6 +81,10 @@ func TestParamTable_Normal(t *testing.T) {
t.Run("RoleName", func(t *testing.T) {
t.Logf("RoleName: %s", Params.RoleName)
})
t.Run("MaxTaskNum", func(t *testing.T) {
t.Logf("MaxTaskNum: %d", Params.MaxTaskNum)
})
}
func shouldPanic(t *testing.T, name string, f func()) {
@ -149,4 +153,14 @@ func TestParamTable_Panics(t *testing.T) {
Params.Save("proxy.maxDimension", "-asdf")
Params.initMaxDimension()
})
shouldPanic(t, "proxy.maxTaskNum", func() {
Params.Remove("proxy.maxTaskNum")
Params.initMaxTaskNum()
})
shouldPanic(t, "proxy.maxTaskNum", func() {
Params.Save("proxy.maxTaskNum", "-asdf")
Params.initMaxTaskNum()
})
}

View File

@ -1686,8 +1686,8 @@ func TestProxy(t *testing.T) {
// queue full
ddParallel := proxy.sched.ddQueue.maxTaskNum
proxy.sched.ddQueue.maxTaskNum = 0
ddParallel := proxy.sched.ddQueue.getMaxTaskNum()
proxy.sched.ddQueue.setMaxTaskNum(0)
t.Run("failed to create collection, dd queue full", func(t *testing.T) {
resp, err := proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{})
@ -1697,7 +1697,7 @@ func TestProxy(t *testing.T) {
// TODO(dragondriver): other tasks
proxy.sched.ddQueue.maxTaskNum = ddParallel
proxy.sched.ddQueue.setMaxTaskNum(ddParallel)
// timeout

View File

@ -44,11 +44,10 @@ type taskQueue interface {
getTaskByReqID(reqID UniqueID) task
TaskDoneTest(ts Timestamp) bool
Enqueue(t task) error
setMaxTaskNum(num int64)
getMaxTaskNum() int64
}
// TODO(dragondriver): load from config
const maxTaskNum = 1024
type baseTaskQueue struct {
unissuedTasks *list.List
activeTasks map[UniqueID]task
@ -56,7 +55,8 @@ type baseTaskQueue struct {
atLock sync.RWMutex
// maxTaskNum should keep still
maxTaskNum int64
maxTaskNum int64
maxTaskNumMtx sync.RWMutex
utBufChan chan int // to block scheduler
@ -75,7 +75,7 @@ func (queue *baseTaskQueue) utEmpty() bool {
}
func (queue *baseTaskQueue) utFull() bool {
return int64(queue.unissuedTasks.Len()) >= queue.maxTaskNum
return int64(queue.unissuedTasks.Len()) >= queue.getMaxTaskNum()
}
func (queue *baseTaskQueue) addUnissuedTask(t task) error {
@ -203,14 +203,28 @@ func (queue *baseTaskQueue) Enqueue(t task) error {
return queue.addUnissuedTask(t)
}
func (queue *baseTaskQueue) setMaxTaskNum(num int64) {
queue.maxTaskNumMtx.Lock()
defer queue.maxTaskNumMtx.Unlock()
queue.maxTaskNum = num
}
func (queue *baseTaskQueue) getMaxTaskNum() int64 {
queue.maxTaskNumMtx.RLock()
defer queue.maxTaskNumMtx.RUnlock()
return queue.maxTaskNum
}
func newBaseTaskQueue(tsoAllocatorIns tsoAllocator, idAllocatorIns idAllocatorInterface) *baseTaskQueue {
return &baseTaskQueue{
unissuedTasks: list.New(),
activeTasks: make(map[UniqueID]task),
utLock: sync.RWMutex{},
atLock: sync.RWMutex{},
maxTaskNum: maxTaskNum,
utBufChan: make(chan int, maxTaskNum),
maxTaskNum: Params.MaxTaskNum,
utBufChan: make(chan int, Params.MaxTaskNum),
tsoAllocatorIns: tsoAllocatorIns,
idAllocatorIns: idAllocatorIns,
}

View File

@ -22,6 +22,8 @@ import (
)
func TestBaseTaskQueue(t *testing.T) {
Params.Init()
var err error
var unissuedTask task
var activeTask task
@ -108,8 +110,8 @@ func TestBaseTaskQueue(t *testing.T) {
assert.True(t, done)
// test utFull
queue.maxTaskNum = 10 // not accurate, full also means utBufChan block
for i := 0; i < int(queue.maxTaskNum); i++ {
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
err = queue.Enqueue(newDefaultMockTask())
assert.Nil(t, err)
}
@ -119,6 +121,8 @@ func TestBaseTaskQueue(t *testing.T) {
}
func TestDdTaskQueue(t *testing.T) {
Params.Init()
var err error
var unissuedTask task
var activeTask task
@ -205,8 +209,8 @@ func TestDdTaskQueue(t *testing.T) {
assert.True(t, done)
// test utFull
queue.maxTaskNum = 10 // not accurate, full also means utBufChan block
for i := 0; i < int(queue.maxTaskNum); i++ {
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
err = queue.Enqueue(newDefaultMockDdlTask())
assert.Nil(t, err)
}
@ -217,6 +221,8 @@ func TestDdTaskQueue(t *testing.T) {
// test the logic of queue
func TestDmTaskQueue_Basic(t *testing.T) {
Params.Init()
var err error
var unissuedTask task
var activeTask task
@ -303,8 +309,8 @@ func TestDmTaskQueue_Basic(t *testing.T) {
assert.True(t, done)
// test utFull
queue.maxTaskNum = 10 // not accurate, full also means utBufChan block
for i := 0; i < int(queue.maxTaskNum); i++ {
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
err = queue.Enqueue(newDefaultMockDmlTask())
assert.Nil(t, err)
}
@ -315,6 +321,8 @@ func TestDmTaskQueue_Basic(t *testing.T) {
// test the timestamp statistics
func TestDmTaskQueue_TimestampStatistics(t *testing.T) {
Params.Init()
var err error
var unissuedTask task
@ -353,6 +361,8 @@ func TestDmTaskQueue_TimestampStatistics(t *testing.T) {
}
func TestDqTaskQueue(t *testing.T) {
Params.Init()
var err error
var unissuedTask task
var activeTask task
@ -439,8 +449,8 @@ func TestDqTaskQueue(t *testing.T) {
assert.True(t, done)
// test utFull
queue.maxTaskNum = 10 // not accurate, full also means utBufChan block
for i := 0; i < int(queue.maxTaskNum); i++ {
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
err = queue.Enqueue(newDefaultMockDqlTask())
assert.Nil(t, err)
}
@ -450,6 +460,8 @@ func TestDqTaskQueue(t *testing.T) {
}
func TestTaskScheduler(t *testing.T) {
Params.Init()
var err error
ctx := context.Background()