fix(task): Allow task scheduler to be stopped and started. (#857)

* fix(task): Allow task scheduler to be stopped and started.

* Make task-creation scheduling a bit more strict.
pull/10616/head
Lyon Hill 2018-09-25 23:16:26 -06:00 committed by GitHub
parent 86916f71e7
commit 98fe509465
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 9 deletions

View File

@ -210,6 +210,7 @@ func platformF(cmd *cobra.Command, args []string) {
// TODO(lh): Replace NopLogWriter with real log writer
scheduler := taskbackend.NewScheduler(boltStore, executor, taskbackend.NopLogWriter{}, time.Now().UTC().Unix())
scheduler.Start(context.Background())
// TODO(lh): Replace NopLogReader with real log reader
taskSvc = task.PlatformAdapter(coordinator.New(scheduler, boltStore), taskbackend.NopLogReader{})

View File

@ -85,6 +85,12 @@ type RunResult interface {
// which likely means we will change the method signatures to something where
// we can wait for the result to complete and possibly inspect any relevant output.
type Scheduler interface {
// Start allows the scheduler to Tick. A scheduler without start will do nothing
Start(ctx context.Context)
// Stop a scheduler from ticking.
Stop()
// Tick updates the time of the scheduler.
// Any owned tasks who are due to execute and who have a free concurrency slot,
// will begin a new execution.
@ -98,6 +104,7 @@ type Scheduler interface {
ReleaseTask(taskID platform.ID) error
}
// SchedulerOption is a option you can use to modify the schedulers behavior.
type SchedulerOption func(Scheduler)
func WithTicker(ctx context.Context, d time.Duration) SchedulerOption {
@ -140,6 +147,7 @@ func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, no
now: now,
taskSchedulers: make(map[string]*taskScheduler),
logger: zap.NewNop(),
wg: &sync.WaitGroup{},
metrics: newSchedulerMetrics(),
}
@ -160,16 +168,31 @@ type outerScheduler struct {
metrics *schedulerMetrics
ctx context.Context
cancel context.CancelFunc
wg *sync.WaitGroup
schedulerMu sync.Mutex // Protects access and modification of taskSchedulers map.
taskSchedulers map[string]*taskScheduler // Stringified task ID -> task scheduler.
}
func (s *outerScheduler) Tick(now int64) {
atomic.StoreInt64(&s.now, now)
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
if s.ctx == nil {
return
}
select {
case <-s.ctx.Done():
return
default:
// do nothing and allow ticks
}
atomic.StoreInt64(&s.now, now)
affected := 0
for _, ts := range s.taskSchedulers {
if nextDue, hasQueue := ts.NextDue(); now >= nextDue || hasQueue {
@ -180,10 +203,50 @@ func (s *outerScheduler) Tick(now int64) {
s.logger.Info("Ticked", zap.Int64("now", now), zap.Int("tasks_affected", affected))
}
// Start enables the scheduler by installing a cancelable context derived
// from ctx. Until Start is called, Tick is a no-op and ClaimTask returns
// an error; Stop cancels the context installed here.
func (s *outerScheduler) Start(ctx context.Context) {
	s.schedulerMu.Lock()
	defer s.schedulerMu.Unlock()
	// If Start was already called, cancel the previously installed context
	// before replacing it, so its resources are released rather than leaked.
	if s.cancel != nil {
		s.cancel()
	}
	s.ctx, s.cancel = context.WithCancel(ctx)
}
// Stop cancels the scheduler's context — stopping future ticks and task
// claims — releases every claimed task, and then blocks until all
// in-flight run goroutines tracked by the WaitGroup have finished.
func (s *outerScheduler) Stop() {
// Deferred before Lock so that, by LIFO defer order, the wait runs after
// the mutex is released — presumably so run goroutines that need the
// lock can finish rather than deadlock; confirm against runner code.
defer s.wg.Wait()
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
// Start was never called, so there is nothing to cancel or release.
if s.cancel == nil {
return
}
s.cancel()
// Drop every claimed task and record each release in the metrics.
for id := range s.taskSchedulers {
delete(s.taskSchedulers, id)
s.metrics.ReleaseTask(id)
}
}
func (s *outerScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error) {
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
if s.ctx == nil {
return errors.New("can not claim tasks when i've not been started")
}
select {
case <-s.ctx.Done():
return errors.New("can not claim a task if not started")
default:
// do nothing and allow ticks
}
defer s.metrics.ClaimTask(err == nil)
ts, err := newTaskScheduler(s, task, meta, s.metrics)
ts, err := newTaskScheduler(s.ctx, s.wg, s, task, meta, s.metrics)
if err != nil {
return err
}
@ -195,17 +258,13 @@ func (s *outerScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err er
}
tid := task.ID.String()
s.schedulerMu.Lock()
_, ok := s.taskSchedulers[tid]
if ok {
s.schedulerMu.Unlock()
return errors.New("task has already been claimed")
}
s.taskSchedulers[tid] = ts
s.schedulerMu.Unlock()
next, hasQueue := ts.NextDue()
if now := atomic.LoadInt64(&s.now); now >= next || hasQueue {
ts.Work()
@ -245,6 +304,7 @@ type taskScheduler struct {
// CancelFunc for context passed to runners, to enable Cancel method.
cancel context.CancelFunc
wg *sync.WaitGroup
// Fixed-length slice of runners.
runners []*runner
@ -260,6 +320,8 @@ type taskScheduler struct {
}
func newTaskScheduler(
ctx context.Context,
wg *sync.WaitGroup,
s *outerScheduler,
task *StoreTask,
meta *StoreTaskMeta,
@ -270,11 +332,12 @@ func newTaskScheduler(
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
ts := &taskScheduler{
now: &s.now,
task: task,
cancel: cancel,
wg: wg,
runners: make([]*runner, meta.MaxConcurrency),
logger: s.logger.With(zap.String("task_id", task.ID.String())),
metrics: s.metrics,
@ -285,7 +348,7 @@ func newTaskScheduler(
for i := range ts.runners {
logger := ts.logger.With(zap.Int("run_slot", i))
ts.runners[i] = newRunner(ctx, logger, task, s.desiredState, s.executor, s.logWriter, ts)
ts.runners[i] = newRunner(ctx, wg, logger, task, s.desiredState, s.executor, s.logWriter, ts)
}
return ts, nil
@ -351,6 +414,7 @@ type runner struct {
// Cancelable context from parent taskScheduler.
ctx context.Context
wg *sync.WaitGroup
task *StoreTask
@ -366,6 +430,7 @@ type runner struct {
func newRunner(
ctx context.Context,
wg *sync.WaitGroup,
logger *zap.Logger,
task *StoreTask,
desiredState DesiredState,
@ -375,6 +440,7 @@ func newRunner(
) *runner {
return &runner{
ctx: ctx,
wg: wg,
state: new(uint32),
task: task,
desiredState: desiredState,
@ -424,6 +490,7 @@ func (r *runner) RestartRun(qr QueuedRun) bool {
// create a QueuedRun because we cant stm.CreateNextRun
runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()), zap.Int64("now", qr.Now))
r.wg.Add(1)
go r.executeAndWait(qr, runLogger)
r.updateRunState(qr, RunStarted, runLogger)
@ -454,12 +521,14 @@ func (r *runner) startFromWorking(now int64) {
runLogger := r.logger.With(zap.String("run_id", qr.RunID.String()), zap.Int64("now", qr.Now))
runLogger.Info("Created run; beginning execution")
r.wg.Add(1)
go r.executeAndWait(qr, runLogger)
r.updateRunState(qr, RunStarted, runLogger)
}
func (r *runner) executeAndWait(qr QueuedRun, runLogger *zap.Logger) {
defer r.wg.Done()
rp, err := r.executor.Execute(r.ctx, qr)
if err != nil {
// TODO(mr): retry? and log error.

View File

@ -21,6 +21,8 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID{1},
@ -93,6 +95,8 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID{1},
@ -161,6 +165,8 @@ func TestScheduler_Release(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID{1},
@ -195,6 +201,8 @@ func TestScheduler_Queue(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 3059, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID{1},
@ -313,6 +321,8 @@ func TestScheduler_RunLog(t *testing.T) {
e := mock.NewExecutor()
rl := backend.NewInMemRunReaderWriter()
s := backend.NewScheduler(d, e, rl, 5, backend.WithLogger(zaptest.NewLogger(t)))
s.Start(context.Background())
defer s.Stop()
// Claim a task that starts later.
task := &backend.StoreTask{
@ -398,6 +408,8 @@ func TestScheduler_Metrics(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
s.Start(context.Background())
defer s.Stop()
reg := prom.NewRegistry()
// PrometheusCollector isn't part of the Scheduler interface. Yet.

View File

@ -56,6 +56,10 @@ func (s *Scheduler) Tick(now int64) {
// WithLogger is a no-op on this mock scheduler.
func (s *Scheduler) WithLogger(l *zap.Logger) {}
// Start is a no-op on this mock scheduler.
func (s *Scheduler) Start(context.Context) {}
// Stop is a no-op on this mock scheduler.
func (s *Scheduler) Stop() {}
func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error {
if s.claimError != nil {
return s.claimError