From a9df93b1fdf3b180ae981b4dce876756281fccba Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Thu, 26 Sep 2019 13:55:23 -0700 Subject: [PATCH] refactor(tasks): create coordinator for new scheduler/executor (#15268) --- task.go | 19 +- task/backend/coordinator/coordinator_test.go | 26 +- task/backend/coordinator/support_test.go | 10 +- task/backend/coordinator/task_coordinator.go | 184 ++++++++++++++ .../task_coordinator_support_test.go | 114 +++++++++ .../coordinator/task_coordinator_test.go | 227 ++++++++++++++++++ task/backend/schedulable_task_service.go | 41 ++++ task/backend/schedulable_task_service_test.go | 46 ++++ task/backend/scheduler/scheduler.go | 13 +- task_errors.go | 2 +- 10 files changed, 656 insertions(+), 26 deletions(-) create mode 100644 task/backend/coordinator/task_coordinator.go create mode 100644 task/backend/coordinator/task_coordinator_support_test.go create mode 100644 task/backend/coordinator/task_coordinator_test.go create mode 100644 task/backend/schedulable_task_service.go create mode 100644 task/backend/schedulable_task_service_test.go diff --git a/task.go b/task.go index 1cc19b0894..f265c23ec8 100644 --- a/task.go +++ b/task.go @@ -60,7 +60,24 @@ func (t *Task) EffectiveCron() string { return "" } -// Run is a record created when a run of a task is scheduled. +// LatestCompletedTime gives the time.Time that the task was last queued to be run in RFC3339 format. +func (t *Task) LatestCompletedTime() (time.Time, error) { + tm := t.LatestCompleted + if tm == "" { + tm = t.CreatedAt + } + return time.Parse(time.RFC3339, tm) +} + +// OffsetDuration gives the time.Duration of the Task's Offset property, which represents a delay before execution +func (t *Task) OffsetDuration() (time.Duration, error) { + if t.Offset == "" { + return time.Duration(0), nil + } + return time.ParseDuration(t.Offset) +} + +// Run is a record createId when a run of a task is scheduled. type Run struct { ID ID `json:"id,omitempty"` TaskID ID `json:"taskID"` diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index 95d92581ef..e6cc24a4c8 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -27,7 +27,7 @@ var ( TaskID: one, } - allowUnexported = cmp.AllowUnexported(scheduler{}) + allowUnexported = cmp.AllowUnexported(schedulerS{}) ) func Test_Coordinator(t *testing.T) { @@ -37,7 +37,7 @@ func Test_Coordinator(t *testing.T) { updateErr error releaseErr error call func(*testing.T, *Coordinator) - scheduler *scheduler + scheduler *schedulerS }{ { name: "TaskCreated", @@ -46,7 +46,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ claimCall{taskOne}, }, @@ -59,7 +59,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ updateCall{activeThree}, claimCall{activeThree}, @@ -73,7 +73,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ releaseCall{three}, updateCall{taskThree}, @@ -88,7 +88,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ releaseCall{three}, updateCall{taskThree}, @@ -103,7 +103,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ releaseCall{three}, updateCall{taskThree}, @@ -117,7 +117,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ updateCall{taskTwo}, }, @@ -130,7 +130,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ releaseCall{two}, }, @@ -143,7 +143,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ cancelCall{one, one}, }, @@ -156,7 +156,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ updateCall{taskOne}, }, @@ -169,7 +169,7 @@ func Test_Coordinator(t *testing.T) { t.Errorf("expected nil error found %q", err) } }, - scheduler: &scheduler{ + scheduler: &schedulerS{ calls: []interface{}{ updateCall{taskOne}, }, @@ -178,7 +178,7 @@ func Test_Coordinator(t *testing.T) { } { t.Run(test.name, func(t *testing.T) { var ( - scheduler = &scheduler{ + scheduler = &schedulerS{ claimErr: test.claimErr, updateErr: test.updateErr, releaseErr: test.releaseErr, diff --git a/task/backend/coordinator/support_test.go b/task/backend/coordinator/support_test.go index aba789e1f9..cfb6dafa18 100644 --- a/task/backend/coordinator/support_test.go +++ b/task/backend/coordinator/support_test.go @@ -8,7 +8,7 @@ import ( ) type ( - scheduler struct { + schedulerS struct { backend.Scheduler claimErr, @@ -35,25 +35,25 @@ type ( } ) -func (s *scheduler) ClaimTask(_ context.Context, task *influxdb.Task) error { +func (s *schedulerS) ClaimTask(_ context.Context, task *influxdb.Task) error { s.calls = append(s.calls, claimCall{task}) return s.claimErr } -func (s *scheduler) UpdateTask(_ context.Context, task *influxdb.Task) error { +func (s *schedulerS) UpdateTask(_ context.Context, task *influxdb.Task) error { s.calls = append(s.calls, updateCall{task}) return s.updateErr } -func (s *scheduler) ReleaseTask(taskID influxdb.ID) error { +func (s *schedulerS) ReleaseTask(taskID influxdb.ID) error { s.calls = append(s.calls, releaseCall{taskID}) return s.releaseErr } -func (s *scheduler) CancelRun(_ context.Context, taskID influxdb.ID, runID influxdb.ID) error { +func (s *schedulerS) CancelRun(_ context.Context, taskID influxdb.ID, runID influxdb.ID) error { s.calls = append(s.calls, cancelCall{taskID, runID}) return nil diff --git a/task/backend/coordinator/task_coordinator.go b/task/backend/coordinator/task_coordinator.go new file mode 100644 index 0000000000..3e7329add6 --- /dev/null +++ b/task/backend/coordinator/task_coordinator.go @@ -0,0 +1,184 @@ +package coordinator + +import ( + "context" + "errors" + "time" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/task/backend" + "github.com/influxdata/influxdb/task/backend/executor" + "github.com/influxdata/influxdb/task/backend/middleware" + "github.com/influxdata/influxdb/task/backend/scheduler" + "go.uber.org/zap" +) + +var _ middleware.Coordinator = (*Coordinator)(nil) +var _ Executor = (*executor.TaskExecutor)(nil) + +// DefaultLimit is the maximum number of tasks that a given taskd server can own +const DefaultLimit = 1000 + +// Executor is an abstraction of the task executor with only the functions needed by the coordinator +type Executor interface { + ManualRun(ctx context.Context, id influxdb.ID, runID influxdb.ID) (executor.Promise, error) + Cancel(ctx context.Context, runID influxdb.ID) error +} + +// TaskCoordinator (temporary name) is the intermediary between the scheduling/executing system and the rest of the task system +type TaskCoordinator struct { + logger *zap.Logger + sch scheduler.Scheduler + ex Executor + + limit int +} + +type CoordinatorOption func(*TaskCoordinator) + +// SchedulableTask is a wrapper around the Task struct, giving it methods to make it compatible with the Scheduler +type SchedulableTask struct { + *influxdb.Task +} + +func (t SchedulableTask) ID() scheduler.ID { + return scheduler.ID(t.Task.ID) +} + +// Schedule takes the time a Task is scheduled for and returns a Schedule object +func (t SchedulableTask) Schedule() scheduler.Schedule { + // TODO (al): use new scheduler's NewSchedule method + return scheduler.Schedule{} +} + +// Offset returns a time.Duration for the Task's offset property +func (t SchedulableTask) Offset() time.Duration { + offset, _ := t.OffsetDuration() + return offset +} + +// LastScheduled parses the task's LatestCompleted value as a Time object +func (t SchedulableTask) LastScheduled() time.Time { + tm, _ := t.LatestCompletedTime() + return tm +} + +func WithLimitOpt(i int) CoordinatorOption { + return func(c *TaskCoordinator) { + c.limit = i + } +} + +// NewSchedulableTask transforms an influxdb task to a schedulable task type +func NewSchedulableTask(task *influxdb.Task) (SchedulableTask, error) { + if offset, err := task.OffsetDuration(); offset != time.Duration(0) && err != nil { + return SchedulableTask{}, errors.New("could not create schedulable task: offset duration could not be parsed") + } + + if _, err := task.LatestCompletedTime(); err != nil { + return SchedulableTask{}, errors.New("could not create schedulable task: latest completed time could not be parsed") + } + t := SchedulableTask{task} + + return t, nil +} + +func NewCoordinator(logger *zap.Logger, scheduler scheduler.Scheduler, executor Executor, opts ...CoordinatorOption) *TaskCoordinator { + c := &TaskCoordinator{ + logger: logger, + sch: scheduler, + ex: executor, + limit: DefaultLimit, + } + + for _, opt := range opts { + opt(c) + } + + return c +} + +// TaskCreated asks the Scheduler to schedule the newly created task +func (c *TaskCoordinator) TaskCreated(ctx context.Context, task *influxdb.Task) error { + t, err := NewSchedulableTask(task) + + if err != nil { + return err + } + // func new schedulable task + // catch errors from offset and last scheduled + if err = c.sch.Schedule(t); err != nil { + return err + } + + return nil +} + +// TaskUpdated releases the task if it is being disabled, and schedules it otherwise +func (c *TaskCoordinator) TaskUpdated(ctx context.Context, from, to *influxdb.Task) error { + sid := scheduler.ID(to.ID) + t, err := NewSchedulableTask(to) + if err != nil { + return err + } + + // if disabling the task, release it before schedule update + if to.Status != from.Status && to.Status == string(backend.TaskInactive) { + if err := c.sch.Release(sid); err != nil && err != influxdb.ErrTaskNotClaimed { + return err + } + } else { + if err := c.sch.Schedule(t); err != nil { + return err + } + } + + return nil +} + +//TaskDeleted asks the Scheduler to release the deleted task +func (c *TaskCoordinator) TaskDeleted(ctx context.Context, id influxdb.ID) error { + tid := scheduler.ID(id) + if err := c.sch.Release(tid); err != nil && err != influxdb.ErrTaskNotClaimed { + return err + } + + return nil +} + +// RunCancelled speaks directly to the executor to cancel a task run +func (c *TaskCoordinator) RunCancelled(ctx context.Context, runID influxdb.ID) error { + err := c.ex.Cancel(ctx, runID) + + return err +} + +// RunRetried speaks directly to the executor to re-try a task run immediately +func (c *TaskCoordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error { + promise, err := c.ex.ManualRun(ctx, task.ID, run.ID) + if err != nil { + return influxdb.ErrRunExecutionError(err) + } + + <-promise.Done() + if err = promise.Error(); err != nil { + return err + } + + return nil +} + +// RunForced speaks directly to the Executor to run a task immediately +func (c *TaskCoordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error { + promise, err := c.ex.ManualRun(ctx, task.ID, run.ID) + if err != nil { + return influxdb.ErrRunExecutionError(err) + } + + <-promise.Done() + if err = promise.Error(); err != nil { + return err + } + + return nil +} diff --git a/task/backend/coordinator/task_coordinator_support_test.go b/task/backend/coordinator/task_coordinator_support_test.go new file mode 100644 index 0000000000..0a69eba568 --- /dev/null +++ b/task/backend/coordinator/task_coordinator_support_test.go @@ -0,0 +1,114 @@ +package coordinator + +import ( + "context" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/task/backend/executor" + "github.com/influxdata/influxdb/task/backend/scheduler" +) + +var _ Executor = (*executorE)(nil) + +type ( + executorE struct { + calls []interface{} + } + + manualRunCall struct { + TaskID influxdb.ID + RunID influxdb.ID + } + + cancelCallC struct { + RunID influxdb.ID + } +) + +type ( + schedulerC struct { + scheduler.Scheduler + + calls []interface{} + } + + scheduleCall struct { + Task scheduler.Schedulable + } + + releaseCallC struct { + TaskID scheduler.ID + } +) + +type ( + promise struct { + run *influxdb.Run + + done chan struct{} + err error + + ctx context.Context + cancelFunc context.CancelFunc + } +) + +// ID is the id of the run that was created +func (p *promise) ID() influxdb.ID { + return p.run.ID +} + +// Cancel is used to cancel a executing query +func (p *promise) Cancel(ctx context.Context) { + // call cancelfunc + p.cancelFunc() + + // wait for ctx.Done or p.Done + select { + case <-p.Done(): + case <-ctx.Done(): + } +} + +// Done provides a channel that closes on completion of a promise +func (p *promise) Done() <-chan struct{} { + return p.done +} + +// Error returns the error resulting from a run execution. +// If the execution is not complete error waits on Done(). +func (p *promise) Error() error { + <-p.done + return p.err +} + +func (s *schedulerC) Schedule(task scheduler.Schedulable) error { + s.calls = append(s.calls, scheduleCall{task}) + + return nil +} + +func (s *schedulerC) Release(taskID scheduler.ID) error { + s.calls = append(s.calls, releaseCallC{taskID}) + + return nil +} + +func (e *executorE) ManualRun(ctx context.Context, id influxdb.ID, runID influxdb.ID) (executor.Promise, error) { + e.calls = append(e.calls, manualRunCall{id, runID}) + ctx, cancel := context.WithCancel(ctx) + p := promise{ + done: make(chan struct{}), + ctx: ctx, + cancelFunc: cancel, + } + close(p.done) + + err := p.Error() + return &p, err +} + +func (e *executorE) Cancel(ctx context.Context, runID influxdb.ID) error { + e.calls = append(e.calls, cancelCallC{runID}) + return nil +} diff --git a/task/backend/coordinator/task_coordinator_test.go b/task/backend/coordinator/task_coordinator_test.go new file mode 100644 index 0000000000..a450ef1bc8 --- /dev/null +++ b/task/backend/coordinator/task_coordinator_test.go @@ -0,0 +1,227 @@ +package coordinator + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/task/backend/scheduler" + "go.uber.org/zap" +) + +func Test_Coordinator_Executor_Methods(t *testing.T) { + var ( + one = influxdb.ID(1) + taskOne = &influxdb.Task{ID: one} + + timeString = time.Now().Format(time.RFC3339) + + runOne = &influxdb.Run{ + ID: one, + TaskID: one, + ScheduledFor: timeString, + } + + allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{}) + ) + + for _, test := range []struct { + name string + claimErr error + updateErr error + releaseErr error + call func(*testing.T, *TaskCoordinator) + executor *executorE + }{ + { + name: "RunForced", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.RunForced(context.Background(), taskOne, runOne); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + executor: &executorE{ + calls: []interface{}{ + manualRunCall{taskOne.ID, runOne.ID}, + }, + }, + }, + { + name: "RunRetried", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.RunRetried(context.Background(), taskOne, runOne); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + executor: &executorE{ + calls: []interface{}{ + manualRunCall{taskOne.ID, runOne.ID}, + }, + }, + }, + { + name: "RunCancelled", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.RunCancelled(context.Background(), runOne.ID); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + executor: &executorE{ + calls: []interface{}{ + cancelCallC{runOne.ID}, + }, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + var ( + executor = &executorE{} + scheduler = &schedulerC{} + coord = NewCoordinator(zap.NewNop(), scheduler, executor) + ) + + test.call(t, coord) + + if diff := cmp.Diff( + test.executor.calls, + executor.calls, + allowUnexported); diff != "" { + t.Errorf("unexpected executor contents %s", diff) + } + }) + } +} + +func Test_Coordinator_Scheduler_Methods(t *testing.T) { + + var ( + one = influxdb.ID(1) + two = influxdb.ID(2) + three = influxdb.ID(3) + now = time.Now().Format(time.RFC3339Nano) + + taskOne = &influxdb.Task{ID: one, CreatedAt: now} + taskTwo = &influxdb.Task{ID: two, Status: "active", CreatedAt: now} + taskTwoInactive = &influxdb.Task{ID: two, Status: "inactive", CreatedAt: now} + taskThreeOriginal = &influxdb.Task{ + ID: three, + Status: "active", + Name: "Previous", + CreatedAt: now, + } + taskThreeNew = &influxdb.Task{ + ID: three, + Status: "active", + Name: "Renamed", + CreatedAt: now, + } + + schedulableT = SchedulableTask{taskOne} + schedulableTaskTwo = SchedulableTask{taskTwo} + schedulableTaskThree = SchedulableTask{taskThreeNew} + + timeString = time.Now().Format(time.RFC3339) + + runOne = &influxdb.Run{ + ID: one, + TaskID: one, + ScheduledFor: timeString, + } + + allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{}) + ) + + for _, test := range []struct { + name string + claimErr error + updateErr error + releaseErr error + call func(*testing.T, *TaskCoordinator) + scheduler *schedulerC + }{ + { + name: "TaskCreated", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.TaskCreated(context.Background(), taskOne); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + scheduler: &schedulerC{ + calls: []interface{}{ + scheduleCall{schedulableT}, + }, + }, + }, + { + name: "TaskUpdated - deactivate task", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.TaskUpdated(context.Background(), taskTwo, taskTwoInactive); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + scheduler: &schedulerC{ + calls: []interface{}{ + releaseCallC{scheduler.ID(taskTwo.ID)}, + }, + }, + }, + { + name: "TaskUpdated - activate task", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.TaskUpdated(context.Background(), taskTwoInactive, taskTwo); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + scheduler: &schedulerC{ + calls: []interface{}{ + scheduleCall{schedulableTaskTwo}, + }, + }, + }, + { + name: "TaskUpdated - change name", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.TaskUpdated(context.Background(), taskThreeOriginal, taskThreeNew); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + scheduler: &schedulerC{ + calls: []interface{}{ + scheduleCall{schedulableTaskThree}, + }, + }, + }, + { + name: "TaskDeleted", + call: func(t *testing.T, c *TaskCoordinator) { + if err := c.TaskDeleted(context.Background(), runOne.ID); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + scheduler: &schedulerC{ + calls: []interface{}{ + releaseCallC{scheduler.ID(taskOne.ID)}, + }, + }, + }, + } { + t.Run(test.name, func(t *testing.T) { + var ( + executor = &executorE{} + scheduler = &schedulerC{} + coord = NewCoordinator(zap.NewNop(), scheduler, executor) + ) + + test.call(t, coord) + + if diff := cmp.Diff( + test.scheduler.calls, + scheduler.calls, + allowUnexported); diff != "" { + t.Errorf("unexpected scheduler contents %s", diff) + } + }) + } +} diff --git a/task/backend/schedulable_task_service.go b/task/backend/schedulable_task_service.go new file mode 100644 index 0000000000..c0c83a0745 --- /dev/null +++ b/task/backend/schedulable_task_service.go @@ -0,0 +1,41 @@ +package backend + +import ( + "context" + "fmt" + "time" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/task/backend/scheduler" +) + +var _ scheduler.SchedulableService = (*SchedulableTaskService)(nil) + +// UpdateTaskService provides an API to update the LatestScheduled time of a task +type UpdateTaskService interface { + UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) +} + +// SchedulableTaskService implements the SchedulableService interface +type SchedulableTaskService struct { + UpdateTaskService +} + +// NewSchedulableTaskService initializes a new SchedulableTaskService given an UpdateTaskService +func NewSchedulableTaskService(ts UpdateTaskService) SchedulableTaskService { + return SchedulableTaskService{ts} +} + +// UpdateLastScheduled uses the task service to store the latest time a task was scheduled to run +func (s SchedulableTaskService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error { + tm := t.Format(time.RFC3339) + tid := influxdb.ID(id) + _, err := s.UpdateTask(ctx, tid, influxdb.TaskUpdate{ + LatestCompleted: &tm, + }) + + if err != nil { + return fmt.Errorf("could not update last scheduled for task; Err: %v", err) + } + return nil +} diff --git a/task/backend/schedulable_task_service_test.go b/task/backend/schedulable_task_service_test.go new file mode 100644 index 0000000000..5071223c74 --- /dev/null +++ b/task/backend/schedulable_task_service_test.go @@ -0,0 +1,46 @@ +package backend + +import ( + "context" + "testing" + "time" + + "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/task/backend/scheduler" +) + +var ( + mockTaskID = influxdb.ID(1) + mockTimeNow = time.Now() + mockTimeNowStr = time.Now().Format(time.RFC3339Nano) +) + +func (m MockTaskService) UpdateTask(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) { + return &influxdb.Task{ID: id, UpdatedAt: mockTimeNowStr}, nil +} + +type MockTaskService struct{} + +func Test_Schedulable_Task_Service(t *testing.T) { + + for _, test := range []struct { + name string + task *influxdb.Task + }{ + { + name: "Create New Schedulable Task Service", + task: taskOne, + }, + } { + t.Run(test.name, func(t *testing.T) { + ts := MockTaskService{} + + schedulableService := NewSchedulableTaskService(ts) + + err := schedulableService.UpdateLastScheduled(context.Background(), scheduler.ID(mockTaskID), mockTimeNow) + if err != nil { + t.Fatalf("expected nil error, got: %v", err) + } + }) + } +} diff --git a/task/backend/scheduler/scheduler.go b/task/backend/scheduler/scheduler.go index 4c36f5d752..1e3a742f23 100644 --- a/task/backend/scheduler/scheduler.go +++ b/task/backend/scheduler/scheduler.go @@ -3,14 +3,12 @@ package scheduler import ( "context" "time" - - "github.com/influxdata/influxdb" ) // ID duplicates the influxdb ID so users of the scheduler don't have to // import influxdb for the id. // TODO(lh): maybe make this its own thing sometime in the future. -type ID influxdb.ID +type ID uint64 // Executor is a system used by the scheduler to actually execute the scheduleable item. type Executor interface { @@ -23,8 +21,7 @@ type Executor interface { Execute(ctx context.Context, id ID, scheduledAt time.Time) error } -// Schedulable is the interface that encapsulates work that -// is to be executed on a specified schedule. +// Schedulable is the interface that encapsulates the state that is required to schedule a job. type Schedulable interface { // ID is the unique identifier for this Schedulable ID() ID @@ -41,10 +38,14 @@ type Schedulable interface { // LastScheduled specifies last time this Schedulable was queued // for execution. LastScheduled() time.Time +} + +// SchedulableService encapsulates the work necessary to schedule a job +type SchedulableService interface { // UpdateLastScheduled notifies the instance that it was scheduled for // execution at the specified time - UpdateLastScheduled(time.Time) + UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error } type Schedule struct { diff --git a/task_errors.go b/task_errors.go index dfc4e1fb98..03dbc46bb0 100644 --- a/task_errors.go +++ b/task_errors.go @@ -189,7 +189,7 @@ func ErrRunExecutionError(err error) *Error { return &Error{ Code: EInternal, Msg: fmt.Sprintf("could not execute task run; Err: %v", err), - Op: "kv/taskScheduler", + Op: "kv/taskExecutor", Err: err, } }