// Licensed to the LF AI & Data foundation under one // or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package datacoord import ( "container/heap" "sync" ) // schedulePolicy is the policy of scheduler. type schedulePolicy interface { Push(task Task) // Pop get the task next ready to run. Pop() Task Get(taskID UniqueID) Task Keys() []UniqueID TaskCount() int Exist(taskID UniqueID) bool Remove(taskID UniqueID) } var _ schedulePolicy = &priorityQueuePolicy{} // priorityQueuePolicy implements a priority queue that sorts tasks by taskID (smaller taskID has higher priority) type priorityQueuePolicy struct { tasks map[UniqueID]Task heap *taskHeap lock sync.RWMutex } // taskHeap implements a min-heap for Task objects, sorted by taskID type taskHeap []Task func (h taskHeap) Len() int { return len(h) } func (h taskHeap) Less(i, j int) bool { return h[i].GetTaskID() < h[j].GetTaskID() } func (h taskHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *taskHeap) Push(x interface{}) { *h = append(*h, x.(Task)) } func (h *taskHeap) Pop() interface{} { old := *h n := len(old) item := old[n-1] *h = old[0 : n-1] return item } // newPriorityQueuePolicy creates a new priority queue policy func newPriorityQueuePolicy() *priorityQueuePolicy { h := &taskHeap{} heap.Init(h) return &priorityQueuePolicy{ tasks: make(map[UniqueID]Task), heap: h, lock: sync.RWMutex{}, } } func (pqp *priorityQueuePolicy) Push(task Task) { pqp.lock.Lock() defer pqp.lock.Unlock() taskID := task.GetTaskID() if _, exists := pqp.tasks[taskID]; !exists { pqp.tasks[taskID] = task heap.Push(pqp.heap, task) } } func (pqp *priorityQueuePolicy) Pop() Task { pqp.lock.Lock() defer pqp.lock.Unlock() if pqp.heap.Len() == 0 { return nil } task := heap.Pop(pqp.heap).(Task) delete(pqp.tasks, task.GetTaskID()) return task } func (pqp *priorityQueuePolicy) Get(taskID UniqueID) Task { pqp.lock.RLock() defer pqp.lock.RUnlock() return pqp.tasks[taskID] } func (pqp *priorityQueuePolicy) Keys() []UniqueID { pqp.lock.RLock() defer pqp.lock.RUnlock() keys := make([]UniqueID, 0, len(pqp.tasks)) for taskID := range pqp.tasks { keys = append(keys, taskID) } return keys } func (pqp *priorityQueuePolicy) TaskCount() int { pqp.lock.RLock() defer pqp.lock.RUnlock() return len(pqp.tasks) } func (pqp *priorityQueuePolicy) Exist(taskID UniqueID) bool { pqp.lock.RLock() defer pqp.lock.RUnlock() _, exists := pqp.tasks[taskID] return exists } func (pqp *priorityQueuePolicy) Remove(taskID UniqueID) { pqp.lock.Lock() defer pqp.lock.Unlock() if _, exists := pqp.tasks[taskID]; !exists { return } delete(pqp.tasks, taskID) // Find and remove from heap for i, task := range *pqp.heap { if task.GetTaskID() == taskID { heap.Remove(pqp.heap, i) break } } }