Allow the scheduler to Update tasks. (#1058)

* Allow the scheduler to Update tasks.

* update the coordinator to use the new update action

* remove unclean code
pull/10616/head
Lyon Hill 2018-10-12 08:24:22 -06:00 committed by GitHub
parent a005792a8c
commit e643c434f6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 15 deletions

View File

@ -70,11 +70,7 @@ func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript
return err
}
if err := c.sch.ReleaseTask(id); err != nil {
return err
}
if err := c.sch.ClaimTask(task, meta); err != nil {
if err := c.sch.UpdateTask(task, meta); err != nil {
return err
}

View File

@ -29,6 +29,7 @@ func TestCoordinator(t *testing.T) {
coord := coordinator.New(sched, st)
createChan := sched.TaskCreateChan()
releaseChan := sched.TaskReleaseChan()
updateChan := sched.TaskUpdateChan()
orgID := platformtesting.MustIDBase16("69746f7175650d0a")
usrID := platformtesting.MustIDBase16("6c61757320657420")
@ -109,16 +110,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
task, err = timeoutSelector(releaseChan)
if err != nil {
t.Fatal(err)
}
if task.Script != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
task, err = timeoutSelector(createChan)
task, err = timeoutSelector(updateChan)
if err != nil {
t.Fatal(err)
}

View File

@ -97,6 +97,9 @@ type Scheduler interface {
// ClaimTask begins control of task execution in this scheduler.
ClaimTask(task *StoreTask, meta *StoreTaskMeta) error
// UpdateTask will update the concurrency and the runners for a task
UpdateTask(task *StoreTask, meta *StoreTaskMeta) error
// ReleaseTask immediately cancels any in-progress runs for the given task ID,
// and releases any resources related to management of that task.
ReleaseTask(taskID platform.ID) error
@ -268,6 +271,32 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err
return nil
}
func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error {
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
tid := task.ID.String()
ts, ok := s.taskSchedulers[tid]
if !ok {
return ErrTaskNotClaimed
}
ts.Cancel()
nts, err := newTaskScheduler(s.ctx, s.wg, s, task, meta, s.metrics)
if err != nil {
return err
}
s.taskSchedulers[tid] = nts
next, hasQueue := ts.NextDue()
if now := atomic.LoadInt64(&s.now); now >= next || hasQueue {
ts.Work()
}
return nil
}
func (s *TickScheduler) ReleaseTask(taskID platform.ID) error {
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()

View File

@ -197,6 +197,57 @@ func TestScheduler_Release(t *testing.T) {
}
}
func TestScheduler_UpdateTask(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 3059, backend.WithLogger(zaptest.NewLogger(t)))
s.Start(context.Background())
defer s.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "* * * * *", // Every minute.
LatestCompleted: 3000,
}
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
t.Fatal(err)
}
s.Tick(3060)
p, err := e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
}
p[0].Finish(mock.NewRunResult(nil, false), nil)
meta.EffectiveCron = "0 * * * *"
meta.MaxConcurrency = 30
d.SetTaskMeta(task.ID, *meta)
if err := s.UpdateTask(task, meta); err != nil {
t.Fatal(err)
}
s.Tick(3061)
p, err = e.PollForNumberRunning(task.ID, 0)
if err != nil {
t.Fatal(err)
}
s.Tick(3600)
p, err = e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
}
p[0].Finish(mock.NewRunResult(nil, false), nil)
}
func TestScheduler_Queue(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()

View File

@ -26,6 +26,7 @@ type Scheduler struct {
createChan chan *Task
releaseChan chan *Task
updateChan chan *Task
claimError error
releaseError error
@ -83,6 +84,28 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMe
return nil
}
func (s *Scheduler) UpdateTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error {
s.Lock()
defer s.Unlock()
_, ok := s.claims[task.ID.String()]
if !ok {
return errors.New("task not in list")
}
s.meta[task.ID.String()] = *meta
t := &Task{Script: task.Script, StartExecution: meta.LatestCompleted, ConcurrencyLimit: uint8(meta.MaxConcurrency)}
s.claims[task.ID.String()] = t
if s.updateChan != nil {
s.updateChan <- t
}
return nil
}
func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
if s.releaseError != nil {
return s.releaseError
@ -117,6 +140,10 @@ func (s *Scheduler) TaskReleaseChan() <-chan *Task {
s.releaseChan = make(chan *Task, 10)
return s.releaseChan
}
func (s *Scheduler) TaskUpdateChan() <-chan *Task {
s.updateChan = make(chan *Task, 10)
return s.updateChan
}
// ClaimError sets an error to be returned by s.ClaimTask, if err is not nil.
func (s *Scheduler) ClaimError(err error) {