Update task coordinator to act as a TaskService middleware (#13068)

* Update task coordinator to act as a TaskService middleware
pull/13071/head
Lyon Hill 2019-04-01 16:45:44 -06:00 committed by GitHub
parent 7a6107e70a
commit 83000361c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 186 additions and 140 deletions

View File

@ -554,7 +554,8 @@ func (m *Launcher) run(ctx context.Context) (err error) {
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, store), lr, m.scheduler, authSvc, userResourceSvc, orgSvc)
taskSvc = task.PlatformAdapter(store, lr, m.scheduler, authSvc, userResourceSvc, orgSvc)
taskSvc = coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, taskSvc)
taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc)
m.taskStore = store
}

View File

@ -10,7 +10,7 @@ import (
)
type Coordinator struct {
backend.Store
platform.TaskService
logger *zap.Logger
sch backend.Scheduler
@ -26,12 +26,12 @@ func WithLimit(i int) Option {
}
}
func New(logger *zap.Logger, scheduler backend.Scheduler, st backend.Store, opts ...Option) *Coordinator {
func New(logger *zap.Logger, scheduler backend.Scheduler, ts platform.TaskService, opts ...Option) *Coordinator {
c := &Coordinator{
logger: logger,
sch: scheduler,
Store: st,
limit: 1000,
logger: logger,
sch: scheduler,
TaskService: ts,
limit: 1000,
}
for _, opt := range opts {
@ -45,7 +45,7 @@ func New(logger *zap.Logger, scheduler backend.Scheduler, st backend.Store, opts
// claimExistingTasks is called on startup to claim all tasks in the store.
func (c *Coordinator) claimExistingTasks() {
tasks, err := c.Store.ListTasks(context.Background(), backend.TaskSearchParams{})
tasks, _, err := c.TaskService.FindTasks(context.Background(), platform.TaskFilter{})
if err != nil {
c.logger.Error("failed to list tasks", zap.Error(err))
return
@ -53,24 +53,19 @@ func (c *Coordinator) claimExistingTasks() {
for len(tasks) > 0 {
for _, task := range tasks {
if task.Meta.Status != string(backend.TaskActive) {
if task.Status != string(backend.TaskActive) {
// Don't claim inactive tasks at startup.
continue
}
t, err := backend.ToInfluxTask(&task.Task, &task.Meta)
if err != nil {
continue
}
// I may need a context with an auth here
if err := c.sch.ClaimTask(context.Background(), t); err != nil {
if err := c.sch.ClaimTask(context.Background(), task); err != nil {
c.logger.Error("failed claim task", zap.Error(err))
continue
}
}
tasks, err = c.Store.ListTasks(context.Background(), backend.TaskSearchParams{
After: tasks[len(tasks)-1].Task.ID,
tasks, _, err = c.TaskService.FindTasks(context.Background(), platform.TaskFilter{
After: &tasks[len(tasks)-1].ID,
})
if err != nil {
c.logger.Error("failed list additional tasks", zap.Error(err))
@ -79,111 +74,93 @@ func (c *Coordinator) claimExistingTasks() {
}
}
func (c *Coordinator) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (platform.ID, error) {
id, err := c.Store.CreateTask(ctx, req)
func (c *Coordinator) CreateTask(ctx context.Context, t platform.TaskCreate) (*platform.Task, error) {
task, err := c.TaskService.CreateTask(ctx, t)
if err != nil {
return id, err
return task, err
}
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, id)
if err != nil {
return id, err
}
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(ctx, t); err != nil {
_, delErr := c.Store.DeleteTask(ctx, id)
if err := c.sch.ClaimTask(ctx, task); err != nil {
delErr := c.TaskService.DeleteTask(ctx, task.ID)
if delErr != nil {
return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
return task, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
}
return id, err
return task, err
}
return id, nil
return task, nil
}
func (c *Coordinator) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (backend.UpdateTaskResult, error) {
res, err := c.Store.UpdateTask(ctx, req)
func (c *Coordinator) UpdateTask(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
task, err := c.TaskService.UpdateTask(ctx, id, upd)
if err != nil {
return res, err
}
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, req.ID)
if err != nil {
return res, err
return task, err
}
// If disabling the task, do so before modifying the script.
if req.Status == backend.TaskInactive && res.OldStatus != backend.TaskInactive {
if err := c.sch.ReleaseTask(req.ID); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
if task.Status == string(backend.TaskInactive) {
if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed {
return task, err
}
}
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return res, err
}
if err := c.sch.UpdateTask(ctx, t); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
if err := c.sch.UpdateTask(ctx, task); err != nil && err != backend.ErrTaskNotClaimed {
return task, err
}
// If enabling the task, claim it after modifying the script.
if req.Status == backend.TaskActive {
if err := c.sch.ClaimTask(ctx, t); err != nil && err != backend.ErrTaskAlreadyClaimed {
return res, err
if task.Status == string(backend.TaskActive) {
if err := c.sch.ClaimTask(ctx, task); err != nil && err != backend.ErrTaskAlreadyClaimed {
return task, err
}
}
return res, nil
return task, nil
}
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) {
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) error {
if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed {
return false, err
return err
}
return c.Store.DeleteTask(ctx, id)
return c.TaskService.DeleteTask(ctx, id)
}
func (c *Coordinator) DeleteOrg(ctx context.Context, orgID platform.ID) error {
orgTasks, err := c.Store.ListTasks(ctx, backend.TaskSearchParams{
Org: orgID,
})
func (c *Coordinator) CancelRun(ctx context.Context, taskID, runID platform.ID) error {
err := c.sch.CancelRun(ctx, taskID, runID)
if err != nil {
return err
}
for _, orgTask := range orgTasks {
if err := c.sch.ReleaseTask(orgTask.Task.ID); err != nil {
return err
}
// TODO(lh): Im not sure if we need to call the task service here directly or if the scheduler does that
// for now we will do it and then if it causes errors we can opt to do it in the scheduler only
return c.TaskService.CancelRun(ctx, taskID, runID)
}
// RetryRun looks up the task, retries the given run through the underlying
// TaskService, and then notifies the scheduler of the task's updated state.
func (c *Coordinator) RetryRun(ctx context.Context, taskID, runID platform.ID) (*platform.Run, error) {
	task, err := c.TaskService.FindTaskByID(ctx, taskID)
	if err != nil {
		return nil, err
	}

	r, err := c.TaskService.RetryRun(ctx, taskID, runID)
	if err != nil {
		return r, err
	}

	return r, c.sch.UpdateTask(ctx, task)
}
// ForceRun looks up the task, starts a manual run through the underlying
// TaskService, and then notifies the scheduler of the task's updated state.
func (c *Coordinator) ForceRun(ctx context.Context, taskID platform.ID, scheduledFor int64) (*platform.Run, error) {
	task, err := c.TaskService.FindTaskByID(ctx, taskID)
	if err != nil {
		return nil, err
	}

	r, err := c.TaskService.ForceRun(ctx, taskID, scheduledFor)
	if err != nil {
		return r, err
	}

	return r, c.sch.UpdateTask(ctx, task)
}

View File

@ -3,15 +3,17 @@ package coordinator_test
import (
"context"
"errors"
"sync"
"testing"
"time"
platform "github.com/influxdata/influxdb"
pmock "github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/coordinator"
"github.com/influxdata/influxdb/task/mock"
platformtesting "github.com/influxdata/influxdb/testing"
"go.uber.org/zap/zaptest"
)
@ -26,50 +28,123 @@ func timeoutSelector(ch <-chan *platform.Task) (*platform.Task, error) {
const script = `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)`
// TODO(lh): Once we have a kv.TaskService this entire part can be replaced with kv.TaskService using a inmem kv.Store
//
// inmemTaskService builds a mutex-guarded, map-backed platform.TaskService
// mock for the coordinator tests. Pagination is faked: any non-nil After
// cursor returns an empty page.
func inmemTaskService() platform.TaskService {
	gen := snowflake.NewDefaultIDGenerator()
	tasks := map[platform.ID]*platform.Task{}
	mu := sync.Mutex{}

	ts := &pmock.TaskService{
		CreateTaskFn: func(ctx context.Context, tc platform.TaskCreate) (*platform.Task, error) {
			mu.Lock()
			defer mu.Unlock()
			id := gen.ID()
			task := &platform.Task{ID: id, Flux: tc.Flux, Status: tc.Status, OrganizationID: tc.OrganizationID, Organization: tc.Organization}
			if task.Status == "" {
				// Tasks default to active, mirroring the coordinator's expectations.
				task.Status = string(backend.TaskActive)
			}
			tasks[id] = task
			return tasks[id], nil
		},
		DeleteTaskFn: func(ctx context.Context, id platform.ID) error {
			mu.Lock()
			defer mu.Unlock()
			delete(tasks, id)
			return nil
		},
		UpdateTaskFn: func(ctx context.Context, id platform.ID, upd platform.TaskUpdate) (*platform.Task, error) {
			mu.Lock()
			defer mu.Unlock()
			t, ok := tasks[id]
			if !ok {
				return nil, backend.ErrTaskNotFound
			}
			if upd.Flux != nil {
				t.Flux = *upd.Flux
			}
			if upd.Status != nil {
				t.Status = *upd.Status
			}
			return t, nil
		},
		FindTaskByIDFn: func(ctx context.Context, id platform.ID) (*platform.Task, error) {
			mu.Lock()
			defer mu.Unlock()
			t, ok := tasks[id]
			if !ok {
				return nil, backend.ErrTaskNotFound
			}
			return t, nil
		},
		FindTasksFn: func(ctx context.Context, tf platform.TaskFilter) ([]*platform.Task, int, error) {
			mu.Lock()
			defer mu.Unlock()
			// Fake pagination: a cursor always means "no more pages".
			if tf.After != nil {
				return []*platform.Task{}, 0, nil
			}
			rtn := []*platform.Task{}
			for _, task := range tasks {
				rtn = append(rtn, task)
			}
			return rtn, len(rtn), nil
		},
		ForceRunFn: func(ctx context.Context, id platform.ID, scheduledFor int64) (*platform.Run, error) {
			mu.Lock()
			defer mu.Unlock()
			t, ok := tasks[id]
			if !ok {
				return nil, backend.ErrTaskNotFound
			}
			return &platform.Run{ID: id, TaskID: t.ID, ScheduledFor: time.Unix(scheduledFor, 0).Format(time.RFC3339)}, nil
		},
	}
	return ts
}
func TestCoordinator(t *testing.T) {
st := backend.NewInMemStore()
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, st)
coord := coordinator.New(zaptest.NewLogger(t), sched, ts)
createChan := sched.TaskCreateChan()
releaseChan := sched.TaskReleaseChan()
updateChan := sched.TaskUpdateChan()
orgID := platformtesting.MustIDBase16("69746f7175650d0a")
authzID := platform.ID(123456)
id, err := coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, AuthorizationID: authzID, Script: script})
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
if err != nil {
t.Fatal(err)
}
task, err := timeoutSelector(createChan)
createdTask, err := timeoutSelector(createChan)
if err != nil {
t.Fatal(err)
}
if task.ID != createdTask.ID {
t.Fatal("task given to scheduler not the same as task created")
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
}
deleted, err := coord.DeleteTask(context.Background(), id)
if err != nil {
if err := coord.DeleteTask(context.Background(), task.ID); err != nil {
t.Fatal(err)
}
if !deleted {
t.Fatal("no error and not deleted")
}
task, err = timeoutSelector(releaseChan)
if err != nil {
t.Fatal(err)
}
if task.Flux != script {
t.Fatal("task sent to scheduler doesnt match task created")
t.Fatal("task sent to scheduler doesn't match task created")
}
id, err = coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, AuthorizationID: authzID, Script: script})
task, err = coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
if err != nil {
t.Fatal(err)
}
@ -78,23 +153,20 @@ func TestCoordinator(t *testing.T) {
if err != nil {
t.Fatal(err)
}
res, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive})
inactive := string(backend.TaskInactive)
res, err := coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &inactive})
if err != nil {
t.Fatal(err)
}
// Only validating res on the first update.
if res.NewTask.ID != id {
t.Fatalf("unexpected ID on update result: got %s, want %s", res.NewTask.ID.String(), id.String())
if res.ID != task.ID {
t.Fatalf("unexpected ID on update result: got %s, want %s", res.ID.String(), task.ID.String())
}
if res.NewTask.Script != script {
t.Fatalf("unexpected script on update result: got %q, want %q", res.NewTask.Script, script)
if res.Flux != task.Flux {
t.Fatalf("unexpected script on update result: got %q, want %q", res.Flux, task.Flux)
}
if res.NewMeta.Status != string(backend.TaskInactive) {
t.Fatalf("unexpected meta status on update result: got %q, want %q", res.NewMeta.Status, backend.TaskInactive)
}
if res.OldStatus != backend.TaskActive {
t.Fatalf("unexpected old status on update result: got %q, want %q", res.OldStatus, backend.TaskActive)
if res.Status != inactive {
t.Fatalf("unexpected meta status on update result: got %q, want %q", res.Status, inactive)
}
task, err = timeoutSelector(releaseChan)
@ -106,7 +178,8 @@ func TestCoordinator(t *testing.T) {
t.Fatal("task sent to scheduler doesnt match task created")
}
if _, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskActive}); err != nil {
active := string(backend.TaskActive)
if _, err := coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Status: &active}); err != nil {
t.Fatal(err)
}
@ -120,7 +193,7 @@ func TestCoordinator(t *testing.T) {
}
newScript := `option task = {name: "a task",cron: "1 * * * *"} from(bucket:"test") |> range(start:-2h)`
if _, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Script: newScript}); err != nil {
if _, err := coord.UpdateTask(context.Background(), task.ID, platform.TaskUpdate{Flux: &newScript}); err != nil {
t.Fatal(err)
}
@ -135,23 +208,23 @@ func TestCoordinator(t *testing.T) {
}
func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
st := backend.NewInMemStore()
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, st)
coord := coordinator.New(zaptest.NewLogger(t), sched, ts)
// Create an isolated task directly through the store so the coordinator doesn't know about it.
id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, AuthorizationID: 3, Script: script})
task, err := ts.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
if err != nil {
t.Fatal(err)
}
// Deleting the task through the coordinator should succeed.
if _, err := coord.DeleteTask(context.Background(), id); err != nil {
if err := coord.DeleteTask(context.Background(), task.ID); err != nil {
t.Fatal(err)
}
if _, err := st.FindTaskByID(context.Background(), id); err != backend.ErrTaskNotFound {
if _, err := ts.FindTaskByID(context.Background(), task.ID); err != backend.ErrTaskNotFound {
t.Fatalf("expected deleted task not to be found; got %v", err)
}
}
@ -161,7 +234,7 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) {
t.Skip("skipping in short mode")
}
st := backend.NewInMemStore()
ts := inmemTaskService()
sched := mock.NewScheduler()
createChan := sched.TaskCreateChan()
@ -171,18 +244,18 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) {
createdIDs := make([]platform.ID, numTasks)
for i := 0; i < numTasks; i++ {
ctr := backend.CreateTaskRequest{Org: 1, AuthorizationID: 3, Script: script}
ctr := platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script}
if i == inactiveTaskIndex {
ctr.Status = backend.TaskInactive
ctr.Status = string(backend.TaskInactive)
}
id, err := st.CreateTask(context.Background(), ctr)
task, err := ts.CreateTask(context.Background(), ctr)
if err != nil {
t.Fatal(err)
}
createdIDs[i] = id
createdIDs[i] = task.ID
}
coordinator.New(zaptest.NewLogger(t), sched, st)
coordinator.New(zaptest.NewLogger(t), sched, ts)
const expectedCreatedTasks = numTasks - 1 // -1 to skip the single inactive task.
for i := 0; i < expectedCreatedTasks; i++ {
@ -204,21 +277,26 @@ func TestCoordinator_ClaimExistingTasks(t *testing.T) {
}
}
func TestCoordinator_ManuallyRunTimeRange(t *testing.T) {
st := backend.NewInMemStore()
func TestCoordinator_ForceRun(t *testing.T) {
ts := inmemTaskService()
sched := mock.NewScheduler()
coord := coordinator.New(zaptest.NewLogger(t), sched, st)
coord := coordinator.New(zaptest.NewLogger(t), sched, ts)
// Create an isolated task directly through the store so the coordinator doesn't know about it.
id, err := coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, AuthorizationID: 3, Script: script})
task, err := coord.CreateTask(context.Background(), platform.TaskCreate{OrganizationID: 1, Token: "token", Flux: script})
if err != nil {
t.Fatal(err)
}
task, err = coord.FindTaskByID(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
ch := sched.TaskUpdateChan()
manualRunTime := time.Now().Unix()
if _, err := coord.ManuallyRunTimeRange(context.Background(), id, manualRunTime, manualRunTime, manualRunTime); err != nil {
if _, err := coord.ForceRun(context.Background(), task.ID, manualRunTime); err != nil {
t.Fatal(err)
}
@ -226,16 +304,6 @@ func TestCoordinator_ManuallyRunTimeRange(t *testing.T) {
case <-ch:
// great!
case <-time.After(time.Second):
t.Fatal("didnt receive task update in time")
t.Fatal("didn't receive task update in time")
}
_, meta, err := st.FindTaskByIDWithMeta(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if len(meta.ManualRuns) != 1 {
t.Fatal("failed to update store")
}
}