From e643c434f6989a19bf6680503ecad14398a8f8ba Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Fri, 12 Oct 2018 08:24:22 -0600 Subject: [PATCH] Allow the scheduler to Update tasks. (#1058) * Allow the scheduler to Update tasks. * update the coordinator to use the new update action * remove unclean code --- task/backend/coordinator/coordinator.go | 6 +-- task/backend/coordinator/coordinator_test.go | 12 +---- task/backend/scheduler.go | 29 +++++++++++ task/backend/scheduler_test.go | 51 ++++++++++++++++++++ task/mock/scheduler.go | 27 +++++++++++ 5 files changed, 110 insertions(+), 15 deletions(-) diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index 147cdcab09..3378de2b1c 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -70,11 +70,7 @@ func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript return err } - if err := c.sch.ReleaseTask(id); err != nil { - return err - } - - if err := c.sch.ClaimTask(task, meta); err != nil { + if err := c.sch.UpdateTask(task, meta); err != nil { return err } diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index fbed599132..4683fce62e 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -29,6 +29,7 @@ func TestCoordinator(t *testing.T) { coord := coordinator.New(sched, st) createChan := sched.TaskCreateChan() releaseChan := sched.TaskReleaseChan() + updateChan := sched.TaskUpdateChan() orgID := platformtesting.MustIDBase16("69746f7175650d0a") usrID := platformtesting.MustIDBase16("6c61757320657420") @@ -109,16 +110,7 @@ func TestCoordinator(t *testing.T) { t.Fatal(err) } - task, err = timeoutSelector(releaseChan) - if err != nil { - t.Fatal(err) - } - - if task.Script != script { - t.Fatal("task sent to scheduler doesnt match task created") - } - - task, err = timeoutSelector(createChan) + task, err = timeoutSelector(updateChan) if err != nil { t.Fatal(err) } diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 0fe2095698..cab8d8f2e5 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -97,6 +97,9 @@ type Scheduler interface { // ClaimTask begins control of task execution in this scheduler. ClaimTask(task *StoreTask, meta *StoreTaskMeta) error + // UpdateTask will update the concurrency and the runners for a task + UpdateTask(task *StoreTask, meta *StoreTaskMeta) error + // ReleaseTask immediately cancels any in-progress runs for the given task ID, // and releases any resources related to management of that task. ReleaseTask(taskID platform.ID) error @@ -268,6 +271,32 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err return nil } +func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error { + s.schedulerMu.Lock() + defer s.schedulerMu.Unlock() + + tid := task.ID.String() + ts, ok := s.taskSchedulers[tid] + if !ok { + return ErrTaskNotClaimed + } + ts.Cancel() + + nts, err := newTaskScheduler(s.ctx, s.wg, s, task, meta, s.metrics) + if err != nil { + return err + } + + s.taskSchedulers[tid] = nts + + next, hasQueue := ts.NextDue() + if now := atomic.LoadInt64(&s.now); now >= next || hasQueue { + ts.Work() + } + + return nil +} + func (s *TickScheduler) ReleaseTask(taskID platform.ID) error { s.schedulerMu.Lock() defer s.schedulerMu.Unlock() diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 9bb752784d..9ecba22db1 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -197,6 +197,57 @@ func TestScheduler_Release(t *testing.T) { } } +func TestScheduler_UpdateTask(t *testing.T) { + d := mock.NewDesiredState() + e := mock.NewExecutor() + s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 3059, backend.WithLogger(zaptest.NewLogger(t))) + s.Start(context.Background()) + defer s.Stop() + + task := &backend.StoreTask{ + ID: platform.ID(1), + } + meta := &backend.StoreTaskMeta{ + MaxConcurrency: 1, + EffectiveCron: "* * * * *", // Every minute. + LatestCompleted: 3000, + } + + d.SetTaskMeta(task.ID, *meta) + if err := s.ClaimTask(task, meta); err != nil { + t.Fatal(err) + } + + s.Tick(3060) + p, err := e.PollForNumberRunning(task.ID, 1) + if err != nil { + t.Fatal(err) + } + + p[0].Finish(mock.NewRunResult(nil, false), nil) + + meta.EffectiveCron = "0 * * * *" + meta.MaxConcurrency = 30 + d.SetTaskMeta(task.ID, *meta) + + if err := s.UpdateTask(task, meta); err != nil { + t.Fatal(err) + } + + s.Tick(3061) + p, err = e.PollForNumberRunning(task.ID, 0) + if err != nil { + t.Fatal(err) + } + + s.Tick(3600) + p, err = e.PollForNumberRunning(task.ID, 1) + if err != nil { + t.Fatal(err) + } + p[0].Finish(mock.NewRunResult(nil, false), nil) +} + func TestScheduler_Queue(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index cefedb11ed..fe53dca4b4 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -26,6 +26,7 @@ type Scheduler struct { createChan chan *Task releaseChan chan *Task + updateChan chan *Task claimError error releaseError error @@ -83,6 +84,28 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMe return nil } +func (s *Scheduler) UpdateTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error { + s.Lock() + defer s.Unlock() + + _, ok := s.claims[task.ID.String()] + if !ok { + return errors.New("task not in list") + } + + s.meta[task.ID.String()] = *meta + + t := &Task{Script: task.Script, StartExecution: meta.LatestCompleted, ConcurrencyLimit: uint8(meta.MaxConcurrency)} + + s.claims[task.ID.String()] = t + + if s.updateChan != nil { + s.updateChan <- t + } + + return nil +} + func (s *Scheduler) ReleaseTask(taskID platform.ID) error { if s.releaseError != nil { return s.releaseError @@ -117,6 +140,10 @@ func (s *Scheduler) TaskReleaseChan() <-chan *Task { s.releaseChan = make(chan *Task, 10) return s.releaseChan } +func (s *Scheduler) TaskUpdateChan() <-chan *Task { + s.updateChan = make(chan *Task, 10) + return s.updateChan +} // ClaimError sets an error to be returned by s.ClaimTask, if err is not nil. func (s *Scheduler) ClaimError(err error) {