From 3fd94cbb6978f6de390f4300d1bf4a0acfe74a9d Mon Sep 17 00:00:00 2001 From: "j. Emrys Landivar (docmerlin)" Date: Mon, 12 Aug 2019 22:19:39 -0500 Subject: [PATCH] feat(task): new scheduler now with more tests --- go.mod | 1 + go.sum | 2 + task/backend/scheduler/scheduler.go | 54 ++- task/backend/scheduler/scheduler_metrics.go | 21 +- task/backend/scheduler/scheduler_test.go | 333 ++++++++++++- task/backend/scheduler/time.go | 237 ++++++++++ task/backend/scheduler/time_test.go | 176 +++++++ task/backend/scheduler/treescheduler.go | 490 ++++++++++++-------- 8 files changed, 1093 insertions(+), 221 deletions(-) create mode 100644 task/backend/scheduler/time.go create mode 100644 task/backend/scheduler/time_test.go diff --git a/go.mod b/go.mod index 3c44585a1d..18668d7b5c 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/hashicorp/go-msgpack v0.0.0-20150518234257-fa3f63826f7c // indirect github.com/hashicorp/raft v1.0.0 // indirect github.com/hashicorp/vault/api v1.0.2 + github.com/influxdata/cron v0.0.0-20190812233253-38faece03642 github.com/influxdata/flux v0.49.0 github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 diff --git a/go.sum b/go.sum index c55128feea..9501afd9ca 100644 --- a/go.sum +++ b/go.sum @@ -233,6 +233,8 @@ github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/changelog v1.0.0 h1:RstJD6H48zLQj0GdE6E6k/6RPwtUjkyzIe/T1E/xuWU= github.com/influxdata/changelog v1.0.0/go.mod h1:uzpGWE/qehT8L426YuXwpMQub+a63vIINhIeEI9mnSM= +github.com/influxdata/cron v0.0.0-20190812233253-38faece03642 h1:ae+mZOcsOpcD0GyaVpqAzR/2t2tffQ2cWArPGohs3A8= +github.com/influxdata/cron v0.0.0-20190812233253-38faece03642/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og= github.com/influxdata/flux v0.49.0 h1:uRIUWqPNhAfy3RfTdidnHFjFRL8q5fHZoUumz3qW1Wo= github.com/influxdata/flux v0.49.0/go.mod h1:jnRutnpW4auRnMYcZQdRhhUKI2xrDAf4X1qjlfSdN6c= github.com/influxdata/goreleaser v0.97.0-influx h1:jT5OrcW7WfS0e2QxfwmTBjhLvpIC9CDLRhNgZJyhj8s= diff --git a/task/backend/scheduler/scheduler.go b/task/backend/scheduler/scheduler.go index 1e3a742f23..5082c79362 100644 --- a/task/backend/scheduler/scheduler.go +++ b/task/backend/scheduler/scheduler.go @@ -3,13 +3,16 @@ package scheduler import ( "context" "time" + + "github.com/influxdata/cron" ) // ID duplicates the influxdb ID so users of the scheduler don't have to -// import influxdb for the id. -// TODO(lh): maybe make this its own thing sometime in the future. +// import influxdb for the ID. type ID uint64 +var maxID = ID(int(^uint(0) >> 1)) + // Executor is a system used by the scheduler to actually execute the scheduleable item. type Executor interface { // Execute is used to execute run's for any schedulable object. @@ -21,7 +24,8 @@ type Executor interface { Execute(ctx context.Context, id ID, scheduledAt time.Time) error } -// Schedulable is the interface that encapsulates the state that is required to schedule a job. +// Schedulable is the interface that encapsulates work that +// is to be executed on a specified schedule. 
type Schedulable interface { // ID is the unique identifier for this Schedulable ID() ID @@ -48,7 +52,34 @@ type SchedulableService interface { UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error } -type Schedule struct { +func NewSchedule(c string) (Schedule, error) { + sch, err := cron.ParseUTC(c) + return Schedule(sch), err +} + +type Schedule cron.Parsed + +func (s Schedule) Next(from time.Time) (time.Time, error) { + return cron.Parsed(s).Next(from) +} + +// NewErrRetry returns an ErrRetry, it accepts a duration and an error. +func NewErrRetry(d time.Duration, err error) *ErrRetry { + return &ErrRetry{d: d, err: err} +} + +// ErrRetry is an error that the Executor must send if it wants the scheduler to retry the task later. +// It also fulfils the stdlib's Unwraper interface. +type ErrRetry struct { + d time.Duration + err error +} + +func (e *ErrRetry) Error() string { + if e.err != nil { + return "error" + e.err.Error() + "we need to retry in " + e.d.String() + } + return "error we need to retry in " + e.d.String() } // Scheduler is a example interface of a Scheduler. @@ -61,3 +92,18 @@ type Scheduler interface { // Release removes the specified task from the scheduler. Release(taskID ID) error } + +func (e *ErrRetry) Unwrap() error { + return e.err +} + +type ErrUnrecoverable struct { + error +} + +func (e *ErrUnrecoverable) Error() string { + if e.error != nil { + return e.error.Error() + } + return "Error unrecoverable error on task run" +} diff --git a/task/backend/scheduler/scheduler_metrics.go b/task/backend/scheduler/scheduler_metrics.go index e70ba14805..fdf4332d68 100644 --- a/task/backend/scheduler/scheduler_metrics.go +++ b/task/backend/scheduler/scheduler_metrics.go @@ -1,9 +1,7 @@ package scheduler import ( - "time" - - "github.com/influxdata/influxdb/task/backend" + //"github.com/influxdata/influxdb/task/backend" "github.com/prometheus/client_golang/prometheus" ) @@ -58,7 +56,7 @@ func NewSchedulerMetrics(te *TreeScheduler) *SchedulerMetrics { Namespace: namespace, Subsystem: subsystem, Name: "schedule_delay", - Help: "The duration between when a item should be scheduled and when it is told to execute.", + Help: "The duration between when a Item should be scheduled and when it is told to execute.", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), @@ -92,18 +90,6 @@ func (em *SchedulerMetrics) release(taskID ID) { em.releaseCalls.Inc() } -func (em *SchedulerMetrics) startExecution(taskID ID, scheduleDelay time.Duration) { - em.totalExecuteCalls.Inc() - em.scheduleDelay.Observe(scheduleDelay.Seconds()) -} - -func (em *SchedulerMetrics) finishExecution(taskID ID, failure bool, status backend.RunStatus, executeDelta time.Duration) { - em.executeDelta.Observe(executeDelta.Seconds()) - if failure { - em.totalExecuteFailure.Inc() - } -} - func newExecutingTasks(ts *TreeScheduler) *executingTasks { return &executingTasks{ desc: prometheus.NewDesc( @@ -123,5 +109,6 @@ func (r *executingTasks) Describe(ch chan<- *prometheus.Desc) { // Collect returns the current state of all metrics of the run collector. 
func (r *executingTasks) Collect(ch chan<- prometheus.Metric) { - ch <- prometheus.MustNewConstMetric(r.desc, prometheus.GaugeValue, float64(len(r.ts.sema))) + // TODO(docmerlin): fix this metric + ch <- prometheus.MustNewConstMetric(r.desc, prometheus.GaugeValue, float64(len(r.ts.workchans))) } diff --git a/task/backend/scheduler/scheduler_test.go b/task/backend/scheduler/scheduler_test.go index cdeab0c4df..89c63bf986 100644 --- a/task/backend/scheduler/scheduler_test.go +++ b/task/backend/scheduler/scheduler_test.go @@ -6,27 +6,348 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/task/backend" - "github.com/influxdata/influxdb/task/backend/scheduler" ) type mockExecutor struct { sync.Mutex + fn func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) + Err error } -func (e *mockExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) (backend.RunPromise, error) { +type mockSchedulable struct { + id scheduler.ID + schedule scheduler.Schedule + offset time.Duration + lastScheduled time.Time +} + +func (s mockSchedulable) ID() scheduler.ID { + return s.id +} + +func (s mockSchedulable) Schedule() scheduler.Schedule { + return s.schedule +} +func (s mockSchedulable) Offset() time.Duration { + return s.offset +} +func (s mockSchedulable) LastScheduled() time.Time { + return s.lastScheduled +} + +func (e *mockExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error { + done := make(chan struct{}, 1) select { case <-ctx.Done(): + default: + e.fn(&sync.Mutex{}, ctx, id, scheduledAt) + done <- struct{}{} } + return nil +} + +type mockSchedulableService struct { + fn func(ctx context.Context, id scheduler.ID, t time.Time) error +} + +func (m *mockSchedulableService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error { + + return nil } func TestSchedule_Next(t *testing.T) { + t.Run("fires properly", func(t *testing.T) { + now := time.Now() + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: + default: + t.Errorf("called the executor too many times") + } + }} + sch, _, err := scheduler.NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error { + return nil + }}, + scheduler.WithMaxConcurrentWorkers(2)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, err := scheduler.NewSchedule("* * * * * * *") + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(-20 * time.Second)}) + if err != nil { + t.Fatal(err) + } + + select { + case <-c: + case <-time.After(10 * time.Second): + t.Fatal("test timed out", sch.Now().Unix(), sch.When().Unix()) + } + }) + t.Run("doesn't fire when the task isn't ready", func(t *testing.T) { + now := time.Now() + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: + default: + t.Errorf("called the executor too many times") + } + }} + mockTime := scheduler.NewMockTime(now) + sch, _, err := scheduler.NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error { + return nil + }}, + scheduler.WithTime(mockTime), + 
scheduler.WithMaxConcurrentWorkers(2)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, err := scheduler.NewSchedule("* * * * * * *") + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(time.Second)}) + if err != nil { + t.Fatal(err) + } + go func() { + mockTime.Set(mockTime.T.Add(2 * time.Second)) + }() + + select { + case <-c: + t.Fatal("test timed out", sch.Now().Unix(), sch.When().Unix()) + case <-time.After(2 * time.Second): + } + + }) + + t.Run("fires the correct number of times for the interval with a single schedulable", func(t *testing.T) { + now := time.Now().UTC() + c := make(chan time.Time, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- scheduledAt: + } + }} + mockTime := scheduler.NewMockTime(now) + sch, _, err := scheduler.NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error { + return nil + }}, + scheduler.WithTime(mockTime), + scheduler.WithMaxConcurrentWorkers(20)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, err := scheduler.NewSchedule("* * * * * * *") + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now}) + if err != nil { + t.Fatal(err) + } + go func() { + mockTime.Set(mockTime.T.Add(17 * time.Second)) + }() + + after := time.After(6 * time.Second) + for i := 0; i < 16; i++ { + select { + case <-c: + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 16 times", i) + } + } + + go func() { + mockTime.Set(mockTime.T.Add(2 * time.Second)) + }() + + after = time.After(6 * time.Second) + + for i := 0; i < 2; i++ { + select { + case <-c: + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 2 times", i) + } + } + + select { + case <-c: + t.Fatalf("test scheduler fired too many times") + case <-time.After(2 * time.Second): + } + }) + + t.Run("fires the correct number of times for the interval with multiple schedulables", func(t *testing.T) { + now := time.Date(2016, 0, 0, 0, 1, 1, 0, time.UTC) + c := make(chan struct { + ts time.Time + id scheduler.ID + }, 100) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) { + select { + case <-ctx.Done(): + t.Log("ctx done") + case c <- struct { + ts time.Time + id scheduler.ID + }{ + ts: scheduledAt, + id: id, + }: + } + }} + mockTime := scheduler.NewMockTime(now) + sch, _, err := scheduler.NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error { + return nil + }}, + scheduler.WithTime(mockTime), + scheduler.WithMaxConcurrentWorkers(20)) + if err != nil { + t.Fatal(err) + } + defer sch.Stop() + schedule, err := scheduler.NewSchedule("* * * * * * *") + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now}) + if err != nil { + t.Fatal(err) + } + schedule2, err := scheduler.NewSchedule("*/2 * * * * * *") + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 2, schedule: schedule2, offset: time.Second, lastScheduled: now}) + if err != nil { + t.Fatal(err) + } + + go func() { + mockTime.Set(mockTime.T.Add(17 * time.Second)) + }() + + after := 
time.After(6 * time.Second) + for i := 0; i < 24; i++ { + select { + case <-c: + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 24 times", i) + } + } + + go func() { + mockTime.Set(mockTime.T.Add(2 * time.Second)) + }() + + after = time.After(6 * time.Second) + + for i := 0; i < 3; i++ { + select { + case <-c: + case <-after: + t.Fatalf("test timed out, only fired %d times but should have fired 3 times", i) + } + } + + select { + case <-c: + t.Fatalf("test scheduler fired too many times") + case <-time.After(2 * time.Second): + } + }) +} + +func TestTreeScheduler_Stop(t *testing.T) { now := time.Now().Add(-20 * time.Second) - exe := mockExecutor{} - sch, err := scheduler.NewScheduler(exe.Execute) + mockTime := scheduler.NewMockTime(now) + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) {}} + sch, _, err := scheduler.NewScheduler(exe, &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error { + return nil + }}, + scheduler.WithTime(mockTime)) if err != nil { t.Fatal(err) } - sch.Schedule(1, "* * * * * * *", 10*time.Second, now.Add(20*time.Second)) + sch.Stop() +} + +func TestSchedule_panic(t *testing.T) { + // panics in the executor should be treated as errors + now := time.Now().UTC() + c := make(chan struct { + ts time.Time + err error + }, 1) + + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id scheduler.ID, scheduledAt time.Time) { + panic("yikes oh no!") + }} + + sch, _, err := scheduler.NewScheduler( + exe, + &mockSchedulableService{fn: func(ctx context.Context, id scheduler.ID, t time.Time) error { + return nil + }}, + scheduler.WithMaxConcurrentWorkers(1), // to make debugging easier + scheduler.WithOnErrorFn(func(_ context.Context, _ scheduler.ID, ts time.Time, err error) { + c <- struct { + ts time.Time + err error + }{ + ts: ts, + err: err, + } + })) + if err != nil { + t.Fatal(err) + } + + schedule, err := scheduler.NewSchedule("* * * * * * *") + if err != nil { + t.Fatal(err) + } + + err = sch.Schedule(mockSchedulable{id: 1, schedule: schedule, offset: time.Second, lastScheduled: now.Add(-20 * time.Second)}) + if err != nil { + t.Fatal(err) + } + + select { + case <-c: // panic was caught and error handler used + case <-time.After(10 * time.Second): + t.Fatal("test timed out", now.UTC().Unix(), sch.Now().Unix(), sch.When().Unix()) + } } diff --git a/task/backend/scheduler/time.go b/task/backend/scheduler/time.go new file mode 100644 index 0000000000..282bbff7ff --- /dev/null +++ b/task/backend/scheduler/time.go @@ -0,0 +1,237 @@ +package scheduler + +import ( + "sync" + "time" +) + +// Time is an interface to allow us to mock time. +type Time interface { + Now() time.Time + Unix(seconds, nanoseconds int64) time.Time + NewTimer(d time.Duration) Timer + Until(time.Time) time.Duration +} + +type stdTime struct{} + +// Now gives us the current time as time.Time would +func (stdTime) Now() time.Time { + return time.Now() +} + +// Unix gives us the time given seconds and nanoseconds. +func (stdTime) Unix(sec, nsec int64) time.Time { + return time.Unix(sec, nsec) +} + +func (stdTime) Until(t time.Time) time.Duration { + return time.Until(t) +} + +// NewTimer gives us a Timer that fires after duration d. +func (stdTime) NewTimer(d time.Duration) Timer { + t := time.NewTimer(d) + return &stdTimer{*t} +} + +// Timer is an interface to allow us to mock out timers. 
It behaves like time.Timer. +type Timer interface { + C() <-chan time.Time + Reset(d time.Duration) bool + Stop() bool +} + +// stdTimer is a Timer that wraps time.Time. +type stdTimer struct { + time.Timer +} + +// C returns a <-chan time.Time and can be used much like time.Timer.C. +func (t *stdTimer) C() <-chan time.Time { + return t.Timer.C +} + +// MockTime is a time that mocks out some methods of time.Time. +// It doesn't advance on its own, but only changes with calls to Set. +// Use NewMockTime to create MockTimes; don't instantiate the struct directly unless you want to mess with the sync Cond. +type MockTime struct { + sync.RWMutex + *sync.Cond + T time.Time +} + +// NewMockTime creates a mock of time that returns the underlying time.Time. +func NewMockTime(t time.Time) *MockTime { + mt := &MockTime{ + T: t, + Cond: sync.NewCond(&sync.Mutex{}), + } + return mt +} + +// Now returns the stored time.Time. It mocks out time.Now(). +func (t *MockTime) Now() time.Time { + t.RLock() + defer t.RUnlock() + return t.T +} + +// Unix creates a time.Time given seconds and nanoseconds. It just wraps time.Unix. +func (*MockTime) Unix(sec, nsec int64) time.Time { + return time.Unix(sec, nsec) +} + +// Until is equivalent to ts.Sub(t.T). We need it to mock out time, because the non-mocked implementation needs to be monotonic. +func (t *MockTime) Until(ts time.Time) time.Duration { + t.RLock() + defer t.RUnlock() + return ts.Sub(t.T) +} + +func (t *MockTime) Set(ts time.Time) { + t.Lock() + defer t.Unlock() + t.Cond.L.Lock() + t.T = ts + t.Cond.Broadcast() + t.Cond.L.Unlock() + +} + +// MockTimer is a struct to mock out Timer. +type MockTimer struct { + T *MockTime + fireTime time.Time + c chan time.Time + stopch chan struct{} + active bool + wg sync.WaitGroup + starting sync.WaitGroup +} + +// NewTimer returns a timer that will fire after d time.Duration from the underlying time in the MockTime. It doesn't +// actually fire after a duration, but fires when you Set the MockTime used to create it, to a time greater than or +// equal to the underlying MockTime when it was created plus duration d. 
+func (t *MockTime) NewTimer(d time.Duration) Timer { + t.Cond.L.Lock() + timer := &MockTimer{ + T: t, + fireTime: t.T.Add(d), + stopch: make(chan struct{}, 1), + c: make(chan time.Time, 1), + } + timer.start(d) + t.Cond.L.Unlock() + return timer +} + +func (t *MockTimer) C() <-chan time.Time { + return t.c +} + +func (t *MockTimer) Reset(d time.Duration) bool { + t.starting.Wait() + t.T.Cond.L.Lock() + // clear the channels + { + select { + case <-t.stopch: + default: + } + select { + case <-t.c: + default: + } + } + defer t.T.Cond.L.Unlock() + t.fireTime = t.T.Now().Add(d) + t.start(d) + t.T.Cond.Broadcast() + return false + +} + +func (t *MockTimer) Stop() (active bool) { + t.starting.Wait() + t.T.Cond.L.Lock() + defer func() { + t.T.Cond.Broadcast() + t.T.Cond.L.Unlock() + t.wg.Wait() + }() + if !t.active { + select { + case t.c <- t.fireTime: + default: + + } + return false + } + select { + case t.stopch <- struct{}{}: + default: + } + if !t.active { + select { + case t.c <- t.fireTime: + default: + } + } + return t.active +} + +func (t *MockTimer) start(ts time.Duration) { + if ts <= 0 { + t.c <- t.fireTime + return + } + t.wg.Add(1) + t.starting.Add(1) + go func() { + defer func() { + t.active = false + t.T.Cond.L.Unlock() + t.wg.Done() + }() + for { + t.T.Cond.L.Lock() + if !t.active { + t.active = true // this needs to be after we tale the lock, but before we exit the starting state + t.starting.Done() // this needs to be after we take the lock on start, to ensure this goroutine starts before we stop or reset + } + //check it should already be fired/stopped + if !t.T.T.Before(t.fireTime) { + select { + case t.c <- t.fireTime: + return + case <-t.stopch: + return + default: + } + } + t.T.Cond.Wait() + select { + case <-t.stopch: + return + default: + } + // check it needs to be be fired/stopped + + if !t.T.T.Before(t.fireTime) { + select { + case t.c <- t.fireTime: + return + case <-t.stopch: + return + } + } + select { + case <-t.stopch: + return + default: + } + t.T.Cond.L.Unlock() + } + }() +} diff --git a/task/backend/scheduler/time_test.go b/task/backend/scheduler/time_test.go new file mode 100644 index 0000000000..b1c0cab209 --- /dev/null +++ b/task/backend/scheduler/time_test.go @@ -0,0 +1,176 @@ +package scheduler + +import ( + "testing" + "time" +) + +func TestStdTime_Now(t *testing.T) { + t1 := stdTime{}.Now() + time.Sleep(time.Nanosecond) + t2 := stdTime{}.Now() + if !t1.Before(t2) { + t.Fatal() + } +} + +func TestStdTime_Unix(t *testing.T) { + now := time.Now() + t1 := stdTime{}.Unix(now.Unix(), int64(now.Nanosecond())) + if !t1.Equal(now) { + t.Fatal("expected the two times to be equivalent but they were not") + } +} + +func TestMockTimer(t *testing.T) { + timeForComparison := time.Now() //time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC) + mt := NewMockTime(timeForComparison) + timer := mt.NewTimer(10 * time.Second) + select { + case <-timer.C(): + t.Fatalf("expected timer not to fire till time was up, but did") + default: + } + go mt.Set(timeForComparison.Add(10 * time.Second)) + select { + case <-timer.C(): + case <-time.After(3 * time.Second): + t.Fatal("expected timer to fire when time was up, but it didn't, it fired after a 3 second timeout") + } + timer.Reset(33 * time.Second) + go mt.Set(timeForComparison.Add(50 * time.Second)) + select { + case <-timer.C(): + case <-time.After(4 * time.Second): + t.Fatal("expected timer to fire when time was up, but it didn't, it fired after a 4 second timeout") + } + if !timer.Stop() { + <-timer.C() + } + timer.Reset(10000 * 
time.Second) + select { + case ts := <-timer.C(): + t.Errorf("expected timer to NOT fire if time was not up, but it did at ts: %s", ts) + default: + } + + timer2 := mt.NewTimer(10000 * time.Second) + select { + case ts := <-timer2.C(): + t.Errorf("expected timer to NOT fire if time was not up, but it did at ts: %s", ts) + case <-time.After(4 * time.Second): + } + + if !timer2.Stop() { + <-timer2.C() + } + timer2.Reset(0) + select { + case <-time.After(4 * time.Second): + t.Error("expected timer to fire when it was reset to 0, but it didn't") + case <-timer2.C(): + } + + if !timer2.Stop() { + <-timer2.C() + } + timer2.Reset(-time.Second) + select { + case <-time.After(4 * time.Second): + t.Error("expected timer to fire when it was reset to a negative duration, but it didn't") + case <-timer2.C(): + } + + if !timer2.Stop() { + <-timer2.C() + } + timer2.Reset(1 * time.Second) + go func() { + mt.Set(mt.T.Add(1 * time.Second)) + }() + select { + case <-time.After(4 * time.Second): + t.Error("expected timer to fire after it was reset to a small duration, but it didn't") + case <-timer2.C(): + } + + timer2.Reset(1 * time.Second) + go func() { + mt.Set(mt.T.Add(time.Second / 2)) + }() + select { + case <-time.After(time.Second): + case <-timer2.C(): + t.Error("expected timer to not fire after it was reset to a too small duration, but it did") + + } + +} + +func TestMockTimer_Stop(t *testing.T) { + timeForComparison := time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC) + mt := NewMockTime(timeForComparison) + timer := mt.NewTimer(10 * time.Second) + if !timer.Stop() { + t.Fatal("expected MockTimer.Stop() to be true if it hadn't fired yet") + } + + if !timer.Stop() { + select { + case <-timer.C(): + case <-time.After(2 * time.Second): + t.Fatal("timer didn't fire to clear when it should have") + } + } else { + t.Fatalf("Expected MockTimer.Stop() to be false when it was already stopped but it wasn't") + } + + timer.Reset(time.Second) + go mt.Set(timeForComparison.Add(20 * time.Second)) + select { + case <-timer.C(): + case <-time.After(2 * time.Second): + t.Fatal("timer didn't fire when it should have") + } + if !timer.Stop() { + select { + case <-timer.C(): + case <-time.After(2 * time.Second): + t.Fatal("timer didn't fire to clear when it should have") + } + } else { + t.Fatalf("Expected MockTimer.Stop() to be false when it was already fired but it wasn't") + } +} + +func TestMockTime_Until(t *testing.T) { + tests := []struct { + name string + mocktime time.Time + ts time.Time + want time.Duration + }{{ + name: "happy", + mocktime: time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC), + ts: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC), + want: 2*time.Minute + 50*time.Second + 4*time.Nanosecond, + }, { + name: "negative", + mocktime: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC), + ts: time.Date(2016, 2, 3, 4, 5, 6, 7, time.UTC), + want: -(2*time.Minute + 50*time.Second + 4*time.Nanosecond), + }, { + name: "zero", + mocktime: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC), + ts: time.Date(2016, 2, 3, 4, 7, 56, 11, time.UTC), + want: 0, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tm := NewMockTime(tt.mocktime) + if got := tm.Until(tt.ts); got != tt.want { + t.Errorf("MockTime.Until() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/task/backend/scheduler/treescheduler.go b/task/backend/scheduler/treescheduler.go index 0e6e9dfbff..8ddc739148 100644 --- a/task/backend/scheduler/treescheduler.go +++ b/task/backend/scheduler/treescheduler.go @@ -2,212 +2,258 @@ package scheduler 
import ( "context" + "encoding/binary" "errors" + "math" "sync" "time" + "github.com/cespare/xxhash" "github.com/google/btree" - "github.com/influxdata/cron" - "github.com/influxdata/influxdb/task/backend" ) -const cancelTimeOut = 30 * time.Second +const ( + maxWaitTime = time.Hour + degreeBtreeScheduled = 3 + defaultMaxWorkers = 32 +) -const defaultMaxRunsOutstanding = 1 << 16 - -type runningItem struct { - cancel func() - runID ID - taskID ID -} - -func (it runningItem) Less(bItem btree.Item) bool { - it2 := bItem.(runningItem) - return it.taskID < it2.taskID || (it.taskID == it2.taskID && it.runID < it2.runID) -} - -// TreeScheduler is a Scheduler based on a btree +// TreeScheduler is a Scheduler based on a btree. +// It calls Executor in order per ID. That means you are guaranteed that, for a specific ID, runs are executed in time order. +// +// If a call to the ExecutorFunc returns an *ErrRetry, then calls to Executor for that task will be delayed +// temporarily by the amount specified in *ErrRetry, but future calls to Executor for that task will proceed normally. +// +// - The scheduler should, after creation, automatically call ExecutorFunc when a task should run, as defined by its Schedulable. +// +// - The scheduler should not be able to get into a state where it blocks Release and Schedule indefinitely. +// +// - Schedule should add a Schedulable to being scheduled, and Release should remove a task from being scheduled. +// +// - Calling of ExecutorFunc should be serial in time on a per taskID basis, i.e. the run at 12:00 will go before the run at 12:01. +// +// Design: +// +// The core of the scheduler is a btree keyed by time, a nonce, and a task ID, and a map keyed by task ID and containing a +// nonce and a time (called a uniqueness index from now on). +// The map is to ensure task uniqueness in the tree, so we can replace or delete tasks in the tree. +// +// Scheduling in the tree consists of a main loop that feeds a fixed set of workers, each with their own communication channel. +// Distribution is handled by hashing the TaskID (to ensure uniform distribution) and then distributing over those channels +// evenly based on the hashed ID. This is to ensure that all tasks of the same ID go to the same worker. +// +// The workers call ExecutorFunc, handle any errors, and update the LastScheduled time internally and also via the Checkpointer. +// +// The main loop: +// +// The main loop waits on a time.Timer to grab the task with the minimum time. Once it successfully grabs a task ready +// to trigger, it will start walking the btree from the item nearest in time, handing every item that is ready to a worker. +// +// Putting a task into the scheduler: +// +// Adding a task to the scheduler acquires a write lock, grabs the task from the uniqueness map, and replaces the item +// in the uniqueness index and btree. If the new task would trigger sooner than the current soonest triggering task, it +// resets the Timer. Finally it releases the write lock. +// +// Removing a task from the scheduler: +// +// Removing a task from the scheduler acquires a write lock, deletes the task from the uniqueness index and from the +// btree, then releases the lock. We do not have to readjust the time on delete, because if the minimum task isn't +// ready yet, the main loop just resets the timer and keeps going. 
type TreeScheduler struct { sync.RWMutex - scheduled btree.BTree - running btree.BTree - nextTime map[ID]int64 // we need this index so we can delete items from the scheduled - when time.Time - executor func(ctx context.Context, id ID, scheduledAt time.Time) (backend.RunPromise, error) - onErr func(ctx context.Context, id ID, scheduledAt time.Time, err error) bool - timer *time.Timer - done chan struct{} - sema chan struct{} - wg sync.WaitGroup + scheduled *btree.BTree + nextTime map[ID]ordering // we need this index so we can delete items from the scheduled + when time.Time + executor Executor + onErr ErrorFunc + time Time + timer Timer + done chan struct{} + workchans []chan Item + wg sync.WaitGroup + checkpointer SchedulableService sm *SchedulerMetrics } -// clearTask is a method for deleting a range of tasks. -// TODO(docmerlin): add an actual ranged delete to github.com/google/btree -func (s *TreeScheduler) clearTask(taskID ID) btree.ItemIterator { - return func(i btree.Item) bool { - del := i.(runningItem).taskID == taskID - if !del { - return false - } - s.running.Delete(runningItem{taskID: taskID}) - return true - } -} +type ExecutorFunc func(ctx context.Context, id ID, scheduledAt time.Time) error -// clearTask is a method for deleting a range of tasks. -func (s *TreeScheduler) runs(taskID ID, limit int) btree.ItemIterator { - - acc := make([]ID, 0, limit) - - return func(i btree.Item) bool { - match := i.(runningItem).taskID == taskID - if !match { - return false - } - - return true - } -} - -const maxWaitTime = 1000000 * time.Hour - -type ExecutorFunc func(ctx context.Context, id ID, scheduledAt time.Time) (backend.RunPromise, error) - -type ErrorFunc func(ctx context.Context, id ID, scheduledAt time.Time, err error) bool +type ErrorFunc func(ctx context.Context, taskID ID, scheduledAt time.Time, err error) type treeSchedulerOptFunc func(t *TreeScheduler) error -func WithOnErrorFn(fn func(ctx context.Context, id ID, scheduledAt time.Time, err error) bool) treeSchedulerOptFunc { +func WithOnErrorFn(fn ErrorFunc) treeSchedulerOptFunc { return func(t *TreeScheduler) error { t.onErr = fn return nil } } -func WithMaxRunsOutsanding(n int) treeSchedulerOptFunc { +func WithMaxConcurrentWorkers(n int) treeSchedulerOptFunc { return func(t *TreeScheduler) error { - t.sema = make(chan struct{}, n) + t.workchans = make([]chan Item, n) + return nil + } +} + +func WithTime(t Time) treeSchedulerOptFunc { + return func(sch *TreeScheduler) error { + sch.time = t return nil } } // Executor is any function that accepts an ID, a time, and a duration. -// OnErr is a function that takes am error, it is called when we cannot find a viable time before jan 1, 2100. The default behavior is to drop the task on error. 
-func NewScheduler(Executor ExecutorFunc, opts ...treeSchedulerOptFunc) (*TreeScheduler, *SchedulerMetrics, error) { +func NewScheduler(executor Executor, checkpointer SchedulableService, opts ...treeSchedulerOptFunc) (*TreeScheduler, *SchedulerMetrics, error) { s := &TreeScheduler{ - executor: Executor, - onErr: func(_ context.Context, _ ID, _ time.Time, _ error) bool { return true }, - sema: make(chan struct{}, defaultMaxRunsOutstanding), + executor: executor, + scheduled: btree.New(degreeBtreeScheduled), + nextTime: map[ID]ordering{}, + onErr: func(_ context.Context, _ ID, _ time.Time, _ error) {}, + time: stdTime{}, + done: make(chan struct{}, 1), + checkpointer: checkpointer, } // apply options for i := range opts { if err := opts[i](s); err != nil { - return nil, err + return nil, nil, err } } + if s.workchans == nil { + s.workchans = make([]chan Item, defaultMaxWorkers) + + } + + s.wg.Add(len(s.workchans)) + for i := 0; i < len(s.workchans); i++ { + s.workchans[i] = make(chan Item) + go s.work(i) + } s.sm = NewSchedulerMetrics(s) - s.when = time.Now().Add(maxWaitTime) - s.timer = time.NewTimer(time.Until(s.when)) //time.Until(s.when)) - if Executor == nil { - return nil, errors.New("Executor must be a non-nil function") + s.when = s.time.Now().Add(maxWaitTime) + s.timer = s.time.NewTimer(maxWaitTime) + if executor == nil { + return nil, nil, errors.New("Executor must be a non-nil function") } + s.wg.Add(1) go func() { + defer s.wg.Done() + schedulerLoop: for { select { case <-s.done: s.Lock() s.timer.Stop() + // close workchans + for i := range s.workchans { + close(s.workchans[i]) + } s.Unlock() - close(s.sema) return - case <-s.timer.C: - iti := s.scheduled.DeleteMin() - if iti == nil { - s.Lock() - s.timer.Reset(maxWaitTime) + case <-s.timer.C(): + s.Lock() + min := s.scheduled.Min() + if min == nil { // grab a new item, because there could be a different item at the top of the queue + s.when = s.time.Now().Add(maxWaitTime) + s.timer.Reset(maxWaitTime) // we can reset without stop, because its fired. s.Unlock() - continue + continue schedulerLoop } - if iti == nil { - s.Lock() - s.timer.Reset(maxWaitTime) + it := min.(Item) + if it.when > s.when.UTC().Unix() { s.Unlock() - continue + continue schedulerLoop } - it := iti.(item) - s.sm.startExecution(it.id, time.Since(time.Unix(it.next, 0))) - if prom, err := s.executor(context.Background(), it.id, time.Unix(it.next, 0)); err == nil { - t, err := it.cron.Next(time.Unix(it.next, 0)) - it.next = t.Unix() - // we need to return the item to the scheduled before calling s.onErr - if err != nil { - it.nonce++ - s.onErr(context.TODO(), it.id, time.Unix(it.next, 0), err) - } - s.scheduled.ReplaceOrInsert(it) - if prom == nil { - break - } - run := prom.Run() - s.Lock() - s.running.ReplaceOrInsert(runningItem{cancel: prom.Cancel, runID: ID(run.RunID), taskID: ID(run.TaskID)}) + s.process() + min = s.scheduled.Min() + if min == nil { // grab a new item, because there could be a different item at the top of the queue after processing + s.when = s.time.Now().Add(maxWaitTime) + s.timer.Reset(maxWaitTime) // we can reset without stop, because its fired. 
s.Unlock() - - s.wg.Add(1) - - s.sema <- struct{}{} - go func() { - defer func() { - s.wg.Done() - <-s.sema - }() - res, err := prom.Wait() - if err != nil { - s.onErr(context.TODO(), it.id, time.Unix(it.next, 0), err) - } - // belt and suspenders - if res == nil { - return - } - run := prom.Run() - s.Lock() - s.running.Delete(runningItem{cancel: prom.Cancel, runID: ID(run.RunID), taskID: ID(run.TaskID)}) - s.Unlock() - - s.sm.finishExecution(it.id, res.Err() == nil, time.Since(time.Unix(it.next, 0))) - - if err = res.Err(); err != nil { - s.onErr(context.TODO(), it.id, time.Unix(it.next, 0), err) - return - } - // TODO(docmerlin); handle statistics on the run - }() - } else if err != nil { - s.onErr(context.Background(), it.id, time.Unix(it.next, 0), err) + continue schedulerLoop } + it = min.(Item) + s.when = time.Unix(it.when, 0) + until := s.time.Until(s.when) + s.timer.Reset(until) // we can reset without stop, because its fired. + s.Unlock() } } }() - return s, nil + return s, s.sm, nil } func (s *TreeScheduler) Stop() { - s.RLock() - semaCap := cap(s.sema) - s.RUnlock() - s.done <- struct{}{} - - // this is to make sure the semaphore is closed. It tries to pull cap+1 empty structs from the semaphore, only possible when closed - for i := 0; i <= semaCap; i++ { - <-s.sema - } + s.Lock() + close(s.done) + s.Unlock() s.wg.Wait() } +type unsent struct { + items []Item +} + +func (u *unsent) append(i Item) { + u.items = append(u.items, i) +} + +func (s *TreeScheduler) process() { + iter, toReAdd := s.iterator(s.time.Now()) + s.scheduled.Ascend(iter) + for i := range toReAdd.items { + s.nextTime[toReAdd.items[i].id] = toReAdd.items[i].ordering + s.scheduled.ReplaceOrInsert(toReAdd.items[i]) + } +} + +func (s *TreeScheduler) iterator(ts time.Time) (btree.ItemIterator, *unsent) { + itemsToPlace := &unsent{} + return func(i btree.Item) bool { + if i == nil { + return false + } + it := i.(Item) // we want it to panic if things other than Items are populating the scheduler, as it is something we can't recover from. + if time.Unix(it.next+it.Offset, 0).After(ts) { + return false + } + // distribute to the right worker. + { + buf := [8]byte{} + binary.LittleEndian.PutUint64(buf[:], uint64(it.id)) + wc := xxhash.Sum64(buf[:]) % uint64(len(s.workchans)) // we just hash so that the number is uniformly distributed + select { + case s.workchans[wc] <- it: + s.scheduled.Delete(it) + if err := it.updateNext(); err != nil { + s.onErr(context.Background(), it.id, it.Next(), err) + } + itemsToPlace.append(it) + + case <-s.done: + return false + default: + s.scheduled.Delete(it) + it.incrementNonce() + itemsToPlace.append(it) + return true + } + } + return true + }, itemsToPlace +} + +func (s *TreeScheduler) Now() time.Time { + s.RLock() + now := s.time.Now().UTC() + s.RUnlock() + return now +} + // When gives us the next time the scheduler will run a task. func (s *TreeScheduler) When() time.Time { s.RLock() @@ -216,86 +262,142 @@ func (s *TreeScheduler) When() time.Time { return w } +func (s *TreeScheduler) release(taskID ID) { + ordering, ok := s.nextTime[taskID] + if !ok { + return + } + + // delete the old task run time + s.scheduled.Delete(Item{id: taskID, ordering: ordering}) + delete(s.nextTime, taskID) +} + // Release releases a task, if it doesn't own the task it just returns. // Release also cancels the running task. // Task deletion would be faster if the tree supported deleting ranges. 
-func (s *TreeScheduler) Release(taskID ID) error { +func (s *TreeScheduler) Release(taskID ID) { s.sm.release(taskID) s.Lock() - defer s.Unlock() - nextTime, ok := s.nextTime[taskID] - if !ok { - return nil - } - - // delete the old task run time - s.scheduled.Delete(item{ - next: nextTime, - id: taskID, - }) - - s.running.AscendGreaterOrEqual(runningItem{taskID: taskID}, s.clearTask(taskID)) - return nil + s.release(taskID) + s.Unlock() } -// put puts an Item on the TreeScheduler. -func (s *TreeScheduler) Schedule(id ID, cronString string, offset time.Duration, since time.Time) error { - s.sm.schedule(taskID) - crSch, err := cron.ParseUTC(cronString) +// work does work and reschedules the work as necessary. +// it handles the resceduling, because we need to be able to reschedule based on executor error +func (s *TreeScheduler) work(i int) { + var it Item + defer func() { + s.wg.Done() + }() + for it = range s.workchans[i] { + t := time.Unix(it.next, 0) + err := func() (err error) { + defer func() { + if r := recover(); r != nil { + err = &ErrUnrecoverable{errors.New("Executor panicked")} + } + }() + return s.executor.Execute(context.Background(), it.id, t) + }() + if err != nil { + s.onErr(context.Background(), it.id, it.Next(), err) + } + if err := s.checkpointer.UpdateLastScheduled(context.TODO(), it.id, t); err != nil { + s.onErr(context.Background(), it.id, it.Next(), err) + } + } +} + +// Schedule put puts a Schedulable on the TreeScheduler. +func (s *TreeScheduler) Schedule(sch Schedulable) error { + s.sm.schedule(sch.ID()) + it := Item{ + cron: sch.Schedule(), + id: sch.ID(), + Offset: int64(sch.Offset().Seconds()), + //last: sch.LastScheduled().Unix(), + } + nt, err := it.cron.Next(sch.LastScheduled()) if err != nil { return err } - nt, err := crSch.Next(since) - if err != nil { - return err - } - it := item{ - cron: crSch, - next: nt.Add(offset).Unix(), - id: id, - } + it.next = nt.UTC().Unix() + it.ordering.when = it.next + it.Offset + s.Lock() defer s.Unlock() - nextTime, ok := s.nextTime[id] - if !ok { - s.scheduled.ReplaceOrInsert(it) - return nil - } - if s.when.Before(nt) { + nt = nt.Add(sch.Offset()) + if s.when.After(nt) { s.when = nt - s.timer.Reset(time.Until(s.when)) + s.timer.Stop() + until := s.time.Until(s.when) + if until <= 0 { + s.timer.Reset(0) + } else { + s.timer.Reset(s.time.Until(s.when)) + } } + nextTime, ok := s.nextTime[it.id] - // delete the old task run time - s.scheduled.Delete(item{ - next: nextTime, - id: id, - }) + if ok { + // delete the old task run time + s.scheduled.Delete(Item{ + ordering: nextTime, + id: it.id, + }) + } + s.nextTime[it.id] = ordering{when: it.next + it.Offset + it.wait} // insert the new task run time s.scheduled.ReplaceOrInsert(it) return nil } -func (s *TreeScheduler) Runs(taskID ID) { - s.RLock() - defer s.RUnlock() - s.running.AscendGreaterOrEqual(runningItem{taskID: 0}) +var maxItem = Item{ + ordering: ordering{ + when: math.MaxInt64, + nonce: int(^uint(0) >> 1), + }, + id: maxID, +} +type ordering struct { + when int64 + nonce int // for retries +} + +func (k *ordering) incrementNonce() { + k.nonce++ } // Item is a task in the scheduler. 
-type item struct { - cron cron.Parsed - next int64 - nonce int // for retries - offset int +type Item struct { + ordering id ID + cron Schedule + next int64 + wait int64 + Offset int64 +} + +func (it Item) Next() time.Time { + return time.Unix(it.next, 0) } // Less tells us if one Item is less than another -func (it item) Less(bItem btree.Item) bool { - it2 := bItem.(item) - return it.next < it2.next || (it.next == it2.next && (it.nonce < it2.nonce || it.nonce == it2.nonce && it.id < it2.id)) +func (it Item) Less(bItem btree.Item) bool { + it2 := bItem.(Item) + return it.when < it2.when || (it.when == it2.when && (it.nonce < it2.nonce || it.nonce == it2.nonce && it.id < it2.id)) +} + +func (it *Item) updateNext() error { + newNext, err := it.cron.Next(time.Unix(it.next, 0)) + if err != nil { + return err + } + it.next = newNext.UTC().Unix() + it.when = it.next + it.Offset + return nil }
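
For reviewers, here is a minimal usage sketch of the API this patch introduces, modeled on scheduler_test.go above. The printExecutor, noopCheckpointer, and cronTask types are illustrative stand-ins and are not part of the patch; the NewScheduler, NewSchedule, Schedule, and Stop calls mirror what the tests do.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/influxdata/influxdb/task/backend/scheduler"
)

// printExecutor is a hypothetical Executor that just logs each run.
type printExecutor struct{}

func (printExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error {
	fmt.Println("executing task", id, "scheduled at", scheduledAt)
	return nil
}

// noopCheckpointer is a hypothetical SchedulableService that discards checkpoints.
type noopCheckpointer struct{}

func (noopCheckpointer) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error {
	return nil
}

// cronTask is a hypothetical Schedulable backed by a cron schedule.
type cronTask struct {
	id            scheduler.ID
	schedule      scheduler.Schedule
	offset        time.Duration
	lastScheduled time.Time
}

func (t cronTask) ID() scheduler.ID             { return t.id }
func (t cronTask) Schedule() scheduler.Schedule { return t.schedule }
func (t cronTask) Offset() time.Duration        { return t.offset }
func (t cronTask) LastScheduled() time.Time     { return t.lastScheduled }

func main() {
	sch, _, err := scheduler.NewScheduler(
		printExecutor{},
		noopCheckpointer{},
		scheduler.WithMaxConcurrentWorkers(4),
		scheduler.WithOnErrorFn(func(_ context.Context, id scheduler.ID, ts time.Time, err error) {
			fmt.Println("task", id, "errored at", ts, ":", err)
		}),
	)
	if err != nil {
		panic(err)
	}
	defer sch.Stop() // Stop blocks until the scheduler's goroutines exit.

	// Seven-field cron expression, as used in the tests: fire every second.
	everySecond, err := scheduler.NewSchedule("* * * * * * *")
	if err != nil {
		panic(err)
	}

	// Schedule a task whose last run was 10s ago, so it is immediately due.
	if err := sch.Schedule(cronTask{
		id:            1,
		schedule:      everySecond,
		offset:        time.Second,
		lastScheduled: time.Now().Add(-10 * time.Second),
	}); err != nil {
		panic(err)
	}

	time.Sleep(5 * time.Second)
}

Note that NewScheduler also returns the *SchedulerMetrics, so callers can register it with a Prometheus registry.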
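
A second, smaller sketch of the worker-distribution scheme described in the TreeScheduler design comment: the task ID is serialized little-endian, hashed with xxhash, and reduced modulo the number of worker channels, so every run of a given task ID lands on the same worker and stays serialized. The workerFor helper is hypothetical; the real logic lives inline in TreeScheduler.iterator.

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/cespare/xxhash"
)

// workerFor maps a task ID onto one of n worker channels, mirroring the
// hash-and-mod distribution used by the scheduler's iterator.
func workerFor(id uint64, n int) uint64 {
	var buf [8]byte
	binary.LittleEndian.PutUint64(buf[:], id)
	return xxhash.Sum64(buf[:]) % uint64(n)
}

func main() {
	for id := uint64(1); id <= 5; id++ {
		fmt.Printf("task %d -> worker %d\n", id, workerFor(id, 32))
	}
}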