refactor(task): use CreateNextRun in scheduler

pull/10616/head
Mark Rushakoff 2018-08-17 11:03:12 -07:00
parent 64ad4bd1c2
commit 11fe3acf05
6 changed files with 238 additions and 267 deletions

2
Gopkg.lock generated
View File

@ -1092,6 +1092,8 @@
"github.com/kevinburke/go-bindata",
"github.com/mna/pigeon",
"github.com/opentracing/opentracing-go",
"github.com/opentracing/opentracing-go/ext",
"github.com/opentracing/opentracing-go/log",
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promhttp",

View File

@ -3,11 +3,9 @@ package coordinator
import (
"context"
"fmt"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/options"
)
type Coordinator struct {
@ -41,11 +39,6 @@ func New(scheduler backend.Scheduler, st backend.Store, opts ...Option) backend.
}
func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, script string, scheduleAfter int64) (platform.ID, error) {
opt, err := options.FromScript(script)
if err != nil {
return nil, err
}
id, err := c.Store.CreateTask(ctx, org, user, script, scheduleAfter)
if err != nil {
return id, err
@ -56,7 +49,12 @@ func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, scr
return id, err
}
if err := c.sch.ClaimTask(task, time.Now().UTC().Unix(), &opt); err != nil {
meta, err := c.Store.FindTaskMetaByID(ctx, id)
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(task, meta); err != nil {
_, delErr := c.Store.DeleteTask(ctx, id)
if delErr != nil {
return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
@ -68,11 +66,6 @@ func (c *Coordinator) CreateTask(ctx context.Context, org, user platform.ID, scr
}
func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript string) error {
opt, err := options.FromScript(newScript)
if err != nil {
return err
}
if err := c.Store.ModifyTask(ctx, id, newScript); err != nil {
return err
}
@ -91,7 +84,7 @@ func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript
return err
}
if err := c.sch.ClaimTask(task, meta.LastCompleted, &opt); err != nil {
if err := c.sch.ClaimTask(task, meta); err != nil {
return err
}
@ -108,12 +101,12 @@ func (c *Coordinator) EnableTask(ctx context.Context, id platform.ID) error {
return err
}
opt, err := options.FromScript(task.Script)
meta, err := c.Store.FindTaskMetaByID(ctx, id)
if err != nil {
return err
}
if err := c.sch.ClaimTask(task, time.Now().UTC().Unix(), &opt); err != nil {
if err := c.sch.ClaimTask(task, meta); err != nil {
return err
}

View File

@ -38,6 +38,8 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e
return RunCreation{}, errors.New("cannot create next run when max concurrency already reached")
}
// Not calling stm.DueAt here because we use sch a second time later.
// We can definitely optimize (minimize) cron parsing at a later point in time.
sch, err := cron.Parse(stm.EffectiveCron)
if err != nil {
return RunCreation{}, err
@ -76,3 +78,19 @@ func (stm *StoreTaskMeta) CreateNextRun(now int64, makeID func() (platform.ID, e
NextDue: sch.Next(nextScheduled).Unix() + int64(stm.Delay),
}, nil
}
// NextDueRun returns the Unix timestamp, including the task's delay,
// of the next run due according to the task's effective cron schedule.
// The schedule is advanced past whichever is later: the last completed
// run or the newest currently running run.
func (stm *StoreTaskMeta) NextDueRun() (int64, error) {
	schedule, err := cron.Parse(stm.EffectiveCron)
	if err != nil {
		return 0, err
	}

	// Find the most recent "now" value among completed and in-progress runs.
	mostRecent := stm.LastCompleted
	for _, run := range stm.CurrentlyRunning {
		if run.Now > mostRecent {
			mostRecent = run.Now
		}
	}

	return schedule.Next(time.Unix(mostRecent, 0)).Unix() + int64(stm.Delay), nil
}

View File

@ -4,15 +4,14 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/task/options"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"gopkg.in/robfig/cron.v2"
)
var ErrRunCanceled = errors.New("run canceled")
@ -20,9 +19,8 @@ var ErrTaskNotClaimed = errors.New("task not claimed")
// DesiredState persists the desired state of a run.
type DesiredState interface {
// CreateRun returns a run ID for a task and a now timestamp.
// If a run already exists for taskID and now, CreateRun must return an error without queuing a new run.
CreateRun(ctx context.Context, taskID platform.ID, now int64) (QueuedRun, error)
// CreateNextRun requests the next run from the desired state, occurring no later than the Unix timestamp now.
CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)
// FinishRun indicates that the given run is no longer intended to be executed.
// This may be called after a successful or failed execution, or upon cancellation.
@ -88,10 +86,7 @@ type Scheduler interface {
Tick(now int64)
// ClaimTask begins control of task execution in this scheduler.
// The timing schedule for the task is parsed from the script.
// startExecutionFrom is an exclusive timestamp, after which execution should start;
// you can set startExecutionFrom in the past to backfill a task.
ClaimTask(task *StoreTask, startExecutionFrom int64, opt *options.Options) error
ClaimTask(task *StoreTask, meta *StoreTaskMeta) error
// ReleaseTask immediately cancels any in-progress runs for the given task ID,
// and releases any resources related to management of that task.
@ -115,23 +110,6 @@ func WithTicker(ctx context.Context, d time.Duration) SchedulerOption {
}
}
// WithCronTimer returns a SchedulerOption that attaches a started cron timer
// to an *outerScheduler. The timer is stopped when ctx is canceled.
// Applying this option to any other Scheduler implementation panics.
func WithCronTimer(ctx context.Context) SchedulerOption {
return func(s Scheduler) {
switch sched := s.(type) {
case *outerScheduler:
sched.cronTimer = cron.New()
sched.cronTimer.Start()
// Stop the cron timer once the supplied context is done,
// so the goroutine and timer do not outlive the caller's lifetime.
go func() {
<-ctx.Done()
sched.cronTimer.Stop()
}()
default:
panic(fmt.Sprintf("cannot apply WithCronTimer to Scheduler of type %T", s))
}
}
}
// WithLogger sets the logger for the scheduler.
// If not set, the scheduler will use a no-op logger.
func WithLogger(logger *zap.Logger) SchedulerOption {
@ -148,13 +126,13 @@ func WithLogger(logger *zap.Logger) SchedulerOption {
// NewScheduler returns a new scheduler with the given desired state and the given now UTC timestamp.
func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, now int64, opts ...SchedulerOption) Scheduler {
o := &outerScheduler{
desiredState: desiredState,
executor: executor,
logWriter: lw,
now: now,
tasks: make(map[string]*taskScheduler),
logger: zap.NewNop(),
metrics: newSchedulerMetrics(),
desiredState: desiredState,
executor: executor,
logWriter: lw,
now: now,
taskSchedulers: make(map[string]*taskScheduler),
logger: zap.NewNop(),
metrics: newSchedulerMetrics(),
}
for _, opt := range opts {
@ -169,93 +147,68 @@ type outerScheduler struct {
executor Executor
logWriter LogWriter
now int64
logger *zap.Logger
cronTimer *cron.Cron
now int64
logger *zap.Logger
metrics *schedulerMetrics
mu sync.Mutex
tasks map[string]*taskScheduler
schedulerMu sync.Mutex // Protects access and modification of taskSchedulers map.
taskSchedulers map[string]*taskScheduler // Stringified task ID -> task scheduler.
}
func (s *outerScheduler) Tick(now int64) {
atomic.StoreInt64(&s.now, now)
s.mu.Lock()
defer s.mu.Unlock()
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
for _, ts := range s.tasks {
ts.Start(now)
for _, ts := range s.taskSchedulers {
if now >= ts.NextDue() {
ts.Work(now)
}
}
}
func (s *outerScheduler) ClaimTask(task *StoreTask, startExecutionFrom int64, opts *options.Options) (err error) {
func (s *outerScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error) {
defer s.metrics.ClaimTask(err == nil)
if err := opts.Validate(); err != nil {
return fmt.Errorf("cannot claim task with invalid options: %v", err)
}
timer := opts.EffectiveCronString()
if timer == "" {
return errors.New("cannot claim task without a schedule")
}
sch, err := cron.Parse(timer)
ts, err := newTaskScheduler(s, task, meta, s.metrics)
if err != nil {
return fmt.Errorf("error parsing cron expression: %v", err)
return err
}
ts := newTaskScheduler(
s,
task,
sch,
startExecutionFrom,
uint8(opts.Concurrency),
)
if s.cronTimer != nil {
cronID, err := s.cronTimer.AddFunc(timer, func() {
ts.Start(time.Now().Unix())
})
if err != nil {
return fmt.Errorf("error starting cron timer: %v", err)
}
ts.cronID = cronID
}
s.mu.Lock()
_, ok := s.tasks[task.ID.String()]
tid := task.ID.String()
s.schedulerMu.Lock()
_, ok := s.taskSchedulers[tid]
if ok {
s.mu.Unlock()
s.schedulerMu.Unlock()
return errors.New("task has already been claimed")
}
s.tasks[task.ID.String()] = ts
s.taskSchedulers[tid] = ts
s.mu.Unlock()
s.schedulerMu.Unlock()
ts.Start(s.now)
// Okay to read ts.nextDue without locking,
// because we just created it and there won't be any concurrent access.
if now := atomic.LoadInt64(&s.now); now >= ts.nextDue {
ts.Work(now)
}
return nil
}
func (s *outerScheduler) ReleaseTask(taskID platform.ID) error {
s.mu.Lock()
defer s.mu.Unlock()
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
tid := taskID.String()
t, ok := s.tasks[tid]
t, ok := s.taskSchedulers[tid]
if !ok {
return ErrTaskNotClaimed
}
if s.cronTimer != nil {
s.cronTimer.Remove(t.cronID)
}
t.Cancel()
delete(s.tasks, tid)
delete(s.taskSchedulers, tid)
s.metrics.ReleaseTask(tid)
@ -271,65 +224,56 @@ type taskScheduler struct {
// Task we are scheduling for.
task *StoreTask
// Seconds since UTC epoch.
now int64
// CancelFunc for context passed to runners, to enable Cancel method.
cancel context.CancelFunc
// cronID is used when we need to remove a taskSchedule from the cron scheduler.
cronID cron.EntryID
// Fixed-length slice of runners.
runners []*runner
// Record updates to run state.
logWriter LogWriter
logger *zap.Logger
metrics *schedulerMetrics
nextDueMu sync.RWMutex // Protects following fields.
nextDue int64 // Unix timestamp of next due.
nextDueSource int64 // Run time that produced nextDue.
}
func newTaskScheduler(
s *outerScheduler,
task *StoreTask,
cron cron.Schedule,
startExecutionFrom int64,
concurrencyLimit uint8,
) *taskScheduler {
firstScheduled := cron.Next(time.Unix(startExecutionFrom, 0).UTC()).Unix()
ctx, cancel := context.WithCancel(context.Background())
ts := &taskScheduler{
task: task,
now: startExecutionFrom,
cancel: cancel,
runners: make([]*runner, concurrencyLimit),
logger: s.logger.With(zap.String("task_id", task.ID.String())),
meta *StoreTaskMeta,
metrics *schedulerMetrics,
) (*taskScheduler, error) {
firstDue, err := meta.NextDueRun()
if err != nil {
return nil, err
}
tt := &taskTimer{
taskNow: &ts.now,
cron: cron,
nextScheduledRun: firstScheduled,
latestInProgress: startExecutionFrom,
metrics: s.metrics,
ctx, cancel := context.WithCancel(context.Background())
ts := &taskScheduler{
task: task,
cancel: cancel,
runners: make([]*runner, meta.MaxConcurrency),
logger: s.logger.With(zap.String("task_id", task.ID.String())),
metrics: s.metrics,
nextDue: firstDue,
nextDueSource: math.MinInt64,
}
for i := range ts.runners {
logger := ts.logger.With(zap.Int("run_slot", i))
ts.runners[i] = newRunner(ctx, logger, task, s.desiredState, s.executor, s.logWriter, tt)
ts.runners[i] = newRunner(ctx, logger, task, s.desiredState, s.executor, s.logWriter, ts)
}
return ts
return ts, nil
}
// Start enqueues as many immediate jobs as possible,
// without exceeding the now timestamp from the outer scheduler.
func (ts *taskScheduler) Start(now int64) {
atomic.StoreInt64(&ts.now, now)
// Work begins a work cycle on the taskScheduler.
// As many runners are started as possible.
func (ts *taskScheduler) Work(now int64) {
for _, r := range ts.runners {
r.Start()
r.Start(now)
if r.IsIdle() {
// Ran out of jobs to start.
break
@ -337,63 +281,28 @@ func (ts *taskScheduler) Start(now int64) {
}
}
// Cancel interrupts this taskScheduler and its runners.
// It cancels the shared context handed to each runner at construction time,
// aborting any in-progress work.
func (ts *taskScheduler) Cancel() {
ts.cancel()
}
// taskTimer holds information about global timing, and scheduled and in-progress runs of a task.
// A single taskTimer is shared among many runners in a taskScheduler.
type taskTimer struct {
// Reference to task scheduler's now field that gets updated by Start().
// By using a pointer here we are allowing any Start() calls to the task scheduler to
// update the taskTimer with a single atomic update.
// This value must be accessed with sync.Atomic.
taskNow *int64
// Schedule of task.
cron cron.Schedule
metrics *schedulerMetrics
mu sync.RWMutex
// Timestamp of the next scheduled run.
nextScheduledRun int64
// Timestamp of the latest run in progress, i.e. a run that has been created.
// This value is not affected by a single runner going idle.
latestInProgress int64
// NextDue reports the Unix timestamp at which the next run becomes due.
// It takes the read lock, so it is safe to call concurrently with SetNextDue.
func (ts *taskScheduler) NextDue() int64 {
	ts.nextDueMu.RLock()
	due := ts.nextDue
	ts.nextDueMu.RUnlock()
	return due
}
// NextScheduledRun returns the timestamp of the next run that should be scheduled,
// and whether it is okay to schedule that run now.
// A run is ready when its scheduled time is at or before the task's current "now",
// which is read atomically from the shared taskNow pointer.
func (tt *taskTimer) NextScheduledRun() (int64, bool) {
tt.mu.RLock()
next := tt.nextScheduledRun
tt.mu.RUnlock()
return next, next <= atomic.LoadInt64(tt.taskNow)
}
// StartRun updates tt's internal state to indicate that a run is starting with the given timestamp.
func (tt *taskTimer) StartRun(now int64) {
tt.mu.Lock()
if now > tt.latestInProgress {
tt.latestInProgress = now
}
if tt.latestInProgress == tt.nextScheduledRun {
tt.nextScheduledRun = tt.cron.Next(time.Unix(now, 0).UTC()).Unix()
} else if tt.latestInProgress > tt.nextScheduledRun {
panic(fmt.Sprintf("skipped a scheduled run: %d", tt.nextScheduledRun))
}
tt.mu.Unlock()
// SetNextDue records the next due timestamp together with its source,
// i.e. the now value of the run that reported nextDue.
func (ts *taskScheduler) SetNextDue(nextDue, source int64) {
	// TODO(mr): we may need some logic around source to handle if SetNextDue is called out of order.
	ts.nextDueMu.Lock()
	ts.nextDue = nextDue
	ts.nextDueSource = source
	ts.nextDueMu.Unlock()
}
// A runner is one eligible "concurrency slot" for a given task.
// Each runner in a taskScheduler shares a taskTimer, and by locking that taskTimer they decide whether
// there is a new run that needs to be created and executed.
type runner struct {
state *uint32
@ -406,7 +315,8 @@ type runner struct {
executor Executor
logWriter LogWriter
tt *taskTimer
// Parent taskScheduler.
ts *taskScheduler
logger *zap.Logger
}
@ -418,7 +328,7 @@ func newRunner(
desiredState DesiredState,
executor Executor,
logWriter LogWriter,
tt *taskTimer,
ts *taskScheduler,
) *runner {
return &runner{
ctx: ctx,
@ -427,7 +337,7 @@ func newRunner(
desiredState: desiredState,
executor: executor,
logWriter: logWriter,
tt: tt,
ts: ts,
logger: logger,
}
}
@ -451,46 +361,48 @@ func (r *runner) IsIdle() bool {
// Start checks if a new run is ready to be scheduled, and if so,
// creates a run on this goroutine and begins executing it on a separate goroutine.
func (r *runner) Start() {
func (r *runner) Start(now int64) {
if !atomic.CompareAndSwapUint32(r.state, runnerIdle, runnerWorking) {
// Already working. Cannot start.
return
}
r.startFromWorking()
r.startFromWorking(now)
}
// startFromWorking attempts to create a run if one is due, and then begins execution on a separate goroutine.
// r.state must be runnerWorking when this is called.
func (r *runner) startFromWorking() {
if next, ready := r.tt.NextScheduledRun(); ready {
// It's possible that two runners may attempt to create the same run for this "next" timestamp,
// but the contract of DesiredState requires that only one succeeds.
qr, err := r.desiredState.CreateRun(r.ctx, r.task.ID, next)
if err != nil {
r.logger.Info("Failed to create run", zap.Error(err))
atomic.StoreUint32(r.state, runnerIdle)
return
}
// Create a new child logger for the individual run.
// We can't do r.logger = r.logger.With(zap.String("run_id", qr.RunID.String()) because zap doesn't deduplicate fields,
// and we'll quickly end up with many run_ids associated with the log.
runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()))
r.tt.StartRun(next)
go r.executeAndWait(qr, runLogger)
r.updateRunState(qr, RunStarted, runLogger)
func (r *runner) startFromWorking(now int64) {
if now < r.ts.NextDue() {
// Not ready for a new run. Go idle again.
atomic.StoreUint32(r.state, runnerIdle)
return
}
// Wasn't ready for a new run, so we're idle again.
atomic.StoreUint32(r.state, runnerIdle)
rc, err := r.desiredState.CreateNextRun(r.ctx, r.task.ID, now)
if err != nil {
r.logger.Info("Failed to create run", zap.Error(err))
atomic.StoreUint32(r.state, runnerIdle)
return
}
qr := rc.Created
r.ts.SetNextDue(rc.NextDue, qr.Now)
// Create a new child logger for the individual run.
// We can't do r.logger = r.logger.With(zap.String("run_id", qr.RunID.String()) because zap doesn't deduplicate fields,
// and we'll quickly end up with many run_ids associated with the log.
runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()), zap.Int64("now", qr.Now))
// TODO(mr): this used to record metrics or something?
// r.tt.StartRun(next)
runLogger.Info("Beginning execution")
go r.executeAndWait(now, qr, runLogger)
r.updateRunState(qr, RunStarted, runLogger)
}
func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) {
func (r *runner) executeAndWait(now int64, qr QueuedRun, runLogger *zap.Logger) {
rp, err := r.executor.Execute(r.ctx, qr)
if err != nil {
// TODO(mr): retry? and log error.
@ -536,19 +448,20 @@ func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) {
return
}
r.updateRunState(qr, RunSuccess, runLogger)
runLogger.Info("Execution succeeded")
// Check again if there is a new run available, without returning to idle state.
r.startFromWorking()
r.startFromWorking(now)
}
func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
switch s {
case RunStarted:
r.tt.metrics.StartRun(r.task.ID.String())
r.ts.metrics.StartRun(r.task.ID.String())
case RunSuccess:
r.tt.metrics.FinishRun(r.task.ID.String(), true)
r.ts.metrics.FinishRun(r.task.ID.String(), true)
case RunFail, RunCanceled:
r.tt.metrics.FinishRun(r.task.ID.String(), false)
r.ts.metrics.FinishRun(r.task.ID.String(), false)
default:
// We are deliberately not handling RunQueued yet.
// There is not really a notion of being queued in this runner architecture.

View File

@ -4,7 +4,6 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/prom"
@ -12,49 +11,24 @@ import (
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/mock"
"github.com/influxdata/platform/task/options"
"go.uber.org/zap/zaptest"
)
// TestScheduler_EveryValidation verifies that ClaimTask rejects tasks whose
// "every" option is invalid, returning an error for each bad duration.
func TestScheduler_EveryValidation(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
task := &backend.StoreTask{
ID: platform.ID{1},
}
// Invalid schedules: sub-second, negative, and non-whole-second durations.
badOptions := []options.Options{
{
Every: time.Millisecond,
},
{
Every: time.Hour * -1,
},
{
Every: 1500 * time.Millisecond,
},
{
Every: 1232 * time.Millisecond,
},
}
for _, badOption := range badOptions {
if err := o.ClaimTask(task, 3, &badOption); err == nil {
t.Fatal("no error returned for :", badOption)
}
}
}
func TestScheduler_StartScriptOnClaim(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t)))
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Minute, Name: "x", Retry: 1, Concurrency: 1}
if err := o.ClaimTask(task, 3, opts); err != nil {
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "* * * * *",
LastCompleted: 3,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}
@ -67,8 +41,13 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) {
task = &backend.StoreTask{
ID: platform.ID{2},
}
opts = &options.Options{Every: time.Second, Concurrency: 99, Retry: 1, Name: "y"}
if err := o.ClaimTask(task, 3, opts); err != nil {
meta = &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LastCompleted: 3,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}
@ -85,9 +64,14 @@ func TestScheduler_CreateRunOnTick(t *testing.T) {
task := &backend.StoreTask{
ID: platform.ID{1},
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 2,
EffectiveCron: "@every 1s",
LastCompleted: 5,
}
opts := &options.Options{Every: time.Second, Concurrency: 2, Name: "x", Retry: 1}
if err := o.ClaimTask(task, 5, opts); err != nil {
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}
@ -128,9 +112,14 @@ func TestScheduler_Release(t *testing.T) {
task := &backend.StoreTask{
ID: platform.ID{1},
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LastCompleted: 5,
}
opts := &options.Options{Every: time.Second, Concurrency: 99, Name: "x", Retry: 1}
if err := o.ClaimTask(task, 5, opts); err != nil {
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}
@ -159,9 +148,14 @@ func TestScheduler_RunLog(t *testing.T) {
task := &backend.StoreTask{
ID: platform.ID{1},
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LastCompleted: 5,
}
opts := &options.Options{Every: time.Second, Concurrency: 99, Name: "x", Retry: 1}
if err := s.ClaimTask(task, 5, opts); err != nil {
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}
@ -294,9 +288,14 @@ func TestScheduler_Metrics(t *testing.T) {
task := &backend.StoreTask{
ID: platform.ID{1},
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LastCompleted: 5,
}
opts := &options.Options{Every: time.Second, Concurrency: 99, Name: "x", Retry: 1}
if err := s.ClaimTask(task, 5, opts); err != nil {
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}

View File

@ -13,7 +13,6 @@ import (
"github.com/influxdata/platform"
"github.com/influxdata/platform/task/backend"
scheduler "github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/options"
"go.uber.org/zap"
)
@ -24,6 +23,7 @@ type Scheduler struct {
lastTick int64
claims map[string]*Task
meta map[string]backend.StoreTaskMeta
createChan chan *Task
releaseChan chan *Task
@ -42,6 +42,7 @@ type Task struct {
// NewScheduler returns a mock Scheduler with initialized (empty)
// claim and meta maps, ready for ClaimTask calls.
func NewScheduler() *Scheduler {
return &Scheduler{
claims: map[string]*Task{},
meta: map[string]backend.StoreTaskMeta{},
}
}
@ -54,7 +55,7 @@ func (s *Scheduler) Tick(now int64) {
func (s *Scheduler) WithLogger(l *zap.Logger) {}
func (s *Scheduler) ClaimTask(task *backend.StoreTask, startExecutionFrom int64, opts *options.Options) error {
func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error {
if s.claimError != nil {
return s.claimError
}
@ -66,8 +67,9 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, startExecutionFrom int64,
if ok {
return errors.New("task already in list")
}
s.meta[task.ID.String()] = *meta
t := &Task{task.Script, startExecutionFrom, uint8(opts.Concurrency)}
t := &Task{task.Script, meta.LastCompleted, uint8(meta.MaxConcurrency)}
s.claims[task.ID.String()] = t
@ -95,6 +97,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
}
delete(s.claims, taskID.String())
delete(s.meta, taskID.String())
return nil
}
@ -130,6 +133,9 @@ type DesiredState struct {
// Map of stringified, concatenated task and platform ID, to runs that have been created.
created map[string]backend.QueuedRun
// Map of stringified task ID to task meta.
meta map[string]backend.StoreTaskMeta
}
var _ backend.DesiredState = (*DesiredState)(nil)
@ -138,6 +144,7 @@ func NewDesiredState() *DesiredState {
return &DesiredState{
runIDs: make(map[string]uint32),
created: make(map[string]backend.QueuedRun),
meta: make(map[string]backend.StoreTaskMeta),
}
}
@ -162,6 +169,45 @@ func (d *DesiredState) CreateRun(_ context.Context, taskID platform.ID, now int6
return qr, nil
}
// SetTaskMeta sets the task meta for the given task ID.
// SetTaskMeta must be called before CreateNextRun, for a given task ID.
func (d *DesiredState) SetTaskMeta(taskID platform.ID, meta backend.StoreTaskMeta) {
	key := taskID.String()

	d.mu.Lock()
	d.meta[key] = meta
	d.mu.Unlock()
}
// CreateNextRun creates the next run for the given task.
// Refer to the documentation for SetTaskMeta to understand how the times are determined.
// Panics if SetTaskMeta was not called first for taskID.
func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) {
d.mu.Lock()
defer d.mu.Unlock()
tid := taskID.String()
meta, ok := d.meta[tid]
if !ok {
panic(fmt.Sprintf("meta not set for task with ID %s", tid))
}
// Run IDs are deterministic per task: a per-task counter encoded big-endian.
makeID := func() (platform.ID, error) {
d.runIDs[tid]++
runID := make([]byte, 4)
binary.BigEndian.PutUint32(runID, d.runIDs[tid])
return platform.ID(runID), nil
}
rc, err := meta.CreateNextRun(now, makeID)
if err != nil {
return backend.RunCreation{}, err
}
// meta is a value copy; store the mutated copy back so state advances.
d.meta[tid] = meta
// Copy the task ID so the stored run doesn't alias the caller's slice.
rc.Created.TaskID = append([]byte(nil), taskID...)
d.created[tid+rc.Created.RunID.String()] = rc.Created
return rc, nil
}
func (d *DesiredState) FinishRun(_ context.Context, taskID, runID platform.ID) error {
d.mu.Lock()
defer d.mu.Unlock()