fix(tasks): scheduler btree panic

pull/15710/head
docmerlin (j. Emrys Landivar) 2019-10-31 17:18:22 -05:00 committed by Jorge Landivar
parent cba69aba5f
commit 71a2590fba
2 changed files with 77 additions and 15 deletions

View File

@ -362,6 +362,65 @@ func TestSchedule_panic(t *testing.T) {
}
}
func TestTreeScheduler_LongPanicTest(t *testing.T) {
// This test is to catch one specifgic type of race condition that can occur and isn't caught by race test, but causes a panic
// in the google btree library
now := time.Date(2096, time.December, 30, 0, 0, 0, 0, time.UTC)
mockTime := clock.NewMock()
mockTime.Set(now)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
default:
}
}}
sch, _, err := NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error {
return nil
}},
WithTime(mockTime),
WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
// this tests for a race condition in the btree that isn't normally caught by the race detector
schedule, err := NewSchedule("* * * * * * *")
if err != nil {
t.Fatal(err)
}
badSchedule, err := NewSchedule("0 0 1 12 *")
if err != nil {
t.Fatal(err)
}
for i := ID(1); i <= 2000; i++ { // since a valid ID probably shouldn't be zero
if i%100 == 0 {
err = sch.Schedule(mockSchedulable{id: i, schedule: badSchedule, offset: 0, lastScheduled: now.Add(-1 * time.Second)})
if err != nil {
t.Fatal(err)
}
} else {
err = sch.Schedule(mockSchedulable{id: i, schedule: schedule, offset: 0, lastScheduled: now.Add(-1 * time.Second)})
if err != nil {
t.Fatal(err)
}
}
}
time.Sleep(2 * time.Second)
sch.mu.Lock()
mockTime.Set(mockTime.Now().UTC().Add(99 * time.Second))
sch.mu.Unlock()
time.Sleep(5 * time.Second)
}
func TestTreeScheduler_Release(t *testing.T) {
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {

View File

@ -207,20 +207,23 @@ func (s *TreeScheduler) Stop() {
s.wg.Wait()
}
type unsent struct {
items []Item
}
func (u *unsent) append(i Item) {
u.items = append(u.items, i)
// itemList is a list of items for deleting and inserting. We have to do them seperately instead of just a re-add,
// because usually the items key must be changed between the delete and insert
type itemList struct {
toInsert []Item
toDelete []Item
}
func (s *TreeScheduler) process() {
iter, toReAdd := s.iterator(s.time.Now())
s.scheduled.Ascend(iter)
for i := range toReAdd.items {
s.nextTime[toReAdd.items[i].id] = toReAdd.items[i].ordering
s.scheduled.ReplaceOrInsert(toReAdd.items[i])
for i := range toReAdd.toDelete {
delete(s.nextTime, toReAdd.toDelete[i].id)
s.scheduled.Delete(toReAdd.toDelete[i])
}
for i := range toReAdd.toInsert {
s.nextTime[toReAdd.toInsert[i].id] = toReAdd.toInsert[i].ordering
s.scheduled.ReplaceOrInsert(toReAdd.toInsert[i])
}
}
@ -229,8 +232,8 @@ func (s *TreeScheduler) resetTimer(whenFromNow time.Duration) {
s.timer.Reset(whenFromNow)
}
func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *unsent) {
itemsToPlace := &unsent{}
func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *itemList) {
itemsToPlace := &itemList{}
return func(i btree.Item) bool {
if i == nil {
return false
@ -246,20 +249,20 @@ func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *unsent) {
wc := xxhash.Sum64(buf[:]) % uint64(len(s.workchans)) // we just hash so that the number is uniformly distributed
select {
case s.workchans[wc] <- it:
s.scheduled.Delete(it)
itemsToPlace.toDelete = append(itemsToPlace.toDelete, it)
if err := it.updateNext(); err != nil {
// in this error case we can't schedule next, so we have to drop the task
s.onErr(context.Background(), it.id, it.Next(), &ErrUnrecoverable{err})
return true
}
itemsToPlace.append(it)
itemsToPlace.toInsert = append(itemsToPlace.toInsert, it)
case <-s.done:
return false
default:
s.scheduled.Delete(it)
itemsToPlace.toDelete = append(itemsToPlace.toDelete, it)
it.incrementNonce()
itemsToPlace.append(it)
itemsToPlace.toInsert = append(itemsToPlace.toInsert, it)
return true
}
}