refactor(tasks): create coordinator for new scheduler/executor (#15268)

pull/15245/head
Alirie Gray 2019-09-26 13:55:23 -07:00 committed by GitHub
parent 73ac9c5b5c
commit a9df93b1fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 656 additions and 26 deletions

19
task.go
View File

@ -60,7 +60,24 @@ func (t *Task) EffectiveCron() string {
return ""
}
// Run is a record created when a run of a task is scheduled.
// LatestCompletedTime parses the task's LatestCompleted timestamp as RFC3339 and
// returns it as a time.Time. When LatestCompleted is empty, the task's CreatedAt
// timestamp is parsed instead. The parse error, if any, is returned unchanged.
func (t *Task) LatestCompletedTime() (time.Time, error) {
	if t.LatestCompleted != "" {
		return time.Parse(time.RFC3339, t.LatestCompleted)
	}
	// No completion recorded: fall back to the creation timestamp.
	return time.Parse(time.RFC3339, t.CreatedAt)
}
// OffsetDuration converts the task's Offset property, which represents a delay
// before execution, into a time.Duration. An empty Offset yields a zero
// duration and a nil error; otherwise the result of time.ParseDuration is
// returned as-is.
func (t *Task) OffsetDuration() (time.Duration, error) {
	if raw := t.Offset; raw != "" {
		return time.ParseDuration(raw)
	}
	return 0, nil
}
// Run is a record created when a run of a task is scheduled.
type Run struct {
ID ID `json:"id,omitempty"`
TaskID ID `json:"taskID"`

View File

@ -27,7 +27,7 @@ var (
TaskID: one,
}
allowUnexported = cmp.AllowUnexported(scheduler{})
allowUnexported = cmp.AllowUnexported(schedulerS{})
)
func Test_Coordinator(t *testing.T) {
@ -37,7 +37,7 @@ func Test_Coordinator(t *testing.T) {
updateErr error
releaseErr error
call func(*testing.T, *Coordinator)
scheduler *scheduler
scheduler *schedulerS
}{
{
name: "TaskCreated",
@ -46,7 +46,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
claimCall{taskOne},
},
@ -59,7 +59,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
updateCall{activeThree},
claimCall{activeThree},
@ -73,7 +73,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
releaseCall{three},
updateCall{taskThree},
@ -88,7 +88,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
releaseCall{three},
updateCall{taskThree},
@ -103,7 +103,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
releaseCall{three},
updateCall{taskThree},
@ -117,7 +117,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
updateCall{taskTwo},
},
@ -130,7 +130,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
releaseCall{two},
},
@ -143,7 +143,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
cancelCall{one, one},
},
@ -156,7 +156,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
updateCall{taskOne},
},
@ -169,7 +169,7 @@ func Test_Coordinator(t *testing.T) {
t.Errorf("expected nil error found %q", err)
}
},
scheduler: &scheduler{
scheduler: &schedulerS{
calls: []interface{}{
updateCall{taskOne},
},
@ -178,7 +178,7 @@ func Test_Coordinator(t *testing.T) {
} {
t.Run(test.name, func(t *testing.T) {
var (
scheduler = &scheduler{
scheduler = &schedulerS{
claimErr: test.claimErr,
updateErr: test.updateErr,
releaseErr: test.releaseErr,

View File

@ -8,7 +8,7 @@ import (
)
type (
scheduler struct {
schedulerS struct {
backend.Scheduler
claimErr,
@ -35,25 +35,25 @@ type (
}
)
func (s *scheduler) ClaimTask(_ context.Context, task *influxdb.Task) error {
func (s *schedulerS) ClaimTask(_ context.Context, task *influxdb.Task) error {
s.calls = append(s.calls, claimCall{task})
return s.claimErr
}
func (s *scheduler) UpdateTask(_ context.Context, task *influxdb.Task) error {
func (s *schedulerS) UpdateTask(_ context.Context, task *influxdb.Task) error {
s.calls = append(s.calls, updateCall{task})
return s.updateErr
}
func (s *scheduler) ReleaseTask(taskID influxdb.ID) error {
func (s *schedulerS) ReleaseTask(taskID influxdb.ID) error {
s.calls = append(s.calls, releaseCall{taskID})
return s.releaseErr
}
func (s *scheduler) CancelRun(_ context.Context, taskID influxdb.ID, runID influxdb.ID) error {
func (s *schedulerS) CancelRun(_ context.Context, taskID influxdb.ID, runID influxdb.ID) error {
s.calls = append(s.calls, cancelCall{taskID, runID})
return nil

View File

@ -0,0 +1,184 @@
package coordinator
import (
"context"
"errors"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/task/backend/middleware"
"github.com/influxdata/influxdb/task/backend/scheduler"
"go.uber.org/zap"
)
var _ middleware.Coordinator = (*Coordinator)(nil)
var _ Executor = (*executor.TaskExecutor)(nil)
// DefaultLimit is the maximum number of tasks that a given taskd server can own.
// NewCoordinator uses it as the coordinator's initial limit; WithLimitOpt overrides it.
const DefaultLimit = 1000
// Executor is an abstraction of the task executor with only the functions needed by the coordinator
type Executor interface {
	// ManualRun starts a run for the given task and run IDs and returns a promise for it.
	// The coordinator calls it from RunRetried and RunForced.
	ManualRun(ctx context.Context, id influxdb.ID, runID influxdb.ID) (executor.Promise, error)
	// Cancel cancels the run with the given ID. The coordinator calls it from RunCancelled.
	Cancel(ctx context.Context, runID influxdb.ID) error
}
// TaskCoordinator (temporary name) is the intermediary between the scheduling/executing system and the rest of the task system
type TaskCoordinator struct {
	// logger is stored by NewCoordinator.
	logger *zap.Logger
	// sch receives Schedule and Release calls for task lifecycle events.
	sch scheduler.Scheduler
	// ex receives ManualRun and Cancel calls for run lifecycle events.
	ex Executor

	// limit starts at DefaultLimit and can be replaced through WithLimitOpt.
	limit int
}
// CoordinatorOption mutates a TaskCoordinator; NewCoordinator applies each option
// it is given, in order, after setting the defaults.
type CoordinatorOption func(*TaskCoordinator)
// SchedulableTask is a wrapper around the Task struct, giving it methods to make it compatible with the Scheduler
type SchedulableTask struct {
	// Task is embedded by pointer, so the wrapper shares the underlying task rather than copying it.
	*influxdb.Task
}
// ID returns the wrapped task's ID converted to the scheduler's ID type.
func (t SchedulableTask) ID() scheduler.ID {
	taskID := t.Task.ID
	return scheduler.ID(taskID)
}
// Schedule returns the scheduler.Schedule for this task. It is currently a
// placeholder that always returns the zero-value Schedule.
func (t SchedulableTask) Schedule() scheduler.Schedule {
	// TODO (al): use new scheduler's NewSchedule method
	var placeholder scheduler.Schedule
	return placeholder
}
// Offset returns the task's offset property as a time.Duration.
// Any parse error from OffsetDuration is discarded; the duration returned
// alongside it is used as-is.
func (t SchedulableTask) Offset() time.Duration {
	d, _ := t.OffsetDuration()
	return d
}
// LastScheduled returns the time produced by the task's LatestCompletedTime.
// Any parse error is discarded; the time returned alongside it is used as-is.
func (t SchedulableTask) LastScheduled() time.Time {
	last, _ := t.LatestCompletedTime()
	return last
}
// WithLimitOpt returns a CoordinatorOption that sets the coordinator's limit to i.
func WithLimitOpt(i int) CoordinatorOption {
	setLimit := func(c *TaskCoordinator) { c.limit = i }
	return setLimit
}
// NewSchedulableTask transforms an influxdb task to a schedulable task type.
//
// The task's offset and its latest-completed timestamp are parsed up front so
// that malformed values are reported here: the SchedulableTask accessors Offset
// and LastScheduled discard parse errors. A zero-value SchedulableTask and a
// non-nil error are returned when either value cannot be parsed.
func NewSchedulableTask(task *influxdb.Task) (SchedulableTask, error) {
	// Only the error is inspected. time.ParseDuration yields a zero duration
	// whenever it fails, so the duration value cannot be part of the failure
	// check — requiring it to be non-zero would let every bad offset through.
	if _, err := task.OffsetDuration(); err != nil {
		return SchedulableTask{}, errors.New("could not create schedulable task: offset duration could not be parsed")
	}

	if _, err := task.LatestCompletedTime(); err != nil {
		return SchedulableTask{}, errors.New("could not create schedulable task: latest completed time could not be parsed")
	}

	return SchedulableTask{task}, nil
}
// NewCoordinator builds a TaskCoordinator from the given logger, scheduler and
// executor. The limit starts at DefaultLimit; each option in opts is then
// applied in the order given.
func NewCoordinator(logger *zap.Logger, scheduler scheduler.Scheduler, executor Executor, opts ...CoordinatorOption) *TaskCoordinator {
	coord := new(TaskCoordinator)
	coord.logger = logger
	coord.sch = scheduler
	coord.ex = executor
	coord.limit = DefaultLimit

	for i := range opts {
		opts[i](coord)
	}

	return coord
}
// TaskCreated wraps the newly created task as a SchedulableTask and asks the
// Scheduler to schedule it. It returns the wrapping error or the Scheduler's
// error, whichever occurs first.
func (c *TaskCoordinator) TaskCreated(ctx context.Context, task *influxdb.Task) error {
	schedulable, err := NewSchedulableTask(task)
	if err != nil {
		return err
	}

	return c.sch.Schedule(schedulable)
}
// TaskUpdated releases the task if it is being disabled, and schedules it otherwise.
//
// "Being disabled" means the status changed between from and to and the new
// status is inactive. A release that fails with influxdb.ErrTaskNotClaimed is
// treated as success. The updated task is validated via NewSchedulableTask
// before either path is taken.
func (c *TaskCoordinator) TaskUpdated(ctx context.Context, from, to *influxdb.Task) error {
	releaseID := scheduler.ID(to.ID)

	schedulable, err := NewSchedulableTask(to)
	if err != nil {
		return err
	}

	disabling := to.Status != from.Status && to.Status == string(backend.TaskInactive)
	if !disabling {
		return c.sch.Schedule(schedulable)
	}

	// if disabling the task, release it instead of updating its schedule
	releaseErr := c.sch.Release(releaseID)
	if releaseErr == nil || releaseErr == influxdb.ErrTaskNotClaimed {
		return nil
	}
	return releaseErr
}
// TaskDeleted asks the Scheduler to release the deleted task. A release that
// fails with influxdb.ErrTaskNotClaimed is treated as success.
func (c *TaskCoordinator) TaskDeleted(ctx context.Context, id influxdb.ID) error {
	releaseErr := c.sch.Release(scheduler.ID(id))
	if releaseErr == nil || releaseErr == influxdb.ErrTaskNotClaimed {
		return nil
	}
	return releaseErr
}
// RunCancelled speaks directly to the executor to cancel a task run and
// returns whatever error the executor reports.
func (c *TaskCoordinator) RunCancelled(ctx context.Context, runID influxdb.ID) error {
	return c.ex.Cancel(ctx, runID)
}
// RunRetried speaks directly to the executor to re-try a task run immediately.
//
// A failure to start the run is wrapped with influxdb.ErrRunExecutionError.
// Otherwise the call blocks until the run's promise is done and returns the
// promise's error. ctx is handed to the executor but is not consulted while
// waiting on the promise.
func (c *TaskCoordinator) RunRetried(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
	p, startErr := c.ex.ManualRun(ctx, task.ID, run.ID)
	if startErr != nil {
		return influxdb.ErrRunExecutionError(startErr)
	}

	<-p.Done()
	return p.Error()
}
// RunForced speaks directly to the Executor to run a task immediately.
//
// A failure to start the run is wrapped with influxdb.ErrRunExecutionError.
// Otherwise the call blocks until the run's promise is done and returns the
// promise's error. ctx is handed to the executor but is not consulted while
// waiting on the promise.
func (c *TaskCoordinator) RunForced(ctx context.Context, task *influxdb.Task, run *influxdb.Run) error {
	p, startErr := c.ex.ManualRun(ctx, task.ID, run.ID)
	if startErr != nil {
		return influxdb.ErrRunExecutionError(startErr)
	}

	<-p.Done()
	return p.Error()
}

View File

@ -0,0 +1,114 @@
package coordinator
import (
"context"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/task/backend/scheduler"
)
var _ Executor = (*executorE)(nil)
type (
	// executorE is a test double for the coordinator's Executor interface.
	// Each method appends a record of its arguments to calls.
	executorE struct {
		calls []interface{}
	}
	// manualRunCall records the arguments of one executorE.ManualRun invocation.
	manualRunCall struct {
		TaskID influxdb.ID
		RunID influxdb.ID
	}
	// cancelCallC records the argument of one executorE.Cancel invocation.
	cancelCallC struct {
		RunID influxdb.ID
	}
)
type (
	// schedulerC is a test double for scheduler.Scheduler. It embeds the
	// interface and overrides Schedule and Release, each of which appends a
	// record of its argument to calls.
	schedulerC struct {
		scheduler.Scheduler
		calls []interface{}
	}
	// scheduleCall records the argument of one schedulerC.Schedule invocation.
	scheduleCall struct {
		Task scheduler.Schedulable
	}
	// releaseCallC records the argument of one schedulerC.Release invocation.
	releaseCallC struct {
		TaskID scheduler.ID
	}
)
type (
	// promise is the test implementation of the promise returned by
	// executorE.ManualRun.
	promise struct {
		// run supplies the ID reported by ID().
		run *influxdb.Run
		// done is the channel exposed by Done() and waited on by Error().
		done chan struct{}
		// err is the value reported by Error().
		err error
		// ctx is stored by ManualRun alongside cancelFunc.
		ctx context.Context
		// cancelFunc is invoked by Cancel().
		cancelFunc context.CancelFunc
	}
)
// ID reports the ID of the run held by this promise.
func (p *promise) ID() influxdb.ID {
	run := p.run
	return run.ID
}
// Cancel invokes the promise's cancel function and then blocks until either
// the promise is done or ctx is done, whichever happens first.
func (p *promise) Cancel(ctx context.Context) {
	p.cancelFunc()

	finished, abandoned := p.Done(), ctx.Done()
	select {
	case <-finished:
	case <-abandoned:
	}
}
// Done exposes the promise's done channel as receive-only; the channel is
// closed on completion of the promise.
func (p *promise) Done() <-chan struct{} {
	var completed <-chan struct{} = p.done
	return completed
}
// Error blocks until the promise is done and then returns the error recorded
// for the run execution.
func (p *promise) Error() error {
	<-p.Done()
	return p.err
}
// Schedule records the task it was asked to schedule and always succeeds.
func (s *schedulerC) Schedule(task scheduler.Schedulable) error {
	record := scheduleCall{Task: task}
	s.calls = append(s.calls, record)
	return nil
}
// Release records the task ID it was asked to release and always succeeds.
func (s *schedulerC) Release(taskID scheduler.ID) error {
	record := releaseCallC{TaskID: taskID}
	s.calls = append(s.calls, record)
	return nil
}
// ManualRun records the task and run IDs it was called with and returns a
// promise whose done channel is already closed, so callers waiting on it do
// not block. The promise carries a cancellable child of ctx; its run field is
// left unset. The returned error is the promise's own error.
func (e *executorE) ManualRun(ctx context.Context, id influxdb.ID, runID influxdb.ID) (executor.Promise, error) {
	e.calls = append(e.calls, manualRunCall{TaskID: id, RunID: runID})

	runCtx, cancel := context.WithCancel(ctx)

	finished := make(chan struct{})
	close(finished)

	p := &promise{
		done:       finished,
		ctx:        runCtx,
		cancelFunc: cancel,
	}
	return p, p.Error()
}
// Cancel records the run ID it was asked to cancel and always succeeds.
func (e *executorE) Cancel(ctx context.Context, runID influxdb.ID) error {
	record := cancelCallC{RunID: runID}
	e.calls = append(e.calls, record)
	return nil
}

View File

@ -0,0 +1,227 @@
package coordinator
import (
"context"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend/scheduler"
"go.uber.org/zap"
)
func Test_Coordinator_Executor_Methods(t *testing.T) {
var (
one = influxdb.ID(1)
taskOne = &influxdb.Task{ID: one}
timeString = time.Now().Format(time.RFC3339)
runOne = &influxdb.Run{
ID: one,
TaskID: one,
ScheduledFor: timeString,
}
allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{})
)
for _, test := range []struct {
name string
claimErr error
updateErr error
releaseErr error
call func(*testing.T, *TaskCoordinator)
executor *executorE
}{
{
name: "RunForced",
call: func(t *testing.T, c *TaskCoordinator) {
if err := c.RunForced(context.Background(), taskOne, runOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
executor: &executorE{
calls: []interface{}{
manualRunCall{taskOne.ID, runOne.ID},
},
},
},
{
name: "RunRetried",
call: func(t *testing.T, c *TaskCoordinator) {
if err := c.RunRetried(context.Background(), taskOne, runOne); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
executor: &executorE{
calls: []interface{}{
manualRunCall{taskOne.ID, runOne.ID},
},
},
},
{
name: "RunCancelled",
call: func(t *testing.T, c *TaskCoordinator) {
if err := c.RunCancelled(context.Background(), runOne.ID); err != nil {
t.Errorf("expected nil error found %q", err)
}
},
executor: &executorE{
calls: []interface{}{
cancelCallC{runOne.ID},
},
},
},
} {
t.Run(test.name, func(t *testing.T) {
var (
executor = &executorE{}
scheduler = &schedulerC{}
coord = NewCoordinator(zap.NewNop(), scheduler, executor)
)
test.call(t, coord)
if diff := cmp.Diff(
test.executor.calls,
executor.calls,
allowUnexported); diff != "" {
t.Errorf("unexpected executor contents %s", diff)
}
})
}
}
// Test_Coordinator_Scheduler_Methods verifies that the coordinator's task-level
// methods (TaskCreated, TaskUpdated, TaskDeleted) translate into the expected
// Schedule/Release calls on the scheduler. Each case runs against a fresh
// coordinator and compares the calls recorded by the scheduler double with the
// expected list.
func Test_Coordinator_Scheduler_Methods(t *testing.T) {
	var (
		one   = influxdb.ID(1)
		two   = influxdb.ID(2)
		three = influxdb.ID(3)
		now   = time.Now().Format(time.RFC3339Nano)

		taskOne           = &influxdb.Task{ID: one, CreatedAt: now}
		taskTwo           = &influxdb.Task{ID: two, Status: "active", CreatedAt: now}
		taskTwoInactive   = &influxdb.Task{ID: two, Status: "inactive", CreatedAt: now}
		taskThreeOriginal = &influxdb.Task{
			ID:        three,
			Status:    "active",
			Name:      "Previous",
			CreatedAt: now,
		}
		taskThreeNew = &influxdb.Task{
			ID:        three,
			Status:    "active",
			Name:      "Renamed",
			CreatedAt: now,
		}

		schedulableT         = SchedulableTask{taskOne}
		schedulableTaskTwo   = SchedulableTask{taskTwo}
		schedulableTaskThree = SchedulableTask{taskThreeNew}

		allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{})
	)

	for _, test := range []struct {
		name      string
		call      func(*testing.T, *TaskCoordinator)
		scheduler *schedulerC
	}{
		{
			name: "TaskCreated",
			call: func(t *testing.T, c *TaskCoordinator) {
				if err := c.TaskCreated(context.Background(), taskOne); err != nil {
					t.Errorf("expected nil error found %q", err)
				}
			},
			scheduler: &schedulerC{
				calls: []interface{}{
					scheduleCall{schedulableT},
				},
			},
		},
		{
			name: "TaskUpdated - deactivate task",
			call: func(t *testing.T, c *TaskCoordinator) {
				if err := c.TaskUpdated(context.Background(), taskTwo, taskTwoInactive); err != nil {
					t.Errorf("expected nil error found %q", err)
				}
			},
			scheduler: &schedulerC{
				calls: []interface{}{
					releaseCallC{scheduler.ID(taskTwo.ID)},
				},
			},
		},
		{
			name: "TaskUpdated - activate task",
			call: func(t *testing.T, c *TaskCoordinator) {
				if err := c.TaskUpdated(context.Background(), taskTwoInactive, taskTwo); err != nil {
					t.Errorf("expected nil error found %q", err)
				}
			},
			scheduler: &schedulerC{
				calls: []interface{}{
					scheduleCall{schedulableTaskTwo},
				},
			},
		},
		{
			name: "TaskUpdated - change name",
			call: func(t *testing.T, c *TaskCoordinator) {
				if err := c.TaskUpdated(context.Background(), taskThreeOriginal, taskThreeNew); err != nil {
					t.Errorf("expected nil error found %q", err)
				}
			},
			scheduler: &schedulerC{
				calls: []interface{}{
					scheduleCall{schedulableTaskThree},
				},
			},
		},
		{
			name: "TaskDeleted",
			call: func(t *testing.T, c *TaskCoordinator) {
				// TaskDeleted takes a task ID, so pass the task's own ID rather
				// than a run ID that merely happens to share the same value.
				if err := c.TaskDeleted(context.Background(), taskOne.ID); err != nil {
					t.Errorf("expected nil error found %q", err)
				}
			},
			scheduler: &schedulerC{
				calls: []interface{}{
					releaseCallC{scheduler.ID(taskOne.ID)},
				},
			},
		},
	} {
		t.Run(test.name, func(t *testing.T) {
			// Fresh doubles per case so recorded calls do not leak between
			// cases. The locals are named so they do not shadow the scheduler
			// package used in the table above.
			var (
				exec  = &executorE{}
				sched = &schedulerC{}
				coord = NewCoordinator(zap.NewNop(), sched, exec)
			)

			test.call(t, coord)

			if diff := cmp.Diff(
				test.scheduler.calls,
				sched.calls,
				allowUnexported); diff != "" {
				t.Errorf("unexpected scheduler contents %s", diff)
			}
		})
	}
}

View File

@ -0,0 +1,41 @@
package backend
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend/scheduler"
)
var _ scheduler.SchedulableService = (*SchedulableTaskService)(nil)
// UpdateTaskService provides an API to apply a TaskUpdate to a task by ID.
// SchedulableTaskService uses it to store a task's LatestCompleted time.
type UpdateTaskService interface {
	UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error)
}
// SchedulableTaskService implements the SchedulableService interface
// by delegating to the embedded UpdateTaskService.
type SchedulableTaskService struct {
	UpdateTaskService
}
// NewSchedulableTaskService wraps the given UpdateTaskService in a new
// SchedulableTaskService value.
func NewSchedulableTaskService(ts UpdateTaskService) SchedulableTaskService {
	svc := SchedulableTaskService{UpdateTaskService: ts}
	return svc
}
// UpdateLastScheduled stores the latest time a task was scheduled to run.
// The time is formatted as RFC3339 and written to the task's LatestCompleted
// field through the embedded task service. A failure from the task service is
// returned as a new error that includes the original error's message.
func (s SchedulableTaskService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error {
	stamp := t.Format(time.RFC3339)
	update := influxdb.TaskUpdate{LatestCompleted: &stamp}

	if _, err := s.UpdateTask(ctx, influxdb.ID(id), update); err != nil {
		return fmt.Errorf("could not update last scheduled for task; Err: %v", err)
	}
	return nil
}

View File

@ -0,0 +1,46 @@
package backend
import (
"context"
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend/scheduler"
)
// Fixtures shared by the schedulable task service test.
var (
	// mockTaskID is the task ID passed to UpdateLastScheduled.
	mockTaskID = influxdb.ID(1)
	// mockTimeNow is the time passed to UpdateLastScheduled.
	mockTimeNow = time.Now()
	// mockTimeNowStr is the UpdatedAt value returned by MockTaskService.UpdateTask.
	// It comes from its own time.Now() call, so it is not derived from mockTimeNow.
	mockTimeNowStr = time.Now().Format(time.RFC3339Nano)
)
// UpdateTask ignores the context and the update and returns a task carrying
// the requested ID with UpdatedAt set to mockTimeNowStr. It never fails.
func (m MockTaskService) UpdateTask(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) {
	updated := &influxdb.Task{
		ID:        id,
		UpdatedAt: mockTimeNowStr,
	}
	return updated, nil
}
// MockTaskService is a stateless test double whose UpdateTask method lets it
// stand in for an UpdateTaskService.
type MockTaskService struct{}
// Test_Schedulable_Task_Service checks that a SchedulableTaskService built on
// MockTaskService reports no error from UpdateLastScheduled.
func Test_Schedulable_Task_Service(t *testing.T) {
	type testCase struct {
		name string
		task *influxdb.Task
	}

	cases := []testCase{
		{
			name: "Create New Schedulable Task Service",
			task: taskOne,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			svc := NewSchedulableTaskService(MockTaskService{})

			if err := svc.UpdateLastScheduled(context.Background(), scheduler.ID(mockTaskID), mockTimeNow); err != nil {
				t.Fatalf("expected nil error, got: %v", err)
			}
		})
	}
}

View File

@ -3,14 +3,12 @@ package scheduler
import (
"context"
"time"
"github.com/influxdata/influxdb"
)
// ID duplicates the influxdb ID so users of the scheduler don't have to
// import influxdb for the id.
// TODO(lh): maybe make this its own thing sometime in the future.
type ID influxdb.ID
type ID uint64
// Executor is a system used by the scheduler to actually execute the scheduleable item.
type Executor interface {
@ -23,8 +21,7 @@ type Executor interface {
Execute(ctx context.Context, id ID, scheduledAt time.Time) error
}
// Schedulable is the interface that encapsulates work that
// is to be executed on a specified schedule.
// Schedulable is the interface that encapsulates the state that is required to schedule a job.
type Schedulable interface {
// ID is the unique identifier for this Schedulable
ID() ID
@ -41,10 +38,14 @@ type Schedulable interface {
// LastScheduled specifies last time this Schedulable was queued
// for execution.
LastScheduled() time.Time
}
// SchedulableService encapsulates the work necessary to schedule a job
type SchedulableService interface {
// UpdateLastScheduled notifies the instance that it was scheduled for
// execution at the specified time
UpdateLastScheduled(time.Time)
UpdateLastScheduled(ctx context.Context, id ID, t time.Time) error
}
type Schedule struct {

View File

@ -189,7 +189,7 @@ func ErrRunExecutionError(err error) *Error {
return &Error{
Code: EInternal,
Msg: fmt.Sprintf("could not execute task run; Err: %v", err),
Op: "kv/taskScheduler",
Op: "kv/taskExecutor",
Err: err,
}
}