fix(tasks): reduce memory allocations in scheduler

This will reduce memory allocations in the scheduler by removing
unneccessary delete/replace actions on the btree that is used as an
internal priority queue.
pull/16778/head
docmerlin (j. Emrys Landivar) 2020-02-06 19:19:47 -06:00 committed by Jorge Landivar
parent 43beabdc7a
commit 06102ba3d4
1 changed files with 42 additions and 54 deletions

View File

@ -61,18 +61,18 @@ const (
// btree, then releases the lock. We do not have to readjust the time on delete, because, if the minimum task isn't
// ready yet, the main loop just resets the timer and keeps going.
type TreeScheduler struct {
mu sync.RWMutex
scheduled *btree.BTree
nextTime map[ID]ordering // we need this index so we can delete items from the scheduled
when time.Time
executor Executor
onErr ErrorFunc
time clock.Clock
timer *clock.Timer
done chan struct{}
workchans []chan Item
wg sync.WaitGroup
checkpointer SchedulableService
mu sync.RWMutex
priorityQueue *btree.BTree
nextTime map[ID]int64 // we need this index so we can delete items from the scheduled
when time.Time
executor Executor
onErr ErrorFunc
time clock.Clock
timer *clock.Timer
done chan struct{}
workchans []chan Item
wg sync.WaitGroup
checkpointer SchedulableService
sm *SchedulerMetrics
}
@ -111,13 +111,13 @@ func WithTime(t clock.Clock) treeSchedulerOptFunc {
// Schedulers should be initialized with this function.
func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...treeSchedulerOptFunc) (*TreeScheduler, *SchedulerMetrics, error) {
s := &TreeScheduler{
executor: executor,
scheduled: btree.New(degreeBtreeScheduled),
nextTime: map[ID]ordering{},
onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {},
time: clock.New(),
done: make(chan struct{}, 1),
checkpointer: checkpointer,
executor: executor,
priorityQueue: btree.New(degreeBtreeScheduled),
nextTime: map[ID]int64{},
onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {},
time: clock.New(),
done: make(chan struct{}, 1),
checkpointer: checkpointer,
}
// apply options
@ -164,27 +164,27 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
case <-s.timer.C:
for { // this for loop is a work around to the way clock's mock works when you reset duration 0 in a different thread than you are calling your clock.Set
s.mu.Lock()
min := s.scheduled.Min()
min := s.priorityQueue.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue
s.when = time.Time{}
s.mu.Unlock()
continue schedulerLoop
}
it := min.(Item)
if ts := s.time.Now().UTC(); it.when().After(ts) {
s.timer.Reset(ts.Sub(it.when()))
if ts := s.time.Now().UTC(); it.When().After(ts) {
s.timer.Reset(ts.Sub(it.When()))
s.mu.Unlock()
continue schedulerLoop
}
s.process()
min = s.scheduled.Min()
min = s.priorityQueue.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue after processing
s.when = time.Time{}
s.mu.Unlock()
continue schedulerLoop
}
it = min.(Item)
s.when = it.when()
s.when = it.When()
until := s.when.Sub(s.time.Now())
if until > 0 {
@ -216,14 +216,14 @@ type itemList struct {
func (s *TreeScheduler) process() {
iter, toReAdd := s.iterator(s.time.Now())
s.scheduled.Ascend(iter)
s.priorityQueue.Ascend(iter)
for i := range toReAdd.toDelete {
delete(s.nextTime, toReAdd.toDelete[i].id)
s.scheduled.Delete(toReAdd.toDelete[i])
s.priorityQueue.Delete(toReAdd.toDelete[i])
}
for i := range toReAdd.toInsert {
s.nextTime[toReAdd.toInsert[i].id] = toReAdd.toInsert[i].ordering
s.scheduled.ReplaceOrInsert(toReAdd.toInsert[i])
s.nextTime[toReAdd.toInsert[i].id] = toReAdd.toInsert[i].when
s.priorityQueue.ReplaceOrInsert(toReAdd.toInsert[i])
}
}
@ -260,9 +260,6 @@ func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *itemList) {
case <-s.done:
return false
default:
itemsToPlace.toDelete = append(itemsToPlace.toDelete, it)
it.incrementNonce()
itemsToPlace.toInsert = append(itemsToPlace.toInsert, it)
return true
}
}
@ -279,13 +276,13 @@ func (s *TreeScheduler) When() time.Time {
}
func (s *TreeScheduler) release(taskID ID) {
ordering, ok := s.nextTime[taskID]
when, ok := s.nextTime[taskID]
if !ok {
return
}
// delete the old task run time
s.scheduled.Delete(Item{id: taskID, ordering: ordering})
s.priorityQueue.Delete(Item{id: taskID, when: when})
delete(s.nextTime, taskID)
}
@ -318,7 +315,7 @@ func (s *TreeScheduler) work(ctx context.Context, ch chan Item) {
s.sm.reportScheduleDelay(time.Since(it.Next()))
preExec := time.Now()
// execute
err = s.executor.Execute(ctx, it.id, t, it.when())
err = s.executor.Execute(ctx, it.id, t, it.When())
// report how long execution took
s.sm.reportExecution(err, time.Since(preExec))
return err
@ -349,7 +346,7 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {
return err
}
it.next = nt.UTC().Unix()
it.ordering.when = it.next + it.Offset
it.when = it.next + it.Offset
s.mu.Lock()
defer s.mu.Unlock()
@ -369,30 +366,21 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {
if ok {
// delete the old task run time
s.scheduled.Delete(Item{
ordering: nextTime,
id: it.id,
s.priorityQueue.Delete(Item{
when: nextTime,
id: it.id,
})
}
s.nextTime[it.id] = ordering{when: it.next + it.Offset}
s.nextTime[it.id] = it.next + it.Offset
// insert the new task run time
s.scheduled.ReplaceOrInsert(it)
s.priorityQueue.ReplaceOrInsert(it)
return nil
}
type ordering struct {
when int64
nonce int // for retries
}
func (k *ordering) incrementNonce() {
k.nonce++
}
// Item is a task in the scheduler.
type Item struct {
ordering
when int64
id ID
cron Schedule
next int64
@ -403,14 +391,14 @@ func (it Item) Next() time.Time {
return time.Unix(it.next, 0)
}
func (it Item) when() time.Time {
return time.Unix(it.ordering.when, 0)
func (it Item) When() time.Time {
return time.Unix(it.when, 0)
}
// Less tells us if one Item is less than another
func (it Item) Less(bItem btree.Item) bool {
it2 := bItem.(Item)
return it.ordering.when < it2.ordering.when || (it.ordering.when == it2.ordering.when && (it.nonce < it2.nonce || it.nonce == it2.nonce && it.id < it2.id))
return it.when < it2.when || ((it.when == it2.when) && it.id < it2.id)
}
func (it *Item) updateNext() error {
@ -419,6 +407,6 @@ func (it *Item) updateNext() error {
return err
}
it.next = newNext.UTC().Unix()
it.ordering.when = it.next + it.Offset
it.when = it.next + it.Offset
return nil
}