[skip ci] Fix golint in indexcoord/task_scheduler.go ()

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
pull/8901/head
cai.zhang 2021-09-29 23:34:26 +08:00 committed by GitHub
parent 435dcfe2a0
commit 83d6339188
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 0 deletions
internal/indexcoord

View File

@ -27,6 +27,7 @@ import (
oplog "github.com/opentracing/opentracing-go/log" oplog "github.com/opentracing/opentracing-go/log"
) )
// TaskQueue is a queue used to store tasks.
type TaskQueue interface { type TaskQueue interface {
utChan() <-chan int utChan() <-chan int
utEmpty() bool utEmpty() bool
@ -40,6 +41,7 @@ type TaskQueue interface {
tryToRemoveUselessIndexAddTask(indexID UniqueID) []UniqueID tryToRemoveUselessIndexAddTask(indexID UniqueID) []UniqueID
} }
// BaseTaskQueue is a basic instance of TaskQueue.
type BaseTaskQueue struct { type BaseTaskQueue struct {
unissuedTasks *list.List unissuedTasks *list.List
activeTasks map[UniqueID]task activeTasks map[UniqueID]task
@ -90,6 +92,7 @@ func (queue *BaseTaskQueue) addUnissuedTask(t task) error {
// return queue.unissuedTasks.Front().Value.(task) // return queue.unissuedTasks.Front().Value.(task)
//} //}
// PopUnissuedTask pops a task from tasks queue.
func (queue *BaseTaskQueue) PopUnissuedTask() task { func (queue *BaseTaskQueue) PopUnissuedTask() task {
queue.utLock.Lock() queue.utLock.Lock()
defer queue.utLock.Unlock() defer queue.utLock.Unlock()
@ -104,6 +107,7 @@ func (queue *BaseTaskQueue) PopUnissuedTask() task {
return ft.Value.(task) return ft.Value.(task)
} }
// AddActiveTask adds a task to activeTasks.
func (queue *BaseTaskQueue) AddActiveTask(t task) { func (queue *BaseTaskQueue) AddActiveTask(t task) {
queue.atLock.Lock() queue.atLock.Lock()
defer queue.atLock.Unlock() defer queue.atLock.Unlock()
@ -117,6 +121,7 @@ func (queue *BaseTaskQueue) AddActiveTask(t task) {
queue.activeTasks[tID] = t queue.activeTasks[tID] = t
} }
// PopActiveTask tasks out a task from activateTask and the task will be executed.
func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task { func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
queue.atLock.Lock() queue.atLock.Lock()
defer queue.atLock.Unlock() defer queue.atLock.Unlock()
@ -130,6 +135,7 @@ func (queue *BaseTaskQueue) PopActiveTask(tID UniqueID) task {
return nil return nil
} }
// Enqueue adds a task to TaskQueue.
func (queue *BaseTaskQueue) Enqueue(t task) error { func (queue *BaseTaskQueue) Enqueue(t task) error {
tID, _ := queue.sched.idAllocator.AllocOne() tID, _ := queue.sched.idAllocator.AllocOne()
log.Debug("indexcoord", zap.Int64("[Builder] allocate reqID", tID)) log.Debug("indexcoord", zap.Int64("[Builder] allocate reqID", tID))
@ -141,11 +147,13 @@ func (queue *BaseTaskQueue) Enqueue(t task) error {
return queue.addUnissuedTask(t) return queue.addUnissuedTask(t)
} }
// IndexAddTaskQueue is a task queue used to store building index tasks.
type IndexAddTaskQueue struct { type IndexAddTaskQueue struct {
BaseTaskQueue BaseTaskQueue
lock sync.Mutex lock sync.Mutex
} }
// Enqueue adds a building index task to IndexAddTaskQueue.
func (queue *IndexAddTaskQueue) Enqueue(t task) error { func (queue *IndexAddTaskQueue) Enqueue(t task) error {
queue.lock.Lock() queue.lock.Lock()
defer queue.lock.Unlock() defer queue.lock.Unlock()
@ -175,6 +183,7 @@ func (queue *IndexAddTaskQueue) tryToRemoveUselessIndexAddTask(indexID UniqueID)
return indexBuildIDs return indexBuildIDs
} }
// NewIndexAddTaskQueue creates a new IndexAddTaskQueue.
func NewIndexAddTaskQueue(sched *TaskScheduler) *IndexAddTaskQueue { func NewIndexAddTaskQueue(sched *TaskScheduler) *IndexAddTaskQueue {
return &IndexAddTaskQueue{ return &IndexAddTaskQueue{
BaseTaskQueue: BaseTaskQueue{ BaseTaskQueue: BaseTaskQueue{
@ -187,6 +196,7 @@ func NewIndexAddTaskQueue(sched *TaskScheduler) *IndexAddTaskQueue {
} }
} }
// TaskScheduler is a scheduler of indexing tasks.
type TaskScheduler struct { type TaskScheduler struct {
IndexAddQueue TaskQueue IndexAddQueue TaskQueue
@ -199,6 +209,7 @@ type TaskScheduler struct {
cancel context.CancelFunc cancel context.CancelFunc
} }
// NewTaskScheduler creates a new task scheduler of indexing tasks.
func NewTaskScheduler(ctx context.Context, func NewTaskScheduler(ctx context.Context,
idAllocator *allocator.GlobalIDAllocator, idAllocator *allocator.GlobalIDAllocator,
kv kv.BaseKV, kv kv.BaseKV,
@ -272,6 +283,7 @@ func (sched *TaskScheduler) indexAddLoop() {
} }
} }
// Start stats the task scheduler of indexing tasks.
func (sched *TaskScheduler) Start() error { func (sched *TaskScheduler) Start() error {
sched.wg.Add(1) sched.wg.Add(1)
@ -280,6 +292,7 @@ func (sched *TaskScheduler) Start() error {
return nil return nil
} }
// Start closes the task scheduler of indexing tasks.
func (sched *TaskScheduler) Close() { func (sched *TaskScheduler) Close() {
sched.cancel() sched.cancel()
sched.wg.Wait() sched.wg.Wait()