mirror of https://github.com/milvus-io/milvus.git
Remove not used TaskDoneTest of proxy scheduler (#8710)
Signed-off-by: dragondriver <jiquan.long@zilliz.com>pull/8741/head
parent
fb0862e952
commit
bb121c59cb
|
@ -42,7 +42,6 @@ type taskQueue interface {
|
|||
AddActiveTask(t task)
|
||||
PopActiveTask(tID UniqueID) task
|
||||
getTaskByReqID(reqID UniqueID) task
|
||||
TaskDoneTest(ts Timestamp) bool
|
||||
Enqueue(t task) error
|
||||
setMaxTaskNum(num int64)
|
||||
getMaxTaskNum() int64
|
||||
|
@ -162,26 +161,6 @@ func (queue *baseTaskQueue) getTaskByReqID(reqID UniqueID) task {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (queue *baseTaskQueue) TaskDoneTest(ts Timestamp) bool {
|
||||
queue.utLock.RLock()
|
||||
defer queue.utLock.RUnlock()
|
||||
for e := queue.unissuedTasks.Front(); e != nil; e = e.Next() {
|
||||
if e.Value.(task).EndTs() < ts {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
queue.atLock.RLock()
|
||||
defer queue.atLock.RUnlock()
|
||||
for _, task := range queue.activeTasks {
|
||||
if task.BeginTs() < ts {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (queue *baseTaskQueue) Enqueue(t task) error {
|
||||
err := t.OnEnqueue()
|
||||
if err != nil {
|
||||
|
@ -872,12 +851,6 @@ func (sched *taskScheduler) Close() {
|
|||
sched.wg.Wait()
|
||||
}
|
||||
|
||||
func (sched *taskScheduler) TaskDoneTest(ts Timestamp) bool {
|
||||
ddTaskDone := sched.ddQueue.TaskDoneTest(ts)
|
||||
dmTaskDone := sched.dmQueue.TaskDoneTest(ts)
|
||||
return ddTaskDone && dmTaskDone
|
||||
}
|
||||
|
||||
func (sched *taskScheduler) getPChanStatistics() (map[pChan]*pChanStatistics, error) {
|
||||
return sched.dmQueue.getPChanStatsInfo()
|
||||
}
|
||||
|
|
|
@ -16,7 +16,6 @@ import (
|
|||
"math/rand"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
@ -27,7 +26,6 @@ func TestBaseTaskQueue(t *testing.T) {
|
|||
var err error
|
||||
var unissuedTask task
|
||||
var activeTask task
|
||||
var done bool
|
||||
|
||||
tsoAllocatorIns := newMockTsoAllocator()
|
||||
idAllocatorIns := newMockIDAllocatorInterface()
|
||||
|
@ -39,7 +37,6 @@ func TestBaseTaskQueue(t *testing.T) {
|
|||
|
||||
st := newDefaultMockTask()
|
||||
stID := st.ID()
|
||||
stTs := st.BeginTs()
|
||||
|
||||
// no task in queue
|
||||
|
||||
|
@ -52,9 +49,6 @@ func TestBaseTaskQueue(t *testing.T) {
|
|||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.Nil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(stTs)
|
||||
assert.True(t, done)
|
||||
|
||||
// task enqueue, only one task in queue
|
||||
|
||||
err = queue.Enqueue(st)
|
||||
|
@ -71,25 +65,16 @@ func TestBaseTaskQueue(t *testing.T) {
|
|||
unissuedTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.NotNil(t, unissuedTask)
|
||||
assert.True(t, queue.utEmpty())
|
||||
assert.False(t, queue.utFull())
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test active list, no task in queue
|
||||
|
||||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
|
@ -100,15 +85,9 @@ func TestBaseTaskQueue(t *testing.T) {
|
|||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test utFull
|
||||
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
|
||||
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
|
||||
|
@ -126,7 +105,6 @@ func TestDdTaskQueue(t *testing.T) {
|
|||
var err error
|
||||
var unissuedTask task
|
||||
var activeTask task
|
||||
var done bool
|
||||
|
||||
tsoAllocatorIns := newMockTsoAllocator()
|
||||
idAllocatorIns := newMockIDAllocatorInterface()
|
||||
|
@ -138,7 +116,6 @@ func TestDdTaskQueue(t *testing.T) {
|
|||
|
||||
st := newDefaultMockDdlTask()
|
||||
stID := st.ID()
|
||||
stTs := st.BeginTs()
|
||||
|
||||
// no task in queue
|
||||
|
||||
|
@ -151,9 +128,6 @@ func TestDdTaskQueue(t *testing.T) {
|
|||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.Nil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(stTs)
|
||||
assert.True(t, done)
|
||||
|
||||
// task enqueue, only one task in queue
|
||||
|
||||
err = queue.Enqueue(st)
|
||||
|
@ -170,25 +144,16 @@ func TestDdTaskQueue(t *testing.T) {
|
|||
unissuedTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.NotNil(t, unissuedTask)
|
||||
assert.True(t, queue.utEmpty())
|
||||
assert.False(t, queue.utFull())
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test active list, no task in queue
|
||||
|
||||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
|
@ -199,15 +164,9 @@ func TestDdTaskQueue(t *testing.T) {
|
|||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test utFull
|
||||
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
|
||||
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
|
||||
|
@ -226,7 +185,6 @@ func TestDmTaskQueue_Basic(t *testing.T) {
|
|||
var err error
|
||||
var unissuedTask task
|
||||
var activeTask task
|
||||
var done bool
|
||||
|
||||
tsoAllocatorIns := newMockTsoAllocator()
|
||||
idAllocatorIns := newMockIDAllocatorInterface()
|
||||
|
@ -238,7 +196,6 @@ func TestDmTaskQueue_Basic(t *testing.T) {
|
|||
|
||||
st := newDefaultMockDmlTask()
|
||||
stID := st.ID()
|
||||
stTs := st.BeginTs()
|
||||
|
||||
// no task in queue
|
||||
|
||||
|
@ -251,9 +208,6 @@ func TestDmTaskQueue_Basic(t *testing.T) {
|
|||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.Nil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(stTs)
|
||||
assert.True(t, done)
|
||||
|
||||
// task enqueue, only one task in queue
|
||||
|
||||
err = queue.Enqueue(st)
|
||||
|
@ -270,25 +224,16 @@ func TestDmTaskQueue_Basic(t *testing.T) {
|
|||
unissuedTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.NotNil(t, unissuedTask)
|
||||
assert.True(t, queue.utEmpty())
|
||||
assert.False(t, queue.utFull())
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test active list, no task in queue
|
||||
|
||||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
|
@ -299,15 +244,9 @@ func TestDmTaskQueue_Basic(t *testing.T) {
|
|||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test utFull
|
||||
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
|
||||
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
|
||||
|
@ -366,7 +305,6 @@ func TestDqTaskQueue(t *testing.T) {
|
|||
var err error
|
||||
var unissuedTask task
|
||||
var activeTask task
|
||||
var done bool
|
||||
|
||||
tsoAllocatorIns := newMockTsoAllocator()
|
||||
idAllocatorIns := newMockIDAllocatorInterface()
|
||||
|
@ -378,7 +316,6 @@ func TestDqTaskQueue(t *testing.T) {
|
|||
|
||||
st := newDefaultMockDqlTask()
|
||||
stID := st.ID()
|
||||
stTs := st.BeginTs()
|
||||
|
||||
// no task in queue
|
||||
|
||||
|
@ -391,9 +328,6 @@ func TestDqTaskQueue(t *testing.T) {
|
|||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.Nil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(stTs)
|
||||
assert.True(t, done)
|
||||
|
||||
// task enqueue, only one task in queue
|
||||
|
||||
err = queue.Enqueue(st)
|
||||
|
@ -410,25 +344,16 @@ func TestDqTaskQueue(t *testing.T) {
|
|||
unissuedTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, unissuedTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
unissuedTask = queue.PopUnissuedTask()
|
||||
assert.NotNil(t, unissuedTask)
|
||||
assert.True(t, queue.utEmpty())
|
||||
assert.False(t, queue.utFull())
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test active list, no task in queue
|
||||
|
||||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.Nil(t, activeTask)
|
||||
|
||||
|
@ -439,15 +364,9 @@ func TestDqTaskQueue(t *testing.T) {
|
|||
activeTask = queue.getTaskByReqID(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.False(t, done)
|
||||
|
||||
activeTask = queue.PopActiveTask(unissuedTask.ID())
|
||||
assert.NotNil(t, activeTask)
|
||||
|
||||
done = queue.TaskDoneTest(unissuedTask.BeginTs() + 1)
|
||||
assert.True(t, done)
|
||||
|
||||
// test utFull
|
||||
queue.setMaxTaskNum(10) // not accurate, full also means utBufChan block
|
||||
for i := 0; i < int(queue.getMaxTaskNum()); i++ {
|
||||
|
@ -477,8 +396,6 @@ func TestTaskScheduler(t *testing.T) {
|
|||
assert.NoError(t, err)
|
||||
defer sched.Close()
|
||||
|
||||
assert.True(t, sched.TaskDoneTest(Timestamp(time.Now().Nanosecond())))
|
||||
|
||||
stats, err := sched.getPChanStatistics()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0, len(stats))
|
||||
|
|
Loading…
Reference in New Issue