feat(tasks): switch the new scheduler to use clock instead of custom time mocker

pull/15407/head
docmerlin (j. Emrys Landivar) 2019-10-07 17:05:08 -05:00 committed by Jorge Landivar
parent 3fd94cbb69
commit 4b732acb3b
6 changed files with 72 additions and 461 deletions

1
go.mod
View File

@ -9,6 +9,7 @@ require (
github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883
github.com/apache/arrow/go/arrow v0.0.0-20190809133625-b98a560fc561
github.com/aws/aws-sdk-go v1.16.15 // indirect
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3
github.com/benbjohnson/tmpl v1.0.0
github.com/boltdb/bolt v1.3.1 // indirect
github.com/bouk/httprouter v0.0.0-20160817010721-ee8b3818a7f5

2
go.sum
View File

@ -49,6 +49,8 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/aws/aws-sdk-go v1.15.64/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM=
github.com/aws/aws-sdk-go v1.16.15 h1:kQyxfRyjAwIYjf0225sn/pn+WAlncKyI8dmT3+ItMFE=
github.com/aws/aws-sdk-go v1.16.15/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3 h1:wOysYcIdqv3WnvwqFFzrYCFALPED7qkUGaLXu359GSc=
github.com/benbjohnson/clock v0.0.0-20161215174838-7dc76406b6d3/go.mod h1:UMqtWQTnOe4byzwe7Zhwh8f8s+36uszN51sJrSIZlTE=
github.com/benbjohnson/tmpl v1.0.0 h1:T5QPGJD0W6JJxyEEAlVnX3co/IkUrfHen1/42nlgAHo=
github.com/benbjohnson/tmpl v1.0.0/go.mod h1:igT620JFIi44B6awvU9IsDhR77IXWtFigTLil/RPdps=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=

View File

@ -6,6 +6,8 @@ import (
"testing"
"time"
"github.com/benbjohnson/clock"
"github.com/influxdata/influxdb/task/backend/scheduler"
)
@ -57,7 +59,7 @@ func (m *mockSchedulableService) UpdateLastScheduled(ctx context.Context, id sch
}
func TestSchedule_Next(t *testing.T) {
t.Run("fires properly", func(t *testing.T) {
t.Run("fires properly with non-mocked time", func(t *testing.T) {
now := time.Now()
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
@ -96,7 +98,8 @@ func TestSchedule_Next(t *testing.T) {
}
})
t.Run("doesn't fire when the task isn't ready", func(t *testing.T) {
now := time.Now()
mockTime := clock.NewMock()
mockTime.Set(time.Now())
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
select {
@ -107,7 +110,6 @@ func TestSchedule_Next(t *testing.T) {
t.Errorf("called the executor too many times")
}
}}
mockTime := scheduler.NewMockTime(now)
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
@ -124,12 +126,14 @@ func TestSchedule_Next(t *testing.T) {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(time.Second)})
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: mockTime.Now().UTC().Add(time.Second)})
if err != nil {
t.Fatal(err)
}
go func() {
mockTime.Set(mockTime.T.Add(2 * time.Second))
sch.Lock()
mockTime.Set(mockTime.Now().Add(2 * time.Second))
sch.Unlock()
}()
select {
@ -141,7 +145,6 @@ func TestSchedule_Next(t *testing.T) {
})
t.Run("fires the correct number of times for the interval with a single schedulable", func(t *testing.T) {
now := time.Now().UTC()
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
select {
@ -150,7 +153,8 @@ func TestSchedule_Next(t *testing.T) {
case c <- scheduledAt:
}
}}
mockTime := scheduler.NewMockTime(now)
mockTime := clock.NewMock()
mockTime.Set(time.Now())
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
@ -167,12 +171,14 @@ func TestSchedule_Next(t *testing.T) {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now})
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: mockTime.Now().UTC()})
if err != nil {
t.Fatal(err)
}
go func() {
mockTime.Set(mockTime.T.Add(17 * time.Second))
sch.Lock()
mockTime.Set(mockTime.Now().UTC().Add(17 * time.Second))
sch.Unlock()
}()
after := time.After(6 * time.Second)
@ -183,9 +189,10 @@ func TestSchedule_Next(t *testing.T) {
t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i)
}
}
go func() {
mockTime.Set(mockTime.T.Add(2 * time.Second))
sch.Lock()
mockTime.Set(mockTime.Now().UTC().Add(2 * time.Second))
sch.Unlock()
}()
after = time.After(6 * time.Second)
@ -224,7 +231,8 @@ func TestSchedule_Next(t *testing.T) {
}:
}
}}
mockTime := scheduler.NewMockTime(now)
mockTime := clock.NewMock()
mockTime.Set(now)
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
@ -254,9 +262,10 @@ func TestSchedule_Next(t *testing.T) {
if err != nil {
t.Fatal(err)
}
go func() {
mockTime.Set(mockTime.T.Add(17 * time.Second))
sch.Lock()
mockTime.Set(mockTime.Now().Add(17 * time.Second))
sch.Unlock()
}()
after := time.After(6 * time.Second)
@ -269,7 +278,9 @@ func TestSchedule_Next(t *testing.T) {
}
go func() {
mockTime.Set(mockTime.T.Add(2 * time.Second))
sch.Lock()
mockTime.Set(mockTime.Now().Add(2 * time.Second))
sch.Unlock()
}()
after = time.After(6 * time.Second)
@ -292,7 +303,8 @@ func TestSchedule_Next(t *testing.T) {
func TestTreeScheduler_Stop(t *testing.T) {
now := time.Now().Add(-20 * time.Second)
mockTime := scheduler.NewMockTime(now)
mockTime := clock.NewMock()
mockTime.Set(now)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {}}
sch, _, err := scheduler.NewScheduler(exe, &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil

View File

@ -1,237 +0,0 @@
package scheduler
import (
"sync"
"time"
)
// Time is an interface to allow us to mock time.
type Time interface {
Now() time.Time
Unix(seconds, nanoseconds int64) time.Time
NewTimer(d time.Duration) Timer
Until(time.Time) time.Duration
}
type stdTime struct{}
// Now gives us the current time as time.Time would
func (stdTime) Now() time.Time {
return time.Now()
}
// Unix gives us the time given seconds and nanoseconds.
func (stdTime) Unix(sec, nsec int64) time.Time {
return time.Unix(sec, nsec)
}
func (stdTime) Until(t time.Time) time.Duration {
return time.Until(t)
}
// NewTimer gives us a Timer that fires after duration d.
func (stdTime) NewTimer(d time.Duration) Timer {
t := time.NewTimer(d)
return &stdTimer{*t}
}
// Timer is an interface to allow us to mock out timers. It has behavior like time.Timer
type Timer interface {
C() <-chan time.Time
Reset(d time.Duration) bool
Stop() bool
}
// stdTimer is a Timer that wraps time.Time.
type stdTimer struct {
time.Timer
}
// C returns a <-chan time.Time and can be used much like time.Timer.C.
func (t *stdTimer) C() <-chan time.Time {
return t.Timer.C
}
// MockTime is a time that mocks out some methods of time.Time.
// It doesn't advance the time over time, but only changes it with calls to Set.
// Use NewMockTime to create Mocktimes, don't instanciate the struct directly unless you want to mess with the sync Cond.
type MockTime struct {
sync.RWMutex
*sync.Cond
T time.Time
}
// NewMockTime create a mock of time that returns the underlying time.Time.
func NewMockTime(t time.Time) *MockTime {
mt := &MockTime{
T: t,
Cond: sync.NewCond(&sync.Mutex{}),
}
return mt
}
// Now returns the stored time.Time, It is to mock out time.Now().
func (t *MockTime) Now() time.Time {
t.RLock()
defer t.RUnlock()
return t.T
}
// Unix creates a time.Time given seconds and nanoseconds. It just wraps time.Unix.
func (*MockTime) Unix(sec, nsec int64) time.Time {
return time.Unix(sec, nsec)
}
// Util is equivalent to t.T.Sub(ts). We need it to mock out time, because the non-mocked implementation needs to be monotonic.
func (t *MockTime) Until(ts time.Time) time.Duration {
t.RLock()
defer t.RUnlock()
return ts.Sub(t.T)
}
func (t *MockTime) Set(ts time.Time) {
t.Lock()
defer t.Unlock()
t.Cond.L.Lock()
t.T = ts
t.Cond.Broadcast()
t.Cond.L.Unlock()
}
// MockTimer is a struct to mock out Timer.
type MockTimer struct {
T *MockTime
fireTime time.Time
c chan time.Time
stopch chan struct{}
active bool
wg sync.WaitGroup
starting sync.WaitGroup
}
// NewTimer returns a timer that will fire after d time.Duration from the underlying time in the MockTime. It doesn't
// actually fire after a duration, but fires when you Set the MockTime used to create it, to a time greater than or
// equal to the underlying MockTime when it was created plus duration d.
func (t *MockTime) NewTimer(d time.Duration) Timer {
t.Cond.L.Lock()
timer := &MockTimer{
T: t,
fireTime: t.T.Add(d),
stopch: make(chan struct{}, 1),
c: make(chan time.Time, 1),
}
timer.start(d)
t.Cond.L.Unlock()
return timer
}
func (t *MockTimer) C() <-chan time.Time {
return t.c
}
func (t *MockTimer) Reset(d time.Duration) bool {
t.starting.Wait()
t.T.Cond.L.Lock()
// clear the channels
{
select {
case <-t.stopch:
default:
}
select {
case <-t.c:
default:
}
}
defer t.T.Cond.L.Unlock()
t.fireTime = t.T.Now().Add(d)
t.start(d)
t.T.Cond.Broadcast()
return false
}
func (t *MockTimer) Stop() (active bool) {
t.starting.Wait()
t.T.Cond.L.Lock()
defer func() {
t.T.Cond.Broadcast()
t.T.Cond.L.Unlock()
t.wg.Wait()
}()
if !t.active {
select {
case t.c <- t.fireTime:
default:
}
return false
}
select {
case t.stopch <- struct{}{}:
default:
}
if !t.active {
select {
case t.c <- t.fireTime:
default:
}
}
return t.active
}
func (t *MockTimer) start(ts time.Duration) {
if ts <= 0 {
t.c <- t.fireTime
return
}
t.wg.Add(1)
t.starting.Add(1)
go func() {
defer func() {
t.active = false
t.T.Cond.L.Unlock()
t.wg.Done()
}()
for {
t.T.Cond.L.Lock()
if !t.active {
t.active = true // this needs to be after we tale the lock, but before we exit the starting state
t.starting.Done() // this needs to be after we take the lock on start, to ensure this goroutine starts before we stop or reset
}
//check it should already be fired/stopped
if !t.T.T.Before(t.fireTime) {
select {
case t.c <- t.fireTime:
return
case <-t.stopch:
return
default:
}
}
t.T.Cond.Wait()
select {
case <-t.stopch:
return
default:
}
// check it needs to be be fired/stopped
if !t.T.T.Before(t.fireTime) {
select {
case t.c <- t.fireTime:
return
case <-t.stopch:
return
}
}
select {
case <-t.stopch:
return
default:
}
t.T.Cond.L.Unlock()
}
}()
}

View File

@ -1,176 +0,0 @@
package scheduler
import (
"testing"
"time"
)
func TestStdTime_Now(t *testing.T) {
t1 := stdTime{}.Now()
time.Sleep(time.Nanosecond)
t2 := stdTime{}.Now()
if !t1.Before(t2) {
t.Fatal()
}
}
func TestStdTime_Unix(t *testing.T) {
now := time.Now()
t1 := stdTime{}.Unix(now.Unix(), int64(now.Nanosecond()))
if !t1.Equal(now) {
t.Fatal("expected the two times to be equivalent but they were not")
}
}
func TestMockTimer(t *testing.T) {
timeForComparison := time.Now() //time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC)
mt := NewMockTime(timeForComparison)
timer := mt.NewTimer(10 * time.Second)
select {
case <-timer.C():
t.Fatalf("expected timer not to fire till time was up, but did")
default:
}
go mt.Set(timeForComparison.Add(10 * time.Second))
select {
case <-timer.C():
case <-time.After(3 * time.Second):
t.Fatal("expected timer to fire when time was up, but it didn't, it fired after a 3 second timeout")
}
timer.Reset(33 * time.Second)
go mt.Set(timeForComparison.Add(50 * time.Second))
select {
case <-timer.C():
case <-time.After(4 * time.Second):
t.Fatal("expected timer to fire when time was up, but it didn't, it fired after a 4 second timeout")
}
if !timer.Stop() {
<-timer.C()
}
timer.Reset(10000 * time.Second)
select {
case ts := <-timer.C():
t.Errorf("expected timer to NOT fire if time was not up, but it did at ts: %s", ts)
default:
}
timer2 := mt.NewTimer(10000 * time.Second)
select {
case ts := <-timer2.C():
t.Errorf("expected timer to NOT fire if time was not up, but it did at ts: %s", ts)
case <-time.After(4 * time.Second):
}
if !timer2.Stop() {
<-timer2.C()
}
timer2.Reset(0)
select {
case <-time.After(4 * time.Second):
t.Error("expected timer to fire when it was reset to 0, but it didn't")
case <-timer2.C():
}
if !timer2.Stop() {
<-timer2.C()
}
timer2.Reset(-time.Second)
select {
case <-time.After(4 * time.Second):
t.Error("expected timer to fire when it was reset to a negative duration, but it didn't")
case <-timer2.C():
}
if !timer2.Stop() {
<-timer2.C()
}
timer2.Reset(1 * time.Second)
go func() {
mt.Set(mt.T.Add(1 * time.Second))
}()
select {
case <-time.After(4 * time.Second):
t.Error("expected timer to fire after it was reset to a small duration, but it didn't")
case <-timer2.C():
}
timer2.Reset(1 * time.Second)
go func() {
mt.Set(mt.T.Add(time.Second / 2))
}()
select {
case <-time.After(time.Second):
case <-timer2.C():
t.Error("expected timer to not fire after it was reset to a too small duration, but it did")
}
}
func TestMockTimer_Stop(t *testing.T) {
timeForComparison := time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC)
mt := NewMockTime(timeForComparison)
timer := mt.NewTimer(10 * time.Second)
if !timer.Stop() {
t.Fatal("expected MockTimer.Stop() to be true if it hadn't fired yet")
}
if !timer.Stop() {
select {
case <-timer.C():
case <-time.After(2 * time.Second):
t.Fatal("timer didn't fire to clear when it should have")
}
} else {
t.Fatalf("Expected MockTimer.Stop() to be false when it was already stopped but it wasn't")
}
timer.Reset(time.Second)
go mt.Set(timeForComparison.Add(20 * time.Second))
select {
case <-timer.C():
case <-time.After(2 * time.Second):
t.Fatal("timer didn't fire when it should have")
}
if !timer.Stop() {
select {
case <-timer.C():
case <-time.After(2 * time.Second):
t.Fatal("timer didn't fire to clear when it should have")
}
} else {
t.Fatalf("Expected MockTimer.Stop() to be false when it was already fired but it wasn't")
}
}
func TestMockTime_Until(t *testing.T) {
tests := []struct {
name string
mocktime time.Time
ts time.Time
want time.Duration
}{{
name: "happy",
mocktime: time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC),
ts: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
want: 2*time.Minute + 50*time.Second + 4*time.Nanosecond,
}, {
name: "negative",
mocktime: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
ts: time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC),
want: -(2*time.Minute + 50*time.Second + 4*time.Nanosecond),
}, {
name: "zero",
mocktime: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
ts: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
want: 0,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tm := NewMockTime(tt.mocktime)
if got := tm.Until(tt.ts); got != tt.want {
t.Errorf("MockTime.Until() = %v, want %v", got, tt.want)
}
})
}
}

View File

@ -8,6 +8,7 @@ import (
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/cespare/xxhash"
"github.com/google/btree"
)
@ -67,8 +68,8 @@ type TreeScheduler struct {
when time.Time
executor Executor
onErr ErrorFunc
time Time
timer Timer
time clock.Clock
timer *clock.Timer
done chan struct{}
workchans []chan Item
wg sync.WaitGroup
@ -97,7 +98,7 @@ func WithMaxConcurrentWorkers(n int) treeSchedulerOptFunc {
}
}
func WithTime(t Time) treeSchedulerOptFunc {
func WithTime(t clock.Clock) treeSchedulerOptFunc {
return func(sch *TreeScheduler) error {
sch.time = t
return nil
@ -111,7 +112,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
scheduled: btree.New(degreeBtreeScheduled),
nextTime: map[ID]ordering{},
onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {},
time: stdTime{},
time: clock.New(),
done: make(chan struct{}, 1),
checkpointer: checkpointer,
}
@ -135,7 +136,7 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
s.sm = NewSchedulerMetrics(s)
s.when = s.time.Now().Add(maxWaitTime)
s.timer = s.time.NewTimer(maxWaitTime)
s.timer = s.time.Timer(maxWaitTime)
if executor == nil {
return nil, nil, errors.New("Executor must be a non-nil function")
}
@ -154,33 +155,41 @@ func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...tr
}
s.Unlock()
return
case <-s.timer.C():
s.Lock()
min := s.scheduled.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue
s.when = s.time.Now().Add(maxWaitTime)
s.timer.Reset(maxWaitTime) // we can reset without stop, because its fired.
case <-s.timer.C:
fired:
for {
s.Lock()
min := s.scheduled.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue
s.when = s.time.Now().Add(maxWaitTime)
s.timer.Reset(maxWaitTime) // we can reset without stop, because its fired.
s.Unlock()
continue schedulerLoop
}
it := min.(Item)
if it.when > s.when.UTC().Unix() {
s.Unlock()
continue schedulerLoop
}
s.process()
min = s.scheduled.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue after processing
s.when = s.time.Now().Add(maxWaitTime)
s.timer.Reset(maxWaitTime) // we can reset without stop, because its fired.
s.Unlock()
continue schedulerLoop
}
it = min.(Item)
s.when = time.Unix(it.when, 0)
until := s.when.Sub(s.time.Now())
if until > 0 {
s.timer.Reset(until) // we can reset without stop, because its fired.
s.Unlock()
break fired
}
s.Unlock()
continue schedulerLoop
}
it := min.(Item)
if it.when > s.when.UTC().Unix() {
s.Unlock()
continue schedulerLoop
}
s.process()
min = s.scheduled.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue after processing
s.when = s.time.Now().Add(maxWaitTime)
s.timer.Reset(maxWaitTime) // we can reset without stop, because its fired.
s.Unlock()
continue schedulerLoop
}
it = min.(Item)
s.when = time.Unix(it.when, 0)
until := s.time.Until(s.when)
s.timer.Reset(until) // we can reset without stop, because its fired.
s.Unlock()
}
}
}()
@ -332,11 +341,11 @@ func (s *TreeScheduler) Schedule(sch Schedulable) error {
if s.when.After(nt) {
s.when = nt
s.timer.Stop()
until := s.time.Until(s.when)
until := s.when.Sub(s.time.Now())
if until <= 0 {
s.timer.Reset(0)
} else {
s.timer.Reset(s.time.Until(s.when))
s.timer.Reset(s.when.Sub(s.time.Now()))
}
}
nextTime, ok := s.nextTime[it.id]