mirror of https://github.com/milvus-io/milvus.git
[skip ci]Fix golint in indexnode/task_scheduler.go (#9117)
Signed-off-by: cai.zhang <cai.zhang@zilliz.com>pull/9119/head
parent
1f231853ae
commit
4ec9937998
|
@ -26,6 +26,7 @@ import (
|
|||
oplog "github.com/opentracing/opentracing-go/log"
|
||||
)
|
||||
|
||||
// TaskQueue is a queue used to store tasks.
|
||||
type TaskQueue interface {
|
||||
utChan() <-chan int
|
||||
utEmpty() bool
|
||||
|
@ -39,6 +40,7 @@ type TaskQueue interface {
|
|||
//tryToRemoveUselessIndexBuildTask(indexID UniqueID) []UniqueID
|
||||
}
|
||||
|
||||
// BaseTaskQueue is a basic instance of TaskQueue.
|
||||
type BaseTaskQueue struct {
|
||||
unissuedTasks *list.List
|
||||
activeTasks map[UniqueID]task
|
||||
|
@ -89,6 +91,7 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error {
|
|||
// return queue.unissuedTasks.Front().Value.(task)
|
||||
//}
|
||||
|
||||
// PopUnissuedTask pops a task from tasks queue.
|
||||
func (queue *BaseTaskQueue) PopUnissuedTask() task {
|
||||
queue.utLock.Lock()
|
||||
defer queue.utLock.Unlock()
|
||||
|
@ -103,6 +106,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
|
|||
return ft.Value.(task)
|
||||
}
|
||||
|
||||
// AddActiveTask adds a task to activeTasks.
|
||||
func (queue *BaseTaskQueue) AddActiveTask(t task) {
|
||||
queue.atLock.Lock()
|
||||
defer queue.atLock.Unlock()
|
||||
|
@ -116,6 +120,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
|
|||
queue.activeTasks[tID] = t
|
||||
}
|
||||
|
||||
// PopActiveTask tasks out a task from activateTask and the task will be executed.
|
||||
func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
|
||||
queue.atLock.Lock()
|
||||
defer queue.atLock.Unlock()
|
||||
|
@ -150,6 +155,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
|
|||
// return indexBuildIDs
|
||||
//}
|
||||
|
||||
// Enqueue adds a task to TaskQueue.
|
||||
func (queue *BaseTaskQueue) Enqueue(t task) error {
|
||||
err := t.OnEnqueue()
|
||||
if err != nil {
|
||||
|
@ -158,10 +164,12 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
|
|||
return queue.addUnissuedTask(t)
|
||||
}
|
||||
|
||||
// IndexBuildTaskQueue is a task queue used to store building index tasks.
|
||||
type IndexBuildTaskQueue struct {
|
||||
BaseTaskQueue
|
||||
}
|
||||
|
||||
// NewIndexBuildTaskQueue creates a new IndexBuildTaskQueue.
|
||||
func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexBuildTaskQueue {
|
||||
return &IndexBuildTaskQueue{
|
||||
BaseTaskQueue: BaseTaskQueue{
|
||||
|
@ -174,6 +182,7 @@ func NewIndexBuildTaskQueue(sched *TaskScheduler) *IndexBuildTaskQueue {
|
|||
}
|
||||
}
|
||||
|
||||
// TaskScheduler is a scheduler of indexing tasks.
|
||||
type TaskScheduler struct {
|
||||
IndexBuildQueue TaskQueue
|
||||
|
||||
|
@ -184,6 +193,7 @@ type TaskScheduler struct {
|
|||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewTaskScheduler creates a new task scheduler of indexing tasks.
|
||||
func NewTaskScheduler(ctx context.Context,
|
||||
kv kv.BaseKV) (*TaskScheduler, error) {
|
||||
ctx1, cancel := context.WithCancel(ctx)
|
||||
|
@ -280,6 +290,7 @@ func (sched *TaskScheduler) indexBuildLoop() {
|
|||
}
|
||||
}
|
||||
|
||||
// Start stats the task scheduler of indexing tasks.
|
||||
func (sched *TaskScheduler) Start() error {
|
||||
|
||||
sched.wg.Add(1)
|
||||
|
@ -287,6 +298,7 @@ func (sched *TaskScheduler) Start() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Close closes the task scheduler of indexing tasks.
|
||||
func (sched *TaskScheduler) Close() {
|
||||
sched.cancel()
|
||||
sched.wg.Wait()
|
||||
|
|
Loading…
Reference in New Issue