From 11fe3acf05dfe765e038ca7d2af158d15dd64296 Mon Sep 17 00:00:00 2001 From: Mark Rushakoff Date: Fri, 17 Aug 2018 11:03:12 -0700 Subject: [PATCH] refactor(task): use CreateNextRun in scheduler --- Gopkg.lock | 2 + task/backend/coordinator/coordinator.go | 25 +- task/backend/meta.go | 18 ++ task/backend/scheduler.go | 319 +++++++++--------------- task/backend/scheduler_test.go | 89 ++++--- task/mock/scheduler.go | 52 +++- 6 files changed, 238 insertions(+), 267 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 2145ec59df..4f598065af 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1092,6 +1092,8 @@ "github.com/kevinburke/go-bindata", "github.com/mna/pigeon", "github.com/opentracing/opentracing-go", + "github.com/opentracing/opentracing-go/ext", + "github.com/opentracing/opentracing-go/log", "github.com/pkg/errors", "github.com/prometheus/client_golang/prometheus", "github.com/prometheus/client_golang/prometheus/promhttp", diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index d7a8b5c6c5..f7e34618d5 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -3,11 +3,9 @@ package coordinator import ( "context" "fmt" - "time" "github.com/influxdata/platform" "github.com/influxdata/platform/task/backend" - "github.com/influxdata/platform/task/options" ) type Coordinator struct { @@ -41,11 +39,6 @@ func New(scheduler backend.Scheduler, st backend.Store, opts ...Option) backend. 
} func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, script string, scheduleAfter int64) (platform.ID, error) { - opt, err := options.FromScript(script) - if err != nil { - return nil, err - } - id, err := c.Store.CreateTask(ctx, org, user, script, scheduleAfter) if err != nil { return id, err @@ -56,7 +49,12 @@ func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, scr return id, err } - if err := c.sch.ClaimTask(task, time.Now().UTC().Unix(), &opt); err != nil { + meta, err := c.Store.FindTaskMetaByID(ctx, id) + if err != nil { + return id, err + } + + if err := c.sch.ClaimTask(task, meta); err != nil { _, delErr := c.Store.DeleteTask(ctx, id) if delErr != nil { return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr) @@ -68,11 +66,6 @@ func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, scr } func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript string) error { - opt, err := options.FromScript(newScript) - if err != nil { - return err - } - if err := c.Store.ModifyTask(ctx, id, newScript); err != nil { return err } @@ -91,7 +84,7 @@ func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript return err } - if err := c.sch.ClaimTask(task, meta.LastCompleted, &opt); err != nil { + if err := c.sch.ClaimTask(task, meta); err != nil { return err } @@ -108,12 +101,12 @@ func (c *Coordinator) EnableTask(ctx context.Context, id platform.ID) error { return err } - opt, err := options.FromScript(task.Script) + meta, err := c.Store.FindTaskMetaByID(ctx, id) if err != nil { return err } - if err := c.sch.ClaimTask(task, time.Now().UTC().Unix(), &opt); err != nil { + if err := c.sch.ClaimTask(task, meta); err != nil { return err } diff --git a/task/backend/meta.go b/task/backend/meta.go index de500cbde7..8eeb4d0465 100644 --- a/task/backend/meta.go +++ b/task/backend/meta.go @@ -38,6 +38,8 @@ func (stm *StoreTaskMeta) 
CreateNextRun(now int64, makeID func() (platform.ID, e return RunCreation{}, errors.New("cannot create next run when max concurrency already reached") } + // Not calling stm.DueAt here because we use sch a second time later. + // We can definitely optimize (minimize) cron parsing at a later point in time. sch, err := cron.Parse(stm.EffectiveCron) if err != nil { return RunCreation{}, err @@ -76,3 +78,19 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e NextDue: sch.Next(nextScheduled).Unix() + int64(stm.Delay), }, nil } + +func (stm *StoreTaskMeta) NextDueRun() (int64, error) { + sch, err := cron.Parse(stm.EffectiveCron) + if err != nil { + return 0, err + } + + latest := stm.LastCompleted + for _, cr := range stm.CurrentlyRunning { + if cr.Now > latest { + latest = cr.Now + } + } + + return sch.Next(time.Unix(latest, 0)).Unix() + int64(stm.Delay), nil +} diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 43410dba10..c9d4c0ff57 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -4,15 +4,14 @@ import ( "context" "errors" "fmt" + "math" "sync" "sync/atomic" "time" "github.com/influxdata/platform" - "github.com/influxdata/platform/task/options" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" - "gopkg.in/robfig/cron.v2" ) var ErrRunCanceled = errors.New("run canceled") @@ -20,9 +19,8 @@ var ErrTaskNotClaimed = errors.New("task not claimed") // DesiredState persists the desired state of a run. type DesiredState interface { - // CreateRun returns a run ID for a task and a now timestamp. - // If a run already exists for taskID and now, CreateRun must return an error without queuing a new run. - CreateRun(ctx context.Context, taskID platform.ID, now int64) (QueuedRun, error) + // CreateNextRun requests the next run from the desired state, occurring no later than the Unix timestamp now. 
+ CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error) // FinishRun indicates that the given run is no longer intended to be executed. // This may be called after a successful or failed execution, or upon cancellation. @@ -88,10 +86,7 @@ type Scheduler interface { Tick(now int64) // ClaimTask begins control of task execution in this scheduler. - // The timing schedule for the task is parsed from the script. - // startExecutionFrom is an exclusive timestamp, after which execution should start; - // you can set startExecutionFrom in the past to backfill a task. - ClaimTask(task *StoreTask, startExecutionFrom int64, opt *options.Options) error + ClaimTask(task *StoreTask, meta *StoreTaskMeta) error // ReleaseTask immediately cancels any in-progress runs for the given task ID, // and releases any resources related to management of that task. @@ -115,23 +110,6 @@ func WithTicker(ctx context.Context, d time.Duration) SchedulerOption { } } -func WithCronTimer(ctx context.Context) SchedulerOption { - return func(s Scheduler) { - switch sched := s.(type) { - case *outerScheduler: - sched.cronTimer = cron.New() - sched.cronTimer.Start() - - go func() { - <-ctx.Done() - sched.cronTimer.Stop() - }() - default: - panic(fmt.Sprintf("cannot apply WithCronTimer to Scheduler of type %T", s)) - } - } -} - // WithLogger sets the logger for the scheduler. // If not set, the scheduler will use a no-op logger. func WithLogger(logger *zap.Logger) SchedulerOption { @@ -148,13 +126,13 @@ func WithLogger(logger *zap.Logger) SchedulerOption { // NewScheduler returns a new scheduler with the given desired state and the given now UTC timestamp. 
func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, now int64, opts ...SchedulerOption) Scheduler { o := &outerScheduler{ - desiredState: desiredState, - executor: executor, - logWriter: lw, - now: now, - tasks: make(map[string]*taskScheduler), - logger: zap.NewNop(), - metrics: newSchedulerMetrics(), + desiredState: desiredState, + executor: executor, + logWriter: lw, + now: now, + taskSchedulers: make(map[string]*taskScheduler), + logger: zap.NewNop(), + metrics: newSchedulerMetrics(), } for _, opt := range opts { @@ -169,93 +147,68 @@ type outerScheduler struct { executor Executor logWriter LogWriter - now int64 - logger *zap.Logger - cronTimer *cron.Cron + now int64 + logger *zap.Logger metrics *schedulerMetrics - mu sync.Mutex - - tasks map[string]*taskScheduler + schedulerMu sync.Mutex // Protects access and modification of taskSchedulers map. + taskSchedulers map[string]*taskScheduler // Stringified task ID -> task scheduler. } func (s *outerScheduler) Tick(now int64) { atomic.StoreInt64(&s.now, now) - s.mu.Lock() - defer s.mu.Unlock() + s.schedulerMu.Lock() + defer s.schedulerMu.Unlock() - for _, ts := range s.tasks { - ts.Start(now) + for _, ts := range s.taskSchedulers { + if now >= ts.NextDue() { + ts.Work(now) + } } } -func (s *outerScheduler) ClaimTask(task *StoreTask, startExecutionFrom int64, opts *options.Options) (err error) { +func (s *outerScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error) { defer s.metrics.ClaimTask(err == nil) - if err := opts.Validate(); err != nil { - return fmt.Errorf("cannot claim task with invalid options: %v", err) - } - - timer := opts.EffectiveCronString() - if timer == "" { - return errors.New("cannot claim task without a schedule") - } - sch, err := cron.Parse(timer) + ts, err := newTaskScheduler(s, task, meta, s.metrics) if err != nil { - return fmt.Errorf("error parsing cron expression: %v", err) + return err } - ts := newTaskScheduler( - s, - task, - sch, - 
startExecutionFrom, - uint8(opts.Concurrency), - ) - - if s.cronTimer != nil { - cronID, err := s.cronTimer.AddFunc(timer, func() { - ts.Start(time.Now().Unix()) - }) - if err != nil { - return fmt.Errorf("error starting cron timer: %v", err) - } - ts.cronID = cronID - } - - s.mu.Lock() - _, ok := s.tasks[task.ID.String()] + tid := task.ID.String() + s.schedulerMu.Lock() + _, ok := s.taskSchedulers[tid] if ok { - s.mu.Unlock() + s.schedulerMu.Unlock() return errors.New("task has already been claimed") } - s.tasks[task.ID.String()] = ts + s.taskSchedulers[tid] = ts - s.mu.Unlock() + s.schedulerMu.Unlock() - ts.Start(s.now) + // Okay to read ts.nextDue without locking, + // because we just created it and there won't be any concurrent access. + if now := atomic.LoadInt64(&s.now); now >= ts.nextDue { + ts.Work(now) + } return nil } func (s *outerScheduler) ReleaseTask(taskID platform.ID) error { - s.mu.Lock() - defer s.mu.Unlock() + s.schedulerMu.Lock() + defer s.schedulerMu.Unlock() tid := taskID.String() - t, ok := s.tasks[tid] + t, ok := s.taskSchedulers[tid] if !ok { return ErrTaskNotClaimed } - if s.cronTimer != nil { - s.cronTimer.Remove(t.cronID) - } - t.Cancel() - delete(s.tasks, tid) + delete(s.taskSchedulers, tid) s.metrics.ReleaseTask(tid) @@ -271,65 +224,56 @@ type taskScheduler struct { // Task we are scheduling for. task *StoreTask - // Seconds since UTC epoch. - now int64 - // CancelFunc for context passed to runners, to enable Cancel method. cancel context.CancelFunc - // cronID is used when we need to remove a taskSchedule from the cron scheduler. - cronID cron.EntryID - // Fixed-length slice of runners. runners []*runner - // Record updates to run state. - logWriter LogWriter - logger *zap.Logger + + metrics *schedulerMetrics + + nextDueMu sync.RWMutex // Protects following fields. + nextDue int64 // Unix timestamp of next due. + nextDueSource int64 // Run time that produced nextDue. 
} func newTaskScheduler( s *outerScheduler, task *StoreTask, - cron cron.Schedule, - startExecutionFrom int64, - concurrencyLimit uint8, -) *taskScheduler { - firstScheduled := cron.Next(time.Unix(startExecutionFrom, 0).UTC()).Unix() - ctx, cancel := context.WithCancel(context.Background()) - ts := &taskScheduler{ - task: task, - now: startExecutionFrom, - cancel: cancel, - runners: make([]*runner, concurrencyLimit), - logger: s.logger.With(zap.String("task_id", task.ID.String())), + meta *StoreTaskMeta, + metrics *schedulerMetrics, +) (*taskScheduler, error) { + firstDue, err := meta.NextDueRun() + if err != nil { + return nil, err } - tt := &taskTimer{ - taskNow: &ts.now, - cron: cron, - - nextScheduledRun: firstScheduled, - latestInProgress: startExecutionFrom, - - metrics: s.metrics, + ctx, cancel := context.WithCancel(context.Background()) + ts := &taskScheduler{ + task: task, + cancel: cancel, + runners: make([]*runner, meta.MaxConcurrency), + logger: s.logger.With(zap.String("task_id", task.ID.String())), + metrics: s.metrics, + nextDue: firstDue, + nextDueSource: math.MinInt64, } for i := range ts.runners { logger := ts.logger.With(zap.Int("run_slot", i)) - ts.runners[i] = newRunner(ctx, logger, task, s.desiredState, s.executor, s.logWriter, tt) + ts.runners[i] = newRunner(ctx, logger, task, s.desiredState, s.executor, s.logWriter, ts) } - return ts + return ts, nil } -// Start enqueues as many immediate jobs as possible, -// without exceeding the now timestamp from the outer scheduler. -func (ts *taskScheduler) Start(now int64) { - atomic.StoreInt64(&ts.now, now) +// Work begins a work cycle on the taskScheduler. +// As many runners are started as possible. +func (ts *taskScheduler) Work(now int64) { for _, r := range ts.runners { - r.Start() + r.Start(now) if r.IsIdle() { // Ran out of jobs to start. break @@ -337,63 +281,28 @@ func (ts *taskScheduler) Start(now int64) { } } +// Cancel interrupts this taskScheduler and its runners. 
func (ts *taskScheduler) Cancel() { ts.cancel() } -// taskTimer holds information about global timing, and scheduled and in-progress runs of a task. -// A single taskTimer is shared among many runners in a taskScheduler. -type taskTimer struct { - // Reference to task scheduler's now field that gets updated by Start(). - // By using a pointer here we are allowing any Start() calls to the task scheduler to - // update the taskTimer with a single atomic update. - // This value must be accessed with sync.Atomic. - taskNow *int64 - - // Schedule of task. - cron cron.Schedule - - metrics *schedulerMetrics - - mu sync.RWMutex - - // Timestamp of the next scheduled run. - nextScheduledRun int64 - - // Timestamp of the latest run in progress, i.e. a run that has been created. - // This value is not affected by a single runner going idle. - latestInProgress int64 +// NextDue returns the next due timestamp. +func (ts *taskScheduler) NextDue() int64 { + ts.nextDueMu.RLock() + defer ts.nextDueMu.RUnlock() + return ts.nextDue } -// NextScheduledRun returns the timestamp of the next run that should be scheduled, -// and whether it is okay to schedule that run now. -func (tt *taskTimer) NextScheduledRun() (int64, bool) { - tt.mu.RLock() - next := tt.nextScheduledRun - tt.mu.RUnlock() - - return next, next <= atomic.LoadInt64(tt.taskNow) -} - -// StartRun updates tt's internal state to indicate that a run is starting with the given timestamp. -func (tt *taskTimer) StartRun(now int64) { - tt.mu.Lock() - - if now > tt.latestInProgress { - tt.latestInProgress = now - } - if tt.latestInProgress == tt.nextScheduledRun { - tt.nextScheduledRun = tt.cron.Next(time.Unix(now, 0).UTC()).Unix() - } else if tt.latestInProgress > tt.nextScheduledRun { - panic(fmt.Sprintf("skipped a scheduled run: %d", tt.nextScheduledRun)) - } - - tt.mu.Unlock() +// SetNextDue sets the next due timestamp and records the source (the now value of the run who reported nextDue). 
+func (ts *taskScheduler) SetNextDue(nextDue, source int64) { + // TODO(mr): we may need some logic around source to handle if SetNextDue is called out of order. + ts.nextDueMu.Lock() + defer ts.nextDueMu.Unlock() + ts.nextDue = nextDue + ts.nextDueSource = source } // A runner is one eligible "concurrency slot" for a given task. -// Each runner in a taskScheduler shares a taskTimer, and by locking that taskTimer they decide whether -// there is a new run that needs to be created and executed. type runner struct { state *uint32 @@ -406,7 +315,8 @@ type runner struct { executor Executor logWriter LogWriter - tt *taskTimer + // Parent taskScheduler. + ts *taskScheduler logger *zap.Logger } @@ -418,7 +328,7 @@ func newRunner( desiredState DesiredState, executor Executor, logWriter LogWriter, - tt *taskTimer, + ts *taskScheduler, ) *runner { return &runner{ ctx: ctx, @@ -427,7 +337,7 @@ func newRunner( desiredState: desiredState, executor: executor, logWriter: logWriter, - tt: tt, + ts: ts, logger: logger, } } @@ -451,46 +361,48 @@ func (r *runner) IsIdle() bool { // Start checks if a new run is ready to be scheduled, and if so, // creates a run on this goroutine and begins executing it on a separate goroutine. -func (r *runner) Start() { +func (r *runner) Start(now int64) { if !atomic.CompareAndSwapUint32(r.state, runnerIdle, runnerWorking) { // Already working. Cannot start. return } - r.startFromWorking() + r.startFromWorking(now) } // startFromWorking attempts to create a run if one is due, and then begins execution on a separate goroutine. // r.state must be runnerWorking when this is called. -func (r *runner) startFromWorking() { - if next, ready := r.tt.NextScheduledRun(); ready { - // It's possible that two runners may attempt to create the same run for this "next" timestamp, - // but the contract of DesiredState requires that only one succeeds. 
- qr, err := r.desiredState.CreateRun(r.ctx, r.task.ID, next) - if err != nil { - r.logger.Info("Failed to create run", zap.Error(err)) - atomic.StoreUint32(r.state, runnerIdle) - return - } - - // Create a new child logger for the individual run. - // We can't do r.logger = r.logger.With(zap.String("run_id", qr.RunID.String()) because zap doesn't deduplicate fields, - // and we'll quickly end up with many run_ids associated with the log. - runLogger := r.logger.With(zap.String("run_id", qr.RunID.String())) - - r.tt.StartRun(next) - - go r.executeAndWait(qr, runLogger) - - r.updateRunState(qr, RunStarted, runLogger) +func (r *runner) startFromWorking(now int64) { + if now < r.ts.NextDue() { + // Not ready for a new run. Go idle again. + atomic.StoreUint32(r.state, runnerIdle) return } - // Wasn't ready for a new run, so we're idle again. - atomic.StoreUint32(r.state, runnerIdle) + rc, err := r.desiredState.CreateNextRun(r.ctx, r.task.ID, now) + if err != nil { + r.logger.Info("Failed to create run", zap.Error(err)) + atomic.StoreUint32(r.state, runnerIdle) + return + } + qr := rc.Created + r.ts.SetNextDue(rc.NextDue, qr.Now) + + // Create a new child logger for the individual run. + // We can't do r.logger = r.logger.With(zap.String("run_id", qr.RunID.String()) because zap doesn't deduplicate fields, + // and we'll quickly end up with many run_ids associated with the log. + runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()), zap.Int64("now", qr.Now)) + + // TODO(mr): this used to record metrics or something? + // r.tt.StartRun(next) + + runLogger.Info("Beginning execution") + go r.executeAndWait(now, qr, runLogger) + + r.updateRunState(qr, RunStarted, runLogger) } -func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) { +func (r *runner) executeAndWait(now int64, qr QueuedRun, runLogger *zap.Logger) { rp, err := r.executor.Execute(r.ctx, qr) if err != nil { // TODO(mr): retry? and log error. 
@@ -536,19 +448,20 @@ func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) { return } r.updateRunState(qr, RunSuccess, runLogger) + runLogger.Info("Execution succeeded") // Check again if there is a new run available, without returning to idle state. - r.startFromWorking() + r.startFromWorking(now) } func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) { switch s { case RunStarted: - r.tt.metrics.StartRun(r.task.ID.String()) + r.ts.metrics.StartRun(r.task.ID.String()) case RunSuccess: - r.tt.metrics.FinishRun(r.task.ID.String(), true) + r.ts.metrics.FinishRun(r.task.ID.String(), true) case RunFail, RunCanceled: - r.tt.metrics.FinishRun(r.task.ID.String(), false) + r.ts.metrics.FinishRun(r.task.ID.String(), false) default: // We are deliberately not handling RunQueued yet. // There is not really a notion of being queued in this runner architecture. diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index c6b50e3aa8..e6cbb8a574 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -4,7 +4,6 @@ import ( "context" "errors" "testing" - "time" "github.com/influxdata/platform" "github.com/influxdata/platform/kit/prom" @@ -12,49 +11,24 @@ import ( _ "github.com/influxdata/platform/query/builtin" "github.com/influxdata/platform/task/backend" "github.com/influxdata/platform/task/mock" - "github.com/influxdata/platform/task/options" + "go.uber.org/zap/zaptest" ) -func TestScheduler_EveryValidation(t *testing.T) { - d := mock.NewDesiredState() - e := mock.NewExecutor() - o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) - task := &backend.StoreTask{ - ID: platform.ID{1}, - } - - badOptions := []options.Options{ - { - Every: time.Millisecond, - }, - { - Every: time.Hour * -1, - }, - { - Every: 1500 * time.Millisecond, - }, - { - Every: 1232 * time.Millisecond, - }, - } - - for _, badOption := range badOptions { - if err := o.ClaimTask(task, 3, &badOption); err == nil { 
- t.Fatal("no error returned for :", badOption) - } - } -} - func TestScheduler_StartScriptOnClaim(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() - o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) + o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t))) task := &backend.StoreTask{ ID: platform.ID{1}, } - opts := &options.Options{Every: time.Minute, Name: "x", Retry: 1, Concurrency: 1} - if err := o.ClaimTask(task, 3, opts); err != nil { + meta := &backend.StoreTaskMeta{ + MaxConcurrency: 1, + EffectiveCron: "* * * * *", + LastCompleted: 3, + } + d.SetTaskMeta(task.ID, *meta) + if err := o.ClaimTask(task, meta); err != nil { t.Fatal(err) } @@ -67,8 +41,13 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { task = &backend.StoreTask{ ID: platform.ID{2}, } - opts = &options.Options{Every: time.Second, Concurrency: 99, Retry: 1, Name: "y"} - if err := o.ClaimTask(task, 3, opts); err != nil { + meta = &backend.StoreTaskMeta{ + MaxConcurrency: 99, + EffectiveCron: "@every 1s", + LastCompleted: 3, + } + d.SetTaskMeta(task.ID, *meta) + if err := o.ClaimTask(task, meta); err != nil { t.Fatal(err) } @@ -85,9 +64,14 @@ func TestScheduler_CreateRunOnTick(t *testing.T) { task := &backend.StoreTask{ ID: platform.ID{1}, } + meta := &backend.StoreTaskMeta{ + MaxConcurrency: 2, + EffectiveCron: "@every 1s", + LastCompleted: 5, + } - opts := &options.Options{Every: time.Second, Concurrency: 2, Name: "x", Retry: 1} - if err := o.ClaimTask(task, 5, opts); err != nil { + d.SetTaskMeta(task.ID, *meta) + if err := o.ClaimTask(task, meta); err != nil { t.Fatal(err) } @@ -128,9 +112,14 @@ func TestScheduler_Release(t *testing.T) { task := &backend.StoreTask{ ID: platform.ID{1}, } + meta := &backend.StoreTaskMeta{ + MaxConcurrency: 99, + EffectiveCron: "@every 1s", + LastCompleted: 5, + } - opts := &options.Options{Every: time.Second, Concurrency: 99, Name: "x", Retry: 1} - if err := o.ClaimTask(task, 5, 
opts); err != nil { + d.SetTaskMeta(task.ID, *meta) + if err := o.ClaimTask(task, meta); err != nil { t.Fatal(err) } @@ -159,9 +148,14 @@ func TestScheduler_RunLog(t *testing.T) { task := &backend.StoreTask{ ID: platform.ID{1}, } + meta := &backend.StoreTaskMeta{ + MaxConcurrency: 99, + EffectiveCron: "@every 1s", + LastCompleted: 5, + } - opts := &options.Options{Every: time.Second, Concurrency: 99, Name: "x", Retry: 1} - if err := s.ClaimTask(task, 5, opts); err != nil { + d.SetTaskMeta(task.ID, *meta) + if err := s.ClaimTask(task, meta); err != nil { t.Fatal(err) } @@ -294,9 +288,14 @@ func TestScheduler_Metrics(t *testing.T) { task := &backend.StoreTask{ ID: platform.ID{1}, } + meta := &backend.StoreTaskMeta{ + MaxConcurrency: 99, + EffectiveCron: "@every 1s", + LastCompleted: 5, + } - opts := &options.Options{Every: time.Second, Concurrency: 99, Name: "x", Retry: 1} - if err := s.ClaimTask(task, 5, opts); err != nil { + d.SetTaskMeta(task.ID, *meta) + if err := s.ClaimTask(task, meta); err != nil { t.Fatal(err) } diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index 3ef5a7266a..18b44de646 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -13,7 +13,6 @@ import ( "github.com/influxdata/platform" "github.com/influxdata/platform/task/backend" scheduler "github.com/influxdata/platform/task/backend" - "github.com/influxdata/platform/task/options" "go.uber.org/zap" ) @@ -24,6 +23,7 @@ type Scheduler struct { lastTick int64 claims map[string]*Task + meta map[string]backend.StoreTaskMeta createChan chan *Task releaseChan chan *Task @@ -42,6 +42,7 @@ type Task struct { func NewScheduler() *Scheduler { return &Scheduler{ claims: map[string]*Task{}, + meta: map[string]backend.StoreTaskMeta{}, } } @@ -54,7 +55,7 @@ func (s *Scheduler) Tick(now int64) { func (s *Scheduler) WithLogger(l *zap.Logger) {} -func (s *Scheduler) ClaimTask(task *backend.StoreTask, startExecutionFrom int64, opts *options.Options) error { +func (s *Scheduler) 
ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error { if s.claimError != nil { return s.claimError } @@ -66,8 +67,9 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, startExecutionFrom int64, if ok { return errors.New("task already in list") } + s.meta[task.ID.String()] = *meta - t := &Task{task.Script, startExecutionFrom, uint8(opts.Concurrency)} + t := &Task{task.Script, meta.LastCompleted, uint8(meta.MaxConcurrency)} s.claims[task.ID.String()] = t @@ -95,6 +97,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error { } delete(s.claims, taskID.String()) + delete(s.meta, taskID.String()) return nil } @@ -130,6 +133,9 @@ type DesiredState struct { // Map of stringified, concatenated task and platform ID, to runs that have been created. created map[string]backend.QueuedRun + + // Map of stringified task ID to task meta. + meta map[string]backend.StoreTaskMeta } var _ backend.DesiredState = (*DesiredState)(nil) @@ -138,6 +144,7 @@ func NewDesiredState() *DesiredState { return &DesiredState{ runIDs: make(map[string]uint32), created: make(map[string]backend.QueuedRun), + meta: make(map[string]backend.StoreTaskMeta), } } @@ -162,6 +169,45 @@ func (d *DesiredState) CreateRun(_ context.Context, taskID platform.ID, now int6 return qr, nil } +// SetTaskMeta sets the task meta for the given task ID. +// SetTaskMeta must be called before CreateNextRun, for a given task ID. +func (d *DesiredState) SetTaskMeta(taskID platform.ID, meta backend.StoreTaskMeta) { + d.mu.Lock() + defer d.mu.Unlock() + + d.meta[taskID.String()] = meta +} + +// CreateNextRun creates the next run for the given task. +// Run times are computed by StoreTaskMeta.CreateNextRun from the meta registered via SetTaskMeta. 
+func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) { + d.mu.Lock() + defer d.mu.Unlock() + + tid := taskID.String() + + meta, ok := d.meta[tid] + if !ok { + panic(fmt.Sprintf("meta not set for task with ID %s", tid)) + } + + makeID := func() (platform.ID, error) { + d.runIDs[tid]++ + runID := make([]byte, 4) + binary.BigEndian.PutUint32(runID, d.runIDs[tid]) + return platform.ID(runID), nil + } + + rc, err := meta.CreateNextRun(now, makeID) + if err != nil { + return backend.RunCreation{}, err + } + d.meta[tid] = meta + rc.Created.TaskID = append([]byte(nil), taskID...) + d.created[tid+rc.Created.RunID.String()] = rc.Created + return rc, nil +} + func (d *DesiredState) FinishRun(_ context.Context, taskID, runID platform.ID) error { d.mu.Lock() defer d.mu.Unlock()