From 98fe5094656bdbe883c3228e177ce26175f026ae Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Tue, 25 Sep 2018 23:16:26 -0600 Subject: [PATCH] fix(task): Allow task scheduler to be stopped and started. (#857) * fix(task): Allow task scheduler to be stopped and started. * make create task scheduling a bit more strict --- cmd/influxd/main.go | 1 + task/backend/scheduler.go | 87 ++++++++++++++++++++++++++++++---- task/backend/scheduler_test.go | 12 +++++ task/mock/scheduler.go | 4 ++ 4 files changed, 95 insertions(+), 9 deletions(-) diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index 32485a94e6..83a53b70eb 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -210,6 +210,7 @@ func platformF(cmd *cobra.Command, args []string) { // TODO(lh): Replace NopLogWriter with real log writer scheduler := taskbackend.NewScheduler(boltStore, executor, taskbackend.NopLogWriter{}, time.Now().UTC().Unix()) + scheduler.Start(context.Background()) // TODO(lh): Replace NopLogReader with real log reader taskSvc = task.PlatformAdapter(coordinator.New(scheduler, boltStore), taskbackend.NopLogReader{}) diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index ac64458789..347c2ac6f8 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -85,6 +85,12 @@ type RunResult interface { // which likely means we will change the method signatures to something where // we can wait for the result to complete and possibly inspect any relevant output. type Scheduler interface { + // Start allows the scheduler to Tick. A scheduler without start will do nothing + Start(ctx context.Context) + + // Stop a scheduler from ticking. + Stop() + // Tick updates the time of the scheduler. // Any owned tasks who are due to execute and who have a free concurrency slot, // will begin a new execution. @@ -98,6 +104,7 @@ type Scheduler interface { ReleaseTask(taskID platform.ID) error } +// SchedulerOption is a option you can use to modify the schedulers behavior. type SchedulerOption func(Scheduler) func WithTicker(ctx context.Context, d time.Duration) SchedulerOption { @@ -140,6 +147,7 @@ func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, no now: now, taskSchedulers: make(map[string]*taskScheduler), logger: zap.NewNop(), + wg: &sync.WaitGroup{}, metrics: newSchedulerMetrics(), } @@ -160,16 +168,31 @@ type outerScheduler struct { metrics *schedulerMetrics + ctx context.Context + cancel context.CancelFunc + wg *sync.WaitGroup + schedulerMu sync.Mutex // Protects access and modification of taskSchedulers map. taskSchedulers map[string]*taskScheduler // Stringified task ID -> task scheduler. } func (s *outerScheduler) Tick(now int64) { - atomic.StoreInt64(&s.now, now) - s.schedulerMu.Lock() defer s.schedulerMu.Unlock() + if s.ctx == nil { + return + } + + select { + case <-s.ctx.Done(): + return + default: + // do nothing and allow ticks + } + + atomic.StoreInt64(&s.now, now) + affected := 0 for _, ts := range s.taskSchedulers { if nextDue, hasQueue := ts.NextDue(); now >= nextDue || hasQueue { @@ -180,10 +203,50 @@ func (s *outerScheduler) Tick(now int64) { s.logger.Info("Ticked", zap.Int64("now", now), zap.Int("tasks_affected", affected)) } +func (s *outerScheduler) Start(ctx context.Context) { + s.schedulerMu.Lock() + defer s.schedulerMu.Unlock() + + s.ctx, s.cancel = context.WithCancel(ctx) +} + +func (s *outerScheduler) Stop() { + defer s.wg.Wait() + + s.schedulerMu.Lock() + defer s.schedulerMu.Unlock() + + // if I was never started I cant stop + if s.cancel == nil { + return + } + + s.cancel() + + // release tasks + for id := range s.taskSchedulers { + delete(s.taskSchedulers, id) + s.metrics.ReleaseTask(id) + } +} + func (s *outerScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error) { + s.schedulerMu.Lock() + defer s.schedulerMu.Unlock() + if s.ctx == nil { + return errors.New("can not claim tasks when i've not been started") + } + + select { + case <-s.ctx.Done(): + return errors.New("can not claim a task if not started") + default: + // do nothing and allow ticks + } + defer s.metrics.ClaimTask(err == nil) - ts, err := newTaskScheduler(s, task, meta, s.metrics) + ts, err := newTaskScheduler(s.ctx, s.wg, s, task, meta, s.metrics) if err != nil { return err } @@ -195,17 +258,13 @@ func (s *outerScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err er } tid := task.ID.String() - s.schedulerMu.Lock() _, ok := s.taskSchedulers[tid] if ok { - s.schedulerMu.Unlock() return errors.New("task has already been claimed") } s.taskSchedulers[tid] = ts - s.schedulerMu.Unlock() - next, hasQueue := ts.NextDue() if now := atomic.LoadInt64(&s.now); now >= next || hasQueue { ts.Work() @@ -245,6 +304,7 @@ type taskScheduler struct { // CancelFunc for context passed to runners, to enable Cancel method. cancel context.CancelFunc + wg *sync.WaitGroup // Fixed-length slice of runners. runners []*runner @@ -260,6 +320,8 @@ type taskScheduler struct { } func newTaskScheduler( + ctx context.Context, + wg *sync.WaitGroup, s *outerScheduler, task *StoreTask, meta *StoreTaskMeta, @@ -270,11 +332,12 @@ func newTaskScheduler( return nil, err } - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) ts := &taskScheduler{ now: &s.now, task: task, cancel: cancel, + wg: wg, runners: make([]*runner, meta.MaxConcurrency), logger: s.logger.With(zap.String("task_id", task.ID.String())), metrics: s.metrics, @@ -285,7 +348,7 @@ func newTaskScheduler( for i := range ts.runners { logger := ts.logger.With(zap.Int("run_slot", i)) - ts.runners[i] = newRunner(ctx, logger, task, s.desiredState, s.executor, s.logWriter, ts) + ts.runners[i] = newRunner(ctx, wg, logger, task, s.desiredState, s.executor, s.logWriter, ts) } return ts, nil @@ -351,6 +414,7 @@ type runner struct { // Cancelable context from parent taskScheduler. ctx context.Context + wg *sync.WaitGroup task *StoreTask @@ -366,6 +430,7 @@ type runner struct { func newRunner( ctx context.Context, + wg *sync.WaitGroup, logger *zap.Logger, task *StoreTask, desiredState DesiredState, @@ -375,6 +440,7 @@ func newRunner( ) *runner { return &runner{ ctx: ctx, + wg: wg, state: new(uint32), task: task, desiredState: desiredState, @@ -424,6 +490,7 @@ func (r *runner) RestartRun(qr QueuedRun) bool { // create a QueuedRun because we cant stm.CreateNextRun runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()), zap.Int64("now", qr.Now)) + r.wg.Add(1) go r.executeAndWait(qr, runLogger) r.updateRunState(qr, RunStarted, runLogger) @@ -454,12 +521,14 @@ func (r *runner) startFromWorking(now int64) { runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()), zap.Int64("now", qr.Now)) runLogger.Info("Created run; beginning execution") + r.wg.Add(1) go r.executeAndWait(qr, runLogger) r.updateRunState(qr, RunStarted, runLogger) } func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) { + defer r.wg.Done() rp, err := r.executor.Execute(r.ctx, qr) if err != nil { // TODO(mr): retry? and log error. diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 4f3ab3c96f..398ebd9210 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -21,6 +21,8 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t))) + o.Start(context.Background()) + defer o.Stop() task := &backend.StoreTask{ ID: platform.ID{1}, @@ -93,6 +95,8 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) + o.Start(context.Background()) + defer o.Stop() task := &backend.StoreTask{ ID: platform.ID{1}, @@ -161,6 +165,8 @@ func TestScheduler_Release(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) + o.Start(context.Background()) + defer o.Stop() task := &backend.StoreTask{ ID: platform.ID{1}, @@ -195,6 +201,8 @@ func TestScheduler_Queue(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 3059, backend.WithLogger(zaptest.NewLogger(t))) + o.Start(context.Background()) + defer o.Stop() task := &backend.StoreTask{ ID: platform.ID{1}, @@ -313,6 +321,8 @@ func TestScheduler_RunLog(t *testing.T) { e := mock.NewExecutor() rl := backend.NewInMemRunReaderWriter() s := backend.NewScheduler(d, e, rl, 5, backend.WithLogger(zaptest.NewLogger(t))) + s.Start(context.Background()) + defer s.Stop() // Claim a task that starts later. task := &backend.StoreTask{ @@ -398,6 +408,8 @@ func TestScheduler_Metrics(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) + s.Start(context.Background()) + defer s.Stop() reg := prom.NewRegistry() // PrometheusCollector isn't part of the Scheduler interface. Yet. diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index 08f2a32f97..5a956b3ddb 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -56,6 +56,10 @@ func (s *Scheduler) Tick(now int64) { func (s *Scheduler) WithLogger(l *zap.Logger) {} +func (s *Scheduler) Start(context.Context) {} + +func (s *Scheduler) Stop() {} + func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error { if s.claimError != nil { return s.claimError