feat(task): new scheduler now with more tests

pull/15407/head
j. Emrys Landivar (docmerlin) 2019-08-12 22:19:39 -05:00 committed by Jorge Landivar
parent 84bc9a8293
commit 3fd94cbb69
8 changed files with 1093 additions and 221 deletions

go.mod

@@ -36,6 +36,7 @@ require (
github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect
github.com/hashicorp/raft v1.0.0 // indirect
github.com/hashicorp/vault/api v1.0.2
github.com/influxdata/cron v0.0.0-20190812233253-38faece03642
github.com/influxdata/flux v0.49.0
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368

go.sum

@@ -233,6 +233,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/changelog v1.0.0 h1:RstJD6H48zLQj0GdE6E6k/6RPwtUjkyzIe/T1E/xuWU=
github.com/influxdata/changelog v1.0.0/go.mod h1:uzpGWE/qehT8L426YuXwpMQub+a63vIINhIeEI9mnSM=
github.com/influxdata/cron v0.0.0-20190812233253-38faece03642 h1:ae+mZOcsOpcD0GyaVpqAzR/2t2tffQ2cWArPGohs3A8=
github.com/influxdata/cron v0.0.0-20190812233253-38faece03642/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og=
github.com/influxdata/flux v0.49.0 h1:uRIUWqPNhAfy3RfTdidnHFjFRL8q5fHZoUumz3qW1Wo=
github.com/influxdata/flux v0.49.0/go.mod h1:jnRutnpW4auRnMYcZQdRhhUKI2xrDAf4X1qjlfSdN6c=
github.com/influxdata/goreleaser v0.97.0-influx h1:jT5OrcW7WfS0e2QxfwmTBjhLvpIC9CDLRhNgZJyhj8s=


@@ -3,13 +3,16 @@ package scheduler
import (
"context"
"time"
"github.com/influxdata/cron"
)
// ID duplicates the influxdb ID so users of the scheduler don't have to
// import influxdb for the ID.
// TODO(lh): maybe make this its own thing sometime in the future.
type ID uint64
var maxID = ID(int(^uint(0) >> 1))
// Executor is a system used by the scheduler to actually execute the schedulable item.
type Executor interface {
// Execute is used to execute runs for any schedulable object.
@@ -21,7 +24,8 @@ type Executor interface {
Execute(ctx context.Context, id ID, scheduledAt time.Time) error
}
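
For orientation, a minimal Executor might look like the sketch below; logExecutor and its log line are illustrative assumptions, not part of this change.

package scheduler

import (
	"context"
	"log"
	"time"
)

// logExecutor is a hypothetical Executor that records each run it is asked to perform.
type logExecutor struct{}

// Execute satisfies the Executor interface.
func (logExecutor) Execute(ctx context.Context, id ID, scheduledAt time.Time) error {
	log.Printf("executing task %d, scheduled at %s", id, scheduledAt)
	return nil
}
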
// Schedulable is the interface that encapsulates work that
// is to be executed on a specified schedule.
type Schedulable interface {
// ID is the unique identifier for this Schedulable
ID() ID
@@ -48,7 +52,34 @@ type SchedulableService interface {
UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error
}
// NewSchedule takes a cron string and returns a Schedule, or an error if the string can't be parsed.
func NewSchedule(c string) (Schedule, error) {
sch, err := cron.ParseUTC(c)
return Schedule(sch), err
}
// Schedule is the timing mechanism for a Schedulable; it wraps cron.Parsed.
type Schedule cron.Parsed
// Next returns the next time after from that the Schedule fires.
func (s Schedule) Next(from time.Time) (time.Time, error) {
return cron.Parsed(s).Next(from)
}
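
A quick sketch of how NewSchedule and Next compose; the seven-field cron string is the same seconds-resolution form the tests below use, and the expected output is an assumption about the cron library's behavior.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/influxdb/task/backend/scheduler"
)

func main() {
	// A seven-field cron line with a seconds column: fire every second.
	sched, err := scheduler.NewSchedule("* * * * * * *")
	if err != nil {
		panic(err)
	}
	from := time.Date(2019, 8, 12, 22, 19, 39, 0, time.UTC)
	next, err := sched.Next(from)
	if err != nil {
		panic(err)
	}
	fmt.Println(next) // expected: one second after from
}
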
// NewErrRetry returns an ErrRetry; it accepts a retry delay and the underlying error.
func NewErrRetry(d time.Duration, err error) *ErrRetry {
return &ErrRetry{d: d, err: err}
}
// ErrRetry is an error that the Executor must send if it wants the scheduler to retry the task later.
// It also fulfils the stdlib's Unwrap interface.
type ErrRetry struct {
d time.Duration
err error
}
func (e *ErrRetry) Error() string {
if e.err != nil {
return "error" + e.err.Error() + "we need to retry in " + e.d.String()
}
return "error we need to retry in " + e.d.String()
}
// Scheduler is an example interface of a Scheduler.
@@ -61,3 +92,18 @@ type Scheduler interface {
// Release removes the specified task from the scheduler.
Release(taskID ID) error
}
func (e *ErrRetry) Unwrap() error {
return e.err
}
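
A sketch of how an Executor might use ErrRetry; retryingExecutor and its work field are hypothetical. Because ErrRetry implements Unwrap, errors.Is and errors.As can see through it to the wrapped cause.

package scheduler

import (
	"context"
	"time"
)

// retryingExecutor is a hypothetical Executor that asks the scheduler to
// retry later when the underlying work reports a failure.
type retryingExecutor struct {
	work func(ctx context.Context, id ID) error
}

func (e retryingExecutor) Execute(ctx context.Context, id ID, scheduledAt time.Time) error {
	if err := e.work(ctx, id); err != nil {
		// Ask for a retry in 5 seconds, preserving the cause for errors.Unwrap.
		return NewErrRetry(5*time.Second, err)
	}
	return nil
}
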
// ErrUnrecoverable is an error the scheduler uses to wrap failures it can't retry, such as a panicking Executor.
type ErrUnrecoverable struct {
error
}
func (e *ErrUnrecoverable) Error() string {
if e.error != nil {
return e.error.Error()
}
return "Error unrecoverable error on task run"
}


@@ -1,9 +1,7 @@
package scheduler
import (
"time"
"github.com/influxdata/influxdb/task/backend"
//"github.com/influxdata/influxdb/task/backend"
"github.com/prometheus/client_golang/prometheus"
)
@@ -58,7 +56,7 @@ func NewSchedulerMetrics(te *TreeScheduler) *SchedulerMetrics {
Namespace: namespace,
Subsystem: subsystem,
Name: "schedule_delay",
Help: "The duration between when a item should be scheduled and when it is told to execute.",
Help: "The duration between when a Item should be scheduled and when it is told to execute.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
@@ -92,18 +90,6 @@ func (em *SchedulerMetrics) release(taskID ID) {
em.releaseCalls.Inc()
}
func newExecutingTasks(ts *TreeScheduler) *executingTasks {
return &executingTasks{
desc: prometheus.NewDesc(
@@ -123,5 +109,6 @@ func (r *executingTasks) Describe(ch chan<- *prometheus.Desc) {
// Collect returns the current state of all metrics of the run collector.
func (r *executingTasks) Collect(ch chan<- prometheus.Metric) {
// TODO(docmerlin): fix this metric
ch <- prometheus.MustNewConstMetric(r.desc, prometheus.GaugeValue, float64(len(r.ts.workchans)))
}


@@ -6,27 +6,348 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/scheduler"
)
type mockExecutor struct {
sync.Mutex
fn func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time)
Err error
}
type mockSchedulable struct {
id scheduler.ID
schedule scheduler.Schedule
offset time.Duration
lastScheduled time.Time
}
func (s mockSchedulable) ID() scheduler.ID {
return s.id
}
func (s mockSchedulable) Schedule() scheduler.Schedule {
return s.schedule
}
func (s mockSchedulable) Offset() time.Duration {
return s.offset
}
func (s mockSchedulable) LastScheduled() time.Time {
return s.lastScheduled
}
func (e *mockExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error {
done := make(chan struct{}, 1)
select {
case <-ctx.Done():
default:
e.fn(&sync.Mutex{}, ctx, id, scheduledAt)
done <- struct{}{}
}
return nil
}
type mockSchedulableService struct {
fn func(ctx context.Context, id scheduler.ID, t time.Time) error
}
func (m *mockSchedulableService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}
func TestSchedule_Next(t *testing.T) {
t.Run("fires properly", func(t *testing.T) {
now := time.Now()
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
default:
t.Errorf("called the executor too many times")
}
}}
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}},
scheduler.WithMaxConcurrentWorkers(2))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, err := scheduler.NewSchedule("* * * * * * *")
if err != nil {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(-20 * time.Second)})
if err != nil {
t.Fatal(err)
}
select {
case <-c:
case <-time.After(10 * time.Second):
t.Fatal("test timed out", sch.Now().Unix(), sch.When().Unix())
}
})
t.Run("doesn't fire when the task isn't ready", func(t *testing.T) {
now := time.Now()
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
default:
t.Errorf("called the executor too many times")
}
}}
mockTime := scheduler.NewMockTime(now)
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}},
scheduler.WithTime(mockTime),
scheduler.WithMaxConcurrentWorkers(2))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, err := scheduler.NewSchedule("* * * * * * *")
if err != nil {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(time.Second)})
if err != nil {
t.Fatal(err)
}
go func() {
mockTime.Set(mockTime.T.Add(2 * time.Second))
}()
select {
case <-c:
t.Fatal("test timed out", sch.Now().Unix(), sch.When().Unix())
case <-time.After(2 * time.Second):
}
})
t.Run("fires the correct number of times for the interval with a single schedulable", func(t *testing.T) {
now := time.Now().UTC()
c := make(chan time.Time, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- scheduledAt:
}
}}
mockTime := scheduler.NewMockTime(now)
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}},
scheduler.WithTime(mockTime),
scheduler.WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, err := scheduler.NewSchedule("* * * * * * *")
if err != nil {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now})
if err != nil {
t.Fatal(err)
}
go func() {
mockTime.Set(mockTime.T.Add(17 * time.Second))
}()
after := time.After(6 * time.Second)
for i := 0; i < 16; i++ {
select {
case <-c:
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i)
}
}
go func() {
mockTime.Set(mockTime.T.Add(2 * time.Second))
}()
after = time.After(6 * time.Second)
for i := 0; i < 2; i++ {
select {
case <-c:
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 2 times", i)
}
}
select {
case <-c:
t.Fatalf("test scheduler fired too many times")
case <-time.After(2 * time.Second):
}
})
t.Run("fires the correct number of times for the interval with multiple schedulables", func(t *testing.T) {
now := time.Date(2016, 0, 0, 0, 1, 1, 0, time.UTC)
c := make(chan struct {
ts time.Time
id scheduler.ID
}, 100)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
select {
case <-ctx.Done():
t.Log("ctx done")
case c <- struct {
ts time.Time
id scheduler.ID
}{
ts: scheduledAt,
id: id,
}:
}
}}
mockTime := scheduler.NewMockTime(now)
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}},
scheduler.WithTime(mockTime),
scheduler.WithMaxConcurrentWorkers(20))
if err != nil {
t.Fatal(err)
}
defer sch.Stop()
schedule, err := scheduler.NewSchedule("* * * * * * *")
if err != nil {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now})
if err != nil {
t.Fatal(err)
}
schedule2, err := scheduler.NewSchedule("*/2 * * * * * *")
if err != nil {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 2, schedule: schedule2, offset: time.Second, lastScheduled: now})
if err != nil {
t.Fatal(err)
}
go func() {
mockTime.Set(mockTime.T.Add(17 * time.Second))
}()
after := time.After(6 * time.Second)
for i := 0; i < 24; i++ {
select {
case <-c:
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 24 times", i)
}
}
go func() {
mockTime.Set(mockTime.T.Add(2 * time.Second))
}()
after = time.After(6 * time.Second)
for i := 0; i < 3; i++ {
select {
case <-c:
case <-after:
t.Fatalf("test timed out, only fired %d times but should have fired 3 times", i)
}
}
select {
case <-c:
t.Fatalf("test scheduler fired too many times")
case <-time.After(2 * time.Second):
}
})
}
func TestTreeScheduler_Stop(t *testing.T) {
now := time.Now().Add(-20 * time.Second)
mockTime := scheduler.NewMockTime(now)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {}}
sch, _, err := scheduler.NewScheduler(exe, &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}},
scheduler.WithTime(mockTime))
if err != nil {
t.Fatal(err)
}
sch.Stop()
}
func TestSchedule_panic(t *testing.T) {
// panics in the executor should be treated as errors
now := time.Now().UTC()
c := make(chan struct {
ts time.Time
err error
}, 1)
exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {
panic("yikes oh no!")
}}
sch, _, err := scheduler.NewScheduler(
exe,
&mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error {
return nil
}},
scheduler.WithMaxConcurrentWorkers(1), // to make debugging easier
scheduler.WithOnErrorFn(func(_ context.Context, _ scheduler.ID, ts time.Time, err error) {
c <- struct {
ts time.Time
err error
}{
ts: ts,
err: err,
}
}))
if err != nil {
t.Fatal(err)
}
schedule, err := scheduler.NewSchedule("* * * * * * *")
if err != nil {
t.Fatal(err)
}
err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(-20 * time.Second)})
if err != nil {
t.Fatal(err)
}
select {
case <-c: // panic was caught and error handler used
case <-time.After(10 * time.Second):
t.Fatal("test timed out", now.UTC().Unix(), sch.Now().Unix(), sch.When().Unix())
}
}


@@ -0,0 +1,237 @@
package scheduler
import (
"sync"
"time"
)
// Time is an interface to allow us to mock time.
type Time interface {
Now() time.Time
Unix(seconds, nanoseconds int64) time.Time
NewTimer(d time.Duration) Timer
Until(time.Time) time.Duration
}
type stdTime struct{}
// Now gives us the current time as time.Now would.
func (stdTime) Now() time.Time {
return time.Now()
}
// Unix gives us the time given seconds and nanoseconds.
func (stdTime) Unix(sec, nsec int64) time.Time {
return time.Unix(sec, nsec)
}
func (stdTime) Until(t time.Time) time.Duration {
return time.Until(t)
}
// NewTimer gives us a Timer that fires after duration d.
func (stdTime) NewTimer(d time.Duration) Timer {
t := time.NewTimer(d)
return &stdTimer{*t}
}
// Timer is an interface to allow us to mock out timers. It has behavior like time.Timer
type Timer interface {
C() <-chan time.Time
Reset(d time.Duration) bool
Stop() bool
}
// stdTimer is a Timer that wraps time.Timer.
type stdTimer struct {
time.Timer
}
// C returns a <-chan time.Time and can be used much like time.Timer.C.
func (t *stdTimer) C() <-chan time.Time {
return t.Timer.C
}
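
One reason for the Time and Timer indirection: a helper like the hypothetical waitUntil below runs against stdTime in production and against MockTime in tests, where only a Set call advances the clock.

package scheduler

import "time"

// waitUntil is a hypothetical helper that blocks until ts using whichever
// Time implementation it is handed.
func waitUntil(t Time, ts time.Time) {
	timer := t.NewTimer(t.Until(ts))
	<-timer.C()
	// With stdTime this returns after a real wait; with MockTime it returns
	// once Set moves the mock clock to ts or later.
}
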
// MockTime is a time that mocks out some methods of time.Time.
// It doesn't advance on its own; it only changes when Set is called.
// Use NewMockTime to create MockTimes; don't instantiate the struct directly unless you want to mess with the sync Cond.
type MockTime struct {
sync.RWMutex
*sync.Cond
T time.Time
}
// NewMockTime creates a mock of time that returns the underlying time.Time.
func NewMockTime(t time.Time) *MockTime {
mt := &MockTime{
T: t,
Cond: sync.NewCond(&sync.Mutex{}),
}
return mt
}
// Now returns the stored time.Time. It is here to mock out time.Now().
func (t *MockTime) Now() time.Time {
t.RLock()
defer t.RUnlock()
return t.T
}
// Unix creates a time.Time given seconds and nanoseconds. It just wraps time.Unix.
func (*MockTime) Unix(sec, nsec int64) time.Time {
return time.Unix(sec, nsec)
}
// Until is equivalent to ts.Sub(t.T). We need it to mock out time, because the non-mocked implementation needs to be monotonic.
func (t *MockTime) Until(ts time.Time) time.Duration {
t.RLock()
defer t.RUnlock()
return ts.Sub(t.T)
}
// Set sets the stored time, and wakes anything waiting on the time to change.
func (t *MockTime) Set(ts time.Time) {
t.Lock()
defer t.Unlock()
t.Cond.L.Lock()
t.T = ts
t.Cond.Broadcast()
t.Cond.L.Unlock()
}
// MockTimer is a struct to mock out Timer.
type MockTimer struct {
T *MockTime
fireTime time.Time
c chan time.Time
stopch chan struct{}
active bool
wg sync.WaitGroup
starting sync.WaitGroup
}
// NewTimer returns a timer that will fire after duration d from the underlying time in the MockTime. It doesn't
// actually fire after a wall-clock duration; it fires when you Set the MockTime that created it to a time greater
// than or equal to the MockTime's time at creation plus the duration d.
func (t *MockTime) NewTimer(d time.Duration) Timer {
t.Cond.L.Lock()
timer := &MockTimer{
T: t,
fireTime: t.T.Add(d),
stopch: make(chan struct{}, 1),
c: make(chan time.Time, 1),
}
timer.start(d)
t.Cond.L.Unlock()
return timer
}
func (t *MockTimer) C() <-chan time.Time {
return t.c
}
func (t *MockTimer) Reset(d time.Duration) bool {
t.starting.Wait()
t.T.Cond.L.Lock()
// clear the channels
{
select {
case <-t.stopch:
default:
}
select {
case <-t.c:
default:
}
}
defer t.T.Cond.L.Unlock()
t.fireTime = t.T.Now().Add(d)
t.start(d)
t.T.Cond.Broadcast()
return false
}
func (t *MockTimer) Stop() (active bool) {
t.starting.Wait()
t.T.Cond.L.Lock()
defer func() {
t.T.Cond.Broadcast()
t.T.Cond.L.Unlock()
t.wg.Wait()
}()
if !t.active {
select {
case t.c <- t.fireTime:
default:
}
return false
}
select {
case t.stopch <- struct{}{}:
default:
}
if !t.active {
select {
case t.c <- t.fireTime:
default:
}
}
return t.active
}
func (t *MockTimer) start(ts time.Duration) {
if ts <= 0 {
t.c <- t.fireTime
return
}
t.wg.Add(1)
t.starting.Add(1)
go func() {
defer func() {
t.active = false
t.T.Cond.L.Unlock()
t.wg.Done()
}()
for {
t.T.Cond.L.Lock()
if !t.active {
t.active = true // this needs to be after we take the lock, but before we exit the starting state
t.starting.Done() // this needs to be after we take the lock on start, to ensure this goroutine starts before we stop or reset
}
// check if it should already be fired/stopped
if !t.T.T.Before(t.fireTime) {
select {
case t.c <- t.fireTime:
return
case <-t.stopch:
return
default:
}
}
t.T.Cond.Wait()
select {
case <-t.stopch:
return
default:
}
// check if it needs to be fired/stopped
if !t.T.T.Before(t.fireTime) {
select {
case t.c <- t.fireTime:
return
case <-t.stopch:
return
}
}
select {
case <-t.stopch:
return
default:
}
t.T.Cond.L.Unlock()
}
}()
}


@@ -0,0 +1,176 @@
package scheduler
import (
"testing"
"time"
)
func TestStdTime_Now(t *testing.T) {
t1 := stdTime{}.Now()
time.Sleep(time.Nanosecond)
t2 := stdTime{}.Now()
if !t1.Before(t2) {
t.Fatal("expected stdTime.Now() to advance between calls, but it didn't")
}
}
func TestStdTime_Unix(t *testing.T) {
now := time.Now()
t1 := stdTime{}.Unix(now.Unix(), int64(now.Nanosecond()))
if !t1.Equal(now) {
t.Fatal("expected the two times to be equivalent but they were not")
}
}
func TestMockTimer(t *testing.T) {
timeForComparison := time.Now() //time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC)
mt := NewMockTime(timeForComparison)
timer := mt.NewTimer(10 * time.Second)
select {
case <-timer.C():
t.Fatalf("expected timer not to fire till time was up, but did")
default:
}
go mt.Set(timeForComparison.Add(10 * time.Second))
select {
case <-timer.C():
case <-time.After(3 * time.Second):
t.Fatal("expected timer to fire when time was up, but it didn't, it fired after a 3 second timeout")
}
timer.Reset(33 * time.Second)
go mt.Set(timeForComparison.Add(50 * time.Second))
select {
case <-timer.C():
case <-time.After(4 * time.Second):
t.Fatal("expected timer to fire when time was up, but it didn't, it fired after a 4 second timeout")
}
if !timer.Stop() {
<-timer.C()
}
timer.Reset(10000 * time.Second)
select {
case ts := <-timer.C():
t.Errorf("expected timer to NOT fire if time was not up, but it did at ts: %s", ts)
default:
}
timer2 := mt.NewTimer(10000 * time.Second)
select {
case ts := <-timer2.C():
t.Errorf("expected timer to NOT fire if time was not up, but it did at ts: %s", ts)
case <-time.After(4 * time.Second):
}
if !timer2.Stop() {
<-timer2.C()
}
timer2.Reset(0)
select {
case <-time.After(4 * time.Second):
t.Error("expected timer to fire when it was reset to 0, but it didn't")
case <-timer2.C():
}
if !timer2.Stop() {
<-timer2.C()
}
timer2.Reset(-time.Second)
select {
case <-time.After(4 * time.Second):
t.Error("expected timer to fire when it was reset to a negative duration, but it didn't")
case <-timer2.C():
}
if !timer2.Stop() {
<-timer2.C()
}
timer2.Reset(1 * time.Second)
go func() {
mt.Set(mt.T.Add(1 * time.Second))
}()
select {
case <-time.After(4 * time.Second):
t.Error("expected timer to fire after it was reset to a small duration, but it didn't")
case <-timer2.C():
}
timer2.Reset(1 * time.Second)
go func() {
mt.Set(mt.T.Add(time.Second / 2))
}()
select {
case <-time.After(time.Second):
case <-timer2.C():
t.Error("expected timer to not fire after it was reset to a too small duration, but it did")
}
}
func TestMockTimer_Stop(t *testing.T) {
timeForComparison := time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC)
mt := NewMockTime(timeForComparison)
timer := mt.NewTimer(10 * time.Second)
if !timer.Stop() {
t.Fatal("expected MockTimer.Stop() to be true if it hadn't fired yet")
}
if !timer.Stop() {
select {
case <-timer.C():
case <-time.After(2 * time.Second):
t.Fatal("timer didn't fire to clear when it should have")
}
} else {
t.Fatalf("Expected MockTimer.Stop() to be false when it was already stopped but it wasn't")
}
timer.Reset(time.Second)
go mt.Set(timeForComparison.Add(20 * time.Second))
select {
case <-timer.C():
case <-time.After(2 * time.Second):
t.Fatal("timer didn't fire when it should have")
}
if !timer.Stop() {
select {
case <-timer.C():
case <-time.After(2 * time.Second):
t.Fatal("timer didn't fire to clear when it should have")
}
} else {
t.Fatalf("Expected MockTimer.Stop() to be false when it was already fired but it wasn't")
}
}
func TestMockTime_Until(t *testing.T) {
tests := []struct {
name string
mocktime time.Time
ts time.Time
want time.Duration
}{{
name: "happy",
mocktime: time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC),
ts: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
want: 2*time.Minute + 50*time.Second + 4*time.Nanosecond,
}, {
name: "negative",
mocktime: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
ts: time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC),
want: -(2*time.Minute + 50*time.Second + 4*time.Nanosecond),
}, {
name: "zero",
mocktime: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
ts: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC),
want: 0,
}}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tm := NewMockTime(tt.mocktime)
if got := tm.Until(tt.ts); got != tt.want {
t.Errorf("MockTime.Until() = %v, want %v", got, tt.want)
}
})
}
}


@@ -2,212 +2,258 @@ package scheduler
import (
"context"
"encoding/binary"
"errors"
"math"
"sync"
"time"
"github.com/cespare/xxhash"
"github.com/google/btree"
"github.com/influxdata/cron"
"github.com/influxdata/influxdb/task/backend"
)
const (
maxWaitTime = time.Hour
degreeBtreeScheduled = 3
defaultMaxWorkers = 32
)
// TreeScheduler is a Scheduler based on a btree.
// It calls Executor in-order per ID: for a specific ID, the run scheduled for 12:00 will be executed before the run scheduled for 12:01.
//
// If a call to an ExecutorFunc returns an *ErrRetry, then all calls to Executor for that task will be delayed
// temporarily by the amount specified in *ErrRetry, but future calls to Executor for that task will proceed normally.
//
//
// - The scheduler should, after creation, automatically call ExecutorFunc when a task should run, as defined by its Schedulable.
//
// - The scheduler should not be able to get into a state where it blocks Release and Schedule indefinitely.
//
// - Schedule should add a Schedulable to being scheduled, and Release should remove a task from being scheduled.
//
// - Calling of ExecutorFunc should be serial in time on a per-taskID basis, i.e. the run for 12:00 will be called before the run for 12:01.
//
// Design:
//
// The core of the scheduler is a btree keyed by time, a nonce, and a task ID, and a map keyed by task ID and containing a
// nonce and a time (called a uniqueness index from now on).
// The map is to ensure task uniqueness in the tree, so we can replace or delete tasks in the tree.
//
// Scheduling in the tree consists of a main loop that feeds a fixed set of workers, each with their own communication channel.
// Distribution is handled by hashing the TaskID (to ensure uniform distribution) and then distributing over those channels
// evenly based on the hashed ID. This is to ensure that all tasks of the same ID go to the same worker.
//
// The workers call ExecutorFunc, handle any errors, and update the LastScheduled time internally and also via the Checkpointer.
//
// The main loop:
//
// The main loop waits on a time.Timer to grab the task with the minimum time. Once it successfully grabs a task ready
// to trigger, it will start walking the btree from the item nearest in time, distributing each ready item to its worker.
//
// Putting a task into the scheduler:
//
// Adding a task to the scheduler acquires a write lock, grabs the task from the uniqueness map, and replaces the item
// in the uniqueness index and btree. If the new task would trigger sooner than the current soonest-triggering task, it
// resets the Timer. Finally it releases the write lock.
//
// Removing a task from the scheduler:
//
// Removing a task from the scheduler acquires a write lock, deletes the task from the uniqueness index and from the
// btree, then releases the lock. We do not have to readjust the time on delete, because, if the minimum task isn't
// ready yet, the main loop just resets the timer and keeps going.
type TreeScheduler struct {
sync.RWMutex
scheduled *btree.BTree
nextTime map[ID]ordering // we need this index so we can delete items from the scheduled
when time.Time
executor Executor
onErr ErrorFunc
time Time
timer Timer
done chan struct{}
workchans []chan Item
wg sync.WaitGroup
checkpointer SchedulableService
sm *SchedulerMetrics
}
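
The worker-distribution rule described above, pulled out as a standalone sketch; workerFor is illustrative, and the real logic lives in the iterator below.

package scheduler

import (
	"encoding/binary"

	"github.com/cespare/xxhash"
)

// workerFor hashes a task ID and reduces it modulo the worker count, so a
// given ID always maps to the same worker channel.
func workerFor(id ID, nWorkers int) uint64 {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], uint64(id))
	return xxhash.Sum64(buf[:]) % uint64(nWorkers)
}
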
// ExecutorFunc is a function that can be used as an Executor.
type ExecutorFunc func(ctx context.Context, id ID, scheduledAt time.Time) error
// ErrorFunc is the signature of the handler the scheduler calls when execution of a run fails.
type ErrorFunc func(ctx context.Context, taskID ID, scheduledAt time.Time, err error)
type treeSchedulerOptFunc func(t *TreeScheduler) error
func WithOnErrorFn(fn ErrorFunc) treeSchedulerOptFunc {
return func(t *TreeScheduler) error {
t.onErr = fn
return nil
}
}
func WithMaxConcurrentWorkers(n int) treeSchedulerOptFunc {
return func(t *TreeScheduler) error {
t.workchans = make([]chan Item, n)
return nil
}
}
func WithTime(t Time) treeSchedulerOptFunc {
return func(sch *TreeScheduler) error {
sch.time = t
return nil
}
}
// NewScheduler returns a new TreeScheduler and its metrics. onErr is called when execution fails or when we cannot
// find a viable time before Jan 1, 2100. The default behavior is to drop the task on error.
func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...treeSchedulerOptFunc) (*TreeScheduler, *SchedulerMetrics, error) {
s := &TreeScheduler{
executor: executor,
scheduled: btree.New(degreeBtreeScheduled),
nextTime: map[ID]ordering{},
onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {},
time: stdTime{},
done: make(chan struct{}, 1),
checkpointer: checkpointer,
}
// apply options
for i := range opts {
if err := opts[i](s); err != nil {
return nil, nil, err
}
}
if s.workchans == nil {
s.workchans = make([]chan Item, defaultMaxWorkers)
}
s.wg.Add(len(s.workchans))
for i := 0; i < len(s.workchans); i++ {
s.workchans[i] = make(chan Item)
go s.work(i)
}
s.sm = NewSchedulerMetrics(s)
s.when = s.time.Now().Add(maxWaitTime)
s.timer = s.time.NewTimer(maxWaitTime)
if executor == nil {
return nil, nil, errors.New("Executor must be a non-nil function")
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
schedulerLoop:
for {
select {
case <-s.done:
s.Lock()
s.timer.Stop()
// close workchans
for i := range s.workchans {
close(s.workchans[i])
}
s.Unlock()
return
case <-s.timer.C():
s.Lock()
min := s.scheduled.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue
s.when = s.time.Now().Add(maxWaitTime)
s.timer.Reset(maxWaitTime) // we can reset without stop, because it's fired.
s.Unlock()
continue schedulerLoop
}
it := min.(Item)
if it.when > s.when.UTC().Unix() {
s.Unlock()
continue schedulerLoop
}
s.process()
min = s.scheduled.Min()
if min == nil { // grab a new item, because there could be a different item at the top of the queue after processing
s.when = s.time.Now().Add(maxWaitTime)
s.timer.Reset(maxWaitTime) // we can reset without stop, because it's fired.
s.Unlock()
continue schedulerLoop
}
it = min.(Item)
s.when = time.Unix(it.when, 0)
until := s.time.Until(s.when)
s.timer.Reset(until) // we can reset without stop, because it's fired.
s.Unlock()
}
}
}()
return s, s.sm, nil
}
func (s *TreeScheduler) Stop() {
s.Lock()
close(s.done)
s.Unlock()
s.wg.Wait()
}
type unsent struct {
items []Item
}
func (u *unsent) append(i Item) {
u.items = append(u.items, i)
}
// process walks the scheduled btree, distributing each ready item to its worker and rescheduling it.
func (s *TreeScheduler) process() {
iter, toReAdd := s.iterator(s.time.Now())
s.scheduled.Ascend(iter)
for i := range toReAdd.items {
s.nextTime[toReAdd.items[i].id] = toReAdd.items[i].ordering
s.scheduled.ReplaceOrInsert(toReAdd.items[i])
}
}
func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *unsent) {
itemsToPlace := &unsent{}
return func(i btree.Item) bool {
if i == nil {
return false
}
it := i.(Item) // we want it to panic if things other than Items are populating the scheduler, as it is something we can't recover from.
if time.Unix(it.next+it.Offset, 0).After(ts) {
return false
}
// distribute to the right worker.
{
buf := [8]byte{}
binary.LittleEndian.PutUint64(buf[:], uint64(it.id))
wc := xxhash.Sum64(buf[:]) % uint64(len(s.workchans)) // we just hash so that the number is uniformly distributed
select {
case s.workchans[wc] <- it:
s.scheduled.Delete(it)
if err := it.updateNext(); err != nil {
s.onErr(context.Background(), it.id, it.Next(), err)
}
itemsToPlace.append(it)
case <-s.done:
return false
default:
s.scheduled.Delete(it)
it.incrementNonce()
itemsToPlace.append(it)
return true
}
}
return true
}, itemsToPlace
}
// Now returns the scheduler's current time, as reported by its Time implementation.
func (s *TreeScheduler) Now() time.Time {
s.RLock()
now := s.time.Now().UTC()
s.RUnlock()
return now
}
// When gives us the next time the scheduler will run a task.
func (s *TreeScheduler) When() time.Time {
s.RLock()
@@ -216,86 +262,142 @@ func (s *TreeScheduler) When() time.Time {
return w
}
// release removes a task from the scheduled btree and the uniqueness index. Callers must hold the write lock.
func (s *TreeScheduler) release(taskID ID) {
ordering, ok := s.nextTime[taskID]
if !ok {
return
}
// delete the old task run time
s.scheduled.Delete(Item{id: taskID, ordering: ordering})
delete(s.nextTime, taskID)
}
// Release releases a task; if the scheduler doesn't own the task it just returns.
// Release also cancels the running task.
// Task deletion would be faster if the tree supported deleting ranges.
func (s *TreeScheduler) Release(taskID ID) {
s.sm.release(taskID)
s.Lock()
s.release(taskID)
s.Unlock()
}
// work does the work and reschedules it as necessary.
// It handles the rescheduling, because we need to be able to reschedule based on executor error.
func (s *TreeScheduler) work(i int) {
var it Item
defer func() {
s.wg.Done()
}()
for it = range s.workchans[i] {
t := time.Unix(it.next, 0)
err := func() (err error) {
defer func() {
if r := recover(); r != nil {
err = &ErrUnrecoverable{errors.New("Executor panicked")}
}
}()
return s.executor.Execute(context.Background(), it.id, t)
}()
if err != nil {
s.onErr(context.Background(), it.id, it.Next(), err)
}
if err := s.checkpointer.UpdateLastScheduled(context.TODO(), it.id, t); err != nil {
s.onErr(context.Background(), it.id, it.Next(), err)
}
}
}
// Schedule puts a Schedulable on the TreeScheduler.
func (s *TreeScheduler) Schedule(sch Schedulable) error {
s.sm.schedule(sch.ID())
it := Item{
cron: sch.Schedule(),
id: sch.ID(),
Offset: int64(sch.Offset().Seconds()),
//last: sch.LastScheduled().Unix(),
}
nt, err := it.cron.Next(sch.LastScheduled())
if err != nil {
return err
}
it.next = nt.UTC().Unix()
it.ordering.when = it.next + it.Offset
s.Lock()
defer s.Unlock()
nt = nt.Add(sch.Offset())
if s.when.After(nt) {
s.when = nt
s.timer.Stop()
until := s.time.Until(s.when)
if until <= 0 {
s.timer.Reset(0)
} else {
s.timer.Reset(s.time.Until(s.when))
}
}
nextTime, ok := s.nextTime[it.id]
if ok {
// delete the old task run time
s.scheduled.Delete(Item{
ordering: nextTime,
id: it.id,
})
}
s.nextTime[it.id] = ordering{when: it.next + it.Offset + it.wait}
// insert the new task run time
s.scheduled.ReplaceOrInsert(it)
return nil
}
var maxItem = Item{
ordering: ordering{
when: math.MaxInt64,
nonce: int(^uint(0) >> 1),
},
id: maxID,
}
type ordering struct {
when int64
nonce int // for retries
}
func (k *ordering) incrementNonce() {
k.nonce++
}
// Item is a task in the scheduler.
type Item struct {
ordering
id ID
cron Schedule
next int64
wait int64
Offset int64
}
func (it Item) Next() time.Time {
return time.Unix(it.next, 0)
}
// Less tells us if one Item is less than another
func (it Item) Less(bItem btree.Item) bool {
it2 := bItem.(Item)
return it.when < it2.when || (it.when == it2.when && (it.nonce < it2.nonce || it.nonce == it2.nonce && it.id < it2.id))
}
func (it *Item) updateNext() error {
newNext, err := it.cron.Next(time.Unix(it.next, 0))
if err != nil {
return err
}
it.next = newNext.UTC().Unix()
it.when = it.next + it.Offset
return nil
}