diff --git a/task/backend/bolt/bolt.go b/task/backend/bolt/bolt.go index c36abd892e..cb88ec40cb 100644 --- a/task/backend/bolt/bolt.go +++ b/task/backend/bolt/bolt.go @@ -25,6 +25,7 @@ import ( bolt "github.com/coreos/bbolt" "github.com/influxdata/platform" "github.com/influxdata/platform/task/backend" + "github.com/influxdata/platform/task/options" ) // ErrDBReadOnly is an error for when the database is set to read only. @@ -195,31 +196,86 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) ( return id, nil } -// ModifyTask changes a task with a new script, it should error if the task does not exist. -func (s *Store) ModifyTask(ctx context.Context, id platform.ID, newScript string) error { - op, err := backend.StoreValidator.ModifyArgs(id, newScript) +func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (backend.UpdateTaskResult, error) { + var res backend.UpdateTaskResult + op, err := backend.StoreValidator.UpdateArgs(req) if err != nil { - return err + return res, err } - return s.db.Update(func(tx *bolt.Tx) error { + encodedID, err := req.ID.Encode() + if err != nil { + return res, err + } + + err = s.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(s.bucket) bt := b.Bucket(tasksPath) - encodedID, err := id.Encode() - if err != nil { + v := bt.Get(encodedID) + if v == nil { + return backend.ErrTaskNotFound + } + res.OldScript = string(v) + + newScript := req.Script + if req.Script == "" { + // Need to build op from existing script. + op, err = options.FromScript(string(v)) + if err != nil { + return err + } + newScript = string(v) + } else { + if err := bt.Put(encodedID, []byte(req.Script)); err != nil { + return err + } + if err := b.Bucket(nameByTaskID).Put(encodedID, []byte(op.Name)); err != nil { + return err + } + } + + var userID, orgID platform.ID + if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil { return err } - if v := bt.Get(encodedID); v == nil { - return backend.ErrTaskNotFound - } - err = bt.Put(encodedID, []byte(newScript)) - if err != nil { + if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil { return err } - return b.Bucket(nameByTaskID).Put(encodedID, []byte(op.Name)) + + stmBytes := b.Bucket(taskMetaPath).Get(encodedID) + if stmBytes == nil { + return backend.ErrTaskNotFound + } + var stm backend.StoreTaskMeta + if err := stm.Unmarshal(stmBytes); err != nil { + return err + } + res.OldStatus = backend.TaskStatus(stm.Status) + if req.Status != "" { + stm.Status = string(req.Status) + stmBytes, err = stm.Marshal() + if err != nil { + return err + } + if err := b.Bucket(taskMetaPath).Put(encodedID, stmBytes); err != nil { + return err + } + } + res.NewMeta = stm + + res.NewTask = backend.StoreTask{ + ID: req.ID, + Org: orgID, + User: userID, + Name: op.Name, + Script: newScript, + } + + return nil }) + return res, err } // ListTasks lists the tasks based on a filter. @@ -499,58 +555,6 @@ func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*back }, &stm, nil } -func (s *Store) EnableTask(ctx context.Context, id platform.ID) error { - encodedID, err := id.Encode() - if err != nil { - return err - } - return s.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket).Bucket(taskMetaPath) - stmBytes := b.Get(encodedID) - if stmBytes == nil { - return errors.New("task meta not found") - } - stm := backend.StoreTaskMeta{} - err := stm.Unmarshal(stmBytes) - if err != nil { - return err - } - stm.Status = string(backend.TaskActive) - stmBytes, err = stm.Marshal() - if err != nil { - return err - } - - return b.Put(encodedID, stmBytes) - }) -} - -func (s *Store) DisableTask(ctx context.Context, id platform.ID) error { - encodedID, err := id.Encode() - if err != nil { - return err - } - return s.db.Update(func(tx *bolt.Tx) error { - b := tx.Bucket(s.bucket).Bucket(taskMetaPath) - stmBytes := b.Get(encodedID) - if stmBytes == nil { - return errors.New("task meta not found") - } - stm := backend.StoreTaskMeta{} - err := stm.Unmarshal(stmBytes) - if err != nil { - return err - } - stm.Status = string(backend.TaskInactive) - stmBytes, err = stm.Marshal() - if err != nil { - return err - } - - return b.Put(encodedID, stmBytes) - }) -} - // DeleteTask deletes the task. func (s *Store) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) { encodedID, err := id.Encode() diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index 51965d431c..6459a8c1e8 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -60,50 +60,40 @@ func (c *Coordinator) CreateTask(ctx context.Context, req backend.CreateTaskRequ return id, nil } -func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript string) error { - if err := c.Store.ModifyTask(ctx, id, newScript); err != nil { - return err - } - - task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, id) +func (c *Coordinator) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (backend.UpdateTaskResult, error) { + res, err := c.Store.UpdateTask(ctx, req) if err != nil { - return err + return res, err } - if err := c.sch.UpdateTask(task, meta); err != nil { - return err - } - - return nil -} - -func (c *Coordinator) EnableTask(ctx context.Context, id platform.ID) error { - if err := c.Store.EnableTask(ctx, id); err != nil { - return err - } - - task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, id) + task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, req.ID) if err != nil { - return err + return res, err } - if err := c.sch.ClaimTask(task, meta); err != nil { - return err + // If disabling the task, do so before modifying the script. + if req.Status == backend.TaskInactive && res.OldStatus != backend.TaskInactive { + if err := c.sch.ReleaseTask(req.ID); err != nil && err != backend.ErrTaskNotClaimed { + return res, err + } } - return nil -} - -func (c *Coordinator) DisableTask(ctx context.Context, id platform.ID) error { - if err := c.Store.DisableTask(ctx, id); err != nil { - return err + if err := c.sch.UpdateTask(task, meta); err != nil && err != backend.ErrTaskNotClaimed { + return res, err } - return c.sch.ReleaseTask(id) + // If enabling the task, claim it after modifying the script. + if req.Status == backend.TaskActive { + if err := c.sch.ClaimTask(task, meta); err != nil && err != backend.ErrTaskAlreadyClaimed { + return res, err + } + } + + return res, nil } func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) { - if err := c.sch.ReleaseTask(id); err != nil { + if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed { return false, err } diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index 59372d6489..e75010553c 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -22,6 +22,8 @@ func timeoutSelector(ch <-chan *mock.Task) (*mock.Task, error) { } } +const script = `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)` + func TestCoordinator(t *testing.T) { st := backend.NewInMemStore() sched := mock.NewScheduler() @@ -33,7 +35,6 @@ func TestCoordinator(t *testing.T) { orgID := platformtesting.MustIDBase16("69746f7175650d0a") usrID := platformtesting.MustIDBase16("6c61757320657420") - script := `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)` id, err := coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: usrID, Script: script}) if err != nil { t.Fatal(err) @@ -76,10 +77,23 @@ func TestCoordinator(t *testing.T) { t.Fatal(err) } - err = coord.DisableTask(context.Background(), id) + res, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive}) if err != nil { t.Fatal(err) } + // Only validating res on the first update. + if res.NewTask.ID != id { + t.Fatalf("unexpected ID on update result: got %s, want %s", res.NewTask.ID.String(), id.String()) + } + if res.NewTask.Script != script { + t.Fatalf("unexpected script on update result: got %q, want %q", res.NewTask.Script, script) + } + if res.NewMeta.Status != string(backend.TaskInactive) { + t.Fatalf("unexpected meta status on update result: got %q, want %q", res.NewMeta.Status, backend.TaskInactive) + } + if res.OldStatus != backend.TaskActive { + t.Fatalf("unexpected old status on update result: got %q, want %q", res.OldStatus, backend.TaskActive) + } task, err = timeoutSelector(releaseChan) if err != nil { @@ -90,8 +104,7 @@ func TestCoordinator(t *testing.T) { t.Fatal("task sent to scheduler doesnt match task created") } - err = coord.EnableTask(context.Background(), id) - if err != nil { + if _, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskActive}); err != nil { t.Fatal(err) } @@ -105,8 +118,7 @@ func TestCoordinator(t *testing.T) { } newScript := `option task = {name: "a task",cron: "1 * * * *"} from(bucket:"test") |> range(start:-2h)` - err = coord.ModifyTask(context.Background(), id, newScript) - if err != nil { + if _, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Script: newScript}); err != nil { t.Fatal(err) } @@ -119,3 +131,25 @@ func TestCoordinator(t *testing.T) { t.Fatal("task sent to scheduler doesnt match task created") } } + +func TestCoordinator_DeleteUnclaimedTask(t *testing.T) { + st := backend.NewInMemStore() + sched := mock.NewScheduler() + + coord := coordinator.New(sched, st) + + // Create an isolated task directly through the store so the coordinator doesn't know about it. + id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script}) + if err != nil { + t.Fatal(err) + } + + // Deleting the task through the coordinator should succeed. + if _, err := coord.DeleteTask(context.Background(), id); err != nil { + t.Fatal(err) + } + + if _, err := st.FindTaskByID(context.Background(), id); err != backend.ErrTaskNotFound { + t.Fatalf("expected deleted task not to be found; got %v", err) + } +} diff --git a/task/backend/inmem_store.go b/task/backend/inmem_store.go index 2bded87880..7277989584 100644 --- a/task/backend/inmem_store.go +++ b/task/backend/inmem_store.go @@ -9,6 +9,7 @@ import ( "github.com/influxdata/platform" "github.com/influxdata/platform/snowflake" + "github.com/influxdata/platform/task/options" ) var _ Store = (*inmem)(nil) @@ -74,27 +75,58 @@ func (s *inmem) CreateTask(_ context.Context, req CreateTaskRequest) (platform.I return id, nil } -func (s *inmem) ModifyTask(_ context.Context, id platform.ID, script string) error { - op, err := StoreValidator.ModifyArgs(id, script) +func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTaskResult, error) { + var res UpdateTaskResult + op, err := StoreValidator.UpdateArgs(req) if err != nil { - return err + return res, err } + idStr := req.ID.String() + s.mu.Lock() defer s.mu.Unlock() + found := false for n, t := range s.tasks { - if t.ID != id { + if t.ID != req.ID { continue } + found = true + res.OldScript = t.Script + if req.Script == "" { + op, err = options.FromScript(t.Script) + if err != nil { + return res, err + } + } else { + t.Script = req.Script + } t.Name = op.Name - t.Script = script + s.tasks[n] = t - return nil + res.NewTask = t + break + } + if !found { + return res, fmt.Errorf("ModifyTask: record not found for %s", idStr) } - return fmt.Errorf("ModifyTask: record not found for %s", id) + stm, ok := s.runners[idStr] + if !ok { + panic("inmem store: had task without runner for task ID " + idStr) + } + res.OldStatus = TaskStatus(stm.Status) + + if req.Status != "" { + // Changing the status. + stm.Status = string(req.Status) + s.runners[idStr] = stm + } + res.NewMeta = stm + + return res, nil } func (s *inmem) ListTasks(_ context.Context, params TaskSearchParams) ([]StoreTask, error) { @@ -195,37 +227,6 @@ func (s *inmem) FindTaskByIDWithMeta(_ context.Context, id platform.ID) (*StoreT return task, &meta, nil } -func (s *inmem) EnableTask(ctx context.Context, id platform.ID) error { - s.mu.Lock() - defer s.mu.Unlock() - - strID := id.String() - - meta, ok := s.runners[strID] - if !ok { - return errors.New("task meta not found") - } - meta.Status = string(TaskActive) - s.runners[strID] = meta - - return nil -} - -func (s *inmem) DisableTask(ctx context.Context, id platform.ID) error { - s.mu.Lock() - defer s.mu.Unlock() - strID := id.String() - - meta, ok := s.runners[strID] - if !ok { - return errors.New("task meta not found") - } - meta.Status = string(TaskInactive) - s.runners[strID] = meta - - return nil -} - func (s *inmem) FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error) { s.mu.RLock() defer s.mu.RUnlock() diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index cab8d8f2e5..dc5ae8048f 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -14,8 +14,15 @@ import ( "go.uber.org/zap" ) -var ErrRunCanceled = errors.New("run canceled") -var ErrTaskNotClaimed = errors.New("task not claimed") +var ( + ErrRunCanceled = errors.New("run canceled") + + // ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not. + ErrTaskNotClaimed = errors.New("task not claimed") + + // ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is. + ErrTaskAlreadyClaimed = errors.New("task already claimed") +) // DesiredState persists the desired state of a run. type DesiredState interface { @@ -259,7 +266,7 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err tid := task.ID.String() _, ok := s.taskSchedulers[tid] if ok { - return errors.New("task has already been claimed") + return ErrTaskAlreadyClaimed } s.taskSchedulers[tid] = ts diff --git a/task/backend/store.go b/task/backend/store.go index 8c3de340d9..7ee4702975 100644 --- a/task/backend/store.go +++ b/task/backend/store.go @@ -38,6 +38,19 @@ const ( DefaultTaskStatus TaskStatus = TaskActive ) +// validate returns an error if s is not a known task status. +func (s TaskStatus) validate(allowEmpty bool) error { + if allowEmpty && s == "" { + return nil + } + + if s == TaskActive || s == TaskInactive { + return nil + } + + return fmt.Errorf("invalid task status: %q", s) +} + type RunStatus int const ( @@ -83,6 +96,7 @@ type RunCreation struct { HasQueue bool } +// CreateTaskRequest encapsulates state of a new task to be created. type CreateTaskRequest struct { // Owners. Org, User platform.ID @@ -100,21 +114,47 @@ type CreateTaskRequest struct { Status TaskStatus } +// UpdateTaskRequest encapsulates requested changes to a task. +type UpdateTaskRequest struct { + // ID of the task. + ID platform.ID + + // New script content of the task. + // If empty, do not modify the existing script. + Script string + + // The new desired task status. + // If empty, do not modify the existing status. + Status TaskStatus +} + +// UpdateTaskResult describes the result of modifying a single task. +// Having the content returned from ModifyTask makes it much simpler for callers +// to decide how to notify on status changes, etc. +type UpdateTaskResult struct { + OldScript string + OldStatus TaskStatus + + NewTask StoreTask + NewMeta StoreTaskMeta +} + // Store is the interface around persisted tasks. type Store interface { // CreateTask creates a task with from the given CreateTaskRequest. // If the task is created successfully, the ID of the new task is returned. CreateTask(ctx context.Context, req CreateTaskRequest) (platform.ID, error) - // ModifyTask updates the script of an existing task. + // UpdateTask updates an existing task. // It returns an error if there was no task matching the given ID. - ModifyTask(ctx context.Context, id platform.ID, newScript string) error + // If the returned error is not nil, the returned result should not be inspected. + UpdateTask(ctx context.Context, req UpdateTaskRequest) (UpdateTaskResult, error) // ListTasks lists the tasks in the store that match the search params. ListTasks(ctx context.Context, params TaskSearchParams) ([]StoreTask, error) // FindTaskByID returns the task with the given ID. - // If no task matches the ID, the returned task is nil. + // If no task matches the ID, the returned task is nil and ErrTaskNotFound is returned. FindTaskByID(ctx context.Context, id platform.ID) (*StoreTask, error) // FindTaskMetaByID returns the metadata about a task. @@ -123,12 +163,6 @@ type Store interface { // FindTaskByIDWithMeta combines finding the task and the meta into a single call. FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error) - // EnableTask updates task status to active. - EnableTask(ctx context.Context, id platform.ID) error - - // DisableTask updates task status to inactive. - DisableTask(ctx context.Context, id platform.ID) error - // DeleteTask returns whether an entry matching the given ID was deleted. // If err is non-nil, deleted is false. // If err is nil, deleted is false if no entry matched the ID, @@ -286,30 +320,36 @@ func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error return o, fmt.Errorf("missing required fields to create task: %s", strings.Join(missing, ", ")) } - if req.Status != "" && req.Status != TaskActive && req.Status != TaskInactive { - return o, fmt.Errorf("invalid status: %s", req.Status) + if err := req.Status.validate(true); err != nil { + return o, err } return o, nil } -// ModifyArgs returns the script's parsed options, -// and an error if any of the provided fields are invalid for modifying a task. -func (StoreValidation) ModifyArgs(taskID platform.ID, script string) (options.Options, error) { +// UpdateArgs validates the UpdateTaskRequest. +// If the update only includes a new status (i.e. req.Script is empty), the returned options are zero. +// If the update contains neither a new script nor a new status, or if the script is invalid, an error is returned. +func (StoreValidation) UpdateArgs(req UpdateTaskRequest) (options.Options, error) { var missing []string var o options.Options - if script == "" { - missing = append(missing, "script") + if req.Script == "" && req.Status == "" { + missing = append(missing, "script or status") } else { - var err error - o, err = options.FromScript(script) - if err != nil { + if req.Script != "" { + var err error + o, err = options.FromScript(req.Script) + if err != nil { + return o, err + } + } + if err := req.Status.validate(true); err != nil { return o, err } } - if !taskID.Valid() { + if !req.ID.Valid() { missing = append(missing, "task ID") } diff --git a/task/backend/storetest/storetest.go b/task/backend/storetest/storetest.go index 8198a05054..cc8863a84f 100644 --- a/task/backend/storetest/storetest.go +++ b/task/backend/storetest/storetest.go @@ -24,12 +24,11 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName if len(funcNames) == 0 { funcNames = []string{ "CreateTask", - "ModifyTask", + "UpdateTask", "ListTasks", "FindTask", "FindMeta", "FindTaskByIDWithMeta", - "EnableDisableTask", "DeleteTask", "CreateNextRun", "FinishRun", @@ -38,12 +37,11 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName } availableFuncs := map[string]TestFunc{ "CreateTask": testStoreCreate, - "ModifyTask": testStoreModify, + "UpdateTask": testStoreUpdate, "ListTasks": testStoreListTasks, "FindTask": testStoreFindTask, "FindMeta": testStoreFindMeta, "FindTaskByIDWithMeta": testStoreFindByIDWithMeta, - "EnableDisableTask": testStoreTaskEnableDisable, "DeleteTask": testStoreDelete, "CreateNextRun": testStoreCreateNextRun, "FinishRun": testStoreFinishRun, @@ -117,7 +115,7 @@ from(bucket:"test") |> range(start:-1h)` } } -func testStoreModify(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) { +func testStoreUpdate(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) { const script = `option task = { name: "a task", cron: "* * * * *", @@ -151,37 +149,96 @@ from(bucket:"y") |> range(start:-1h)` if err != nil { t.Fatal(err) } - if err := s.ModifyTask(context.Background(), id, script2); err != nil { - t.Fatal(err) - } - task, err := s.FindTaskByID(context.Background(), id) + // Modify just the script. + res, err := s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Script: script2}) if err != nil { t.Fatal(err) } + + task := res.NewTask + meta := res.NewMeta if task.Script != script2 { t.Fatalf("Task didnt update: %s", task.Script) } if task.Name != "a task2" { t.Fatalf("Task didn't update name, expected 'a task2' but got '%s' for task %v", task.Name, task) } + if meta.Status != string(backend.TaskActive) { + // Other tests explicitly check the initial status against DefaultTaskStatus, + // but in this case we need to be certain of the initial status so we can toggle it correctly. + t.Fatalf("expected task to be created active, got %q", meta.Status) + } + + // Modify just the status. + res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive}) + if err != nil { + t.Fatal(err) + } + + task = res.NewTask + meta = res.NewMeta + if task.Script != script2 { + t.Fatalf("Task script unexpectedly updated: %s", task.Script) + } + if task.Name != "a task2" { + t.Fatalf("Task name unexpectedly updated: %q", task.Name) + } + if meta.Status != string(backend.TaskInactive) { + t.Fatalf("expected task status to be inactive, got %q", meta.Status) + } + + // Modify the status to active, and change the script. + res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskActive, Script: script}) + if err != nil { + t.Fatal(err) + } + + task = res.NewTask + meta = res.NewMeta + if task.Script != script { + t.Fatalf("Task script did not update: %s", task.Script) + } + if task.Name != "a task" { + t.Fatalf("Task name did not update: %s", task.Name) + } + if meta.Status != string(backend.TaskActive) { + t.Fatalf("expected task status to be active, got %q", meta.Status) + } + + // Modify the status to inactive, and change the script again. + res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive, Script: script2}) + if err != nil { + t.Fatal(err) + } + + task = res.NewTask + meta = res.NewMeta + if task.Script != script2 { + t.Fatalf("Task script did not update: %s", task.Script) + } + if task.Name != "a task2" { + t.Fatalf("Task name did not update: %s", task.Name) + } + if meta.Status != string(backend.TaskInactive) { + t.Fatalf("expected task status to be inactive, got %q", meta.Status) + } }) for _, args := range []struct { caseName string - id platform.ID - script string + req backend.UpdateTaskRequest }{ - {caseName: "missing id", id: platform.ID(0), script: script}, - {caseName: "not found", id: platform.ID(7123), script: script}, - {caseName: "missing script", id: platform.ID(1), script: ""}, - {caseName: "missing name", id: platform.ID(1), script: scriptNoName}, + {caseName: "missing id", req: backend.UpdateTaskRequest{Script: script}}, + {caseName: "not found", req: backend.UpdateTaskRequest{ID: platform.ID(7123), Script: script}}, + {caseName: "missing script and status", req: backend.UpdateTaskRequest{ID: platform.ID(1)}}, + {caseName: "missing name", req: backend.UpdateTaskRequest{ID: platform.ID(1), Script: scriptNoName}}, } { t.Run(args.caseName, func(t *testing.T) { s := create(t) defer destroy(t, s) - if err := s.ModifyTask(context.Background(), args.id, args.script); err == nil { + if _, err := s.UpdateTask(context.Background(), args.req); err == nil { t.Fatal("expected error but did not receive one") } }) @@ -197,7 +254,7 @@ from(bucket:"y") |> range(start:-1h)` if err != nil { t.Fatal(err) } - if err := s.ModifyTask(context.Background(), id1, script2); err != nil { + if _, err := s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id1, Script: script2}); err != nil { t.Fatalf("expected to be allowed to reuse name when modifying task, but got %v", err) } }) @@ -388,7 +445,7 @@ from(bucket:"test") |> range(start:-1h)` t.Fatal(err) } - task, err := s.FindTaskByID(context.Background(), id) + task, meta, err := s.FindTaskByIDWithMeta(context.Background(), id) if err != nil { t.Fatal(err) } @@ -408,6 +465,9 @@ from(bucket:"test") |> range(start:-1h)` if task.Script != script { t.Fatalf("unexpected script %q", task.Script) } + if meta.Status != string(backend.DefaultTaskStatus) { + t.Fatalf("unexpected default status: got %q, exp %q", meta.Status, backend.DefaultTaskStatus) + } badID := id + 1 @@ -596,71 +656,6 @@ from(bucket:"test") |> range(start:-1h)` } } -func testStoreTaskEnableDisable(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) { - const script = `option task = { - name: "a task", - cron: "* * * * *", - } - - from(bucket:"test") |> range(start:-1h)` - - s := create(t) - defer destroy(t, s) - - org := platform.ID(1) - user := platform.ID(2) - - id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script}) - if err != nil { - t.Fatal(err) - } - - meta, err := s.FindTaskMetaByID(context.Background(), id) - if err != nil { - t.Fatal(err) - } - - if meta.Status != string(backend.TaskActive) { - t.Fatal("task status not set to active on create") - } - - if err := s.DisableTask(context.Background(), id); err != nil { - t.Fatal(err) - } - - meta, err = s.FindTaskMetaByID(context.Background(), id) - if err != nil { - t.Fatal(err) - } - - if meta.Status != string(backend.TaskInactive) { - t.Fatal("task status not set to inactive after disabling") - } - - if err := s.EnableTask(context.Background(), id); err != nil { - t.Fatal(err) - } - - meta, err = s.FindTaskMetaByID(context.Background(), id) - if err != nil { - t.Fatal(err) - } - - if meta.Status != string(backend.TaskActive) { - t.Fatal("task status not set to active after enabling") - } - - badID := id + 1 - - meta, err = s.FindTaskMetaByID(context.Background(), badID) - if err != backend.ErrTaskNotFound { - t.Fatalf("expected %v when task not found, got %v", backend.ErrTaskNotFound, err) - } - if meta != nil { - t.Fatal("expected nil meta when task not found, got non-nil") - } -} - func testStoreDelete(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) { const script = `option task = { name: "a task", diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index fe53dca4b4..be7bc3e7ef 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -3,7 +3,6 @@ package mock import ( "context" - "errors" "fmt" "strings" "sync" @@ -69,7 +68,7 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMe _, ok := s.claims[task.ID.String()] if ok { - return errors.New("task already in list") + return backend.ErrTaskAlreadyClaimed } s.meta[task.ID.String()] = *meta @@ -90,7 +89,7 @@ func (s *Scheduler) UpdateTask(task *backend.StoreTask, meta *backend.StoreTaskM _, ok := s.claims[task.ID.String()] if !ok { - return errors.New("task not in list") + return backend.ErrTaskNotClaimed } s.meta[task.ID.String()] = *meta @@ -116,7 +115,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error { t, ok := s.claims[taskID.String()] if !ok { - return errors.New("task not in list") + return backend.ErrTaskNotClaimed } if s.releaseChan != nil { s.releaseChan <- t diff --git a/task/platform_adapter.go b/task/platform_adapter.go index d780dbc406..822e2b6ef9 100644 --- a/task/platform_adapter.go +++ b/task/platform_adapter.go @@ -3,7 +3,6 @@ package task import ( "context" "errors" - "fmt" "time" "github.com/influxdata/platform" @@ -92,42 +91,31 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T return nil, errors.New("cannot update task without content") } + req := backend.UpdateTaskRequest{ID: id} + if upd.Flux != nil { + req.Script = *upd.Flux + } + if upd.Status != nil { + req.Status = backend.TaskStatus(*upd.Status) + } + res, err := p.s.UpdateTask(ctx, req) + if err != nil { + return nil, err + } + + opts, err := options.FromScript(res.NewTask.Script) + if err != nil { + return nil, err + } + task := &platform.Task{ ID: id, - Name: "TODO", - Status: "TODO", - Owner: platform.User{}, // TODO(mr): populate from context? - } - if upd.Flux != nil { - task.Flux = *upd.Flux - - opts, err := options.FromScript(task.Flux) - if err != nil { - return nil, err - } - task.Every = opts.Every.String() - task.Cron = opts.Cron - - if err := p.s.ModifyTask(ctx, id, task.Flux); err != nil { - return nil, err - } - } - - if upd.Status != nil { - var err error - switch *upd.Status { - case string(backend.TaskActive): - err = p.s.EnableTask(ctx, id) - case string(backend.TaskInactive): - err = p.s.DisableTask(ctx, id) - default: - err = fmt.Errorf("invalid status: %s", *upd.Status) - } - - if err != nil { - return nil, err - } - task.Status = *upd.Status + Name: opts.Name, + Status: res.NewMeta.Status, + Owner: platform.User{}, + Flux: res.NewTask.Script, + Every: opts.Every.String(), + Cron: opts.Cron, } return task, nil diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index b42012d194..f5f23bc491 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -155,7 +155,7 @@ func testTaskCRUD(t *testing.T, sys *System) { } } - // Update task. + // Update task: script only. newFlux := fmt.Sprintf(scriptFmt, 99) origID := f.ID f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux}) @@ -169,6 +169,36 @@ func testTaskCRUD(t *testing.T, sys *System) { if f.Flux != newFlux { t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux) } + if f.Status != string(backend.TaskActive) { + t.Fatalf("expected task to be created active, got %q", f.Status) + } + + // Update task: status only. + newStatus := string(backend.TaskInactive) + f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Status: &newStatus}) + if err != nil { + t.Fatal(err) + } + if f.Flux != newFlux { + t.Fatalf("flux unexpected updated: %s", f.Flux) + } + if f.Status != newStatus { + t.Fatalf("expected task status to be inactive, got %q", f.Status) + } + + // Update task: reactivate status and update script. + newStatus = string(backend.TaskActive) + newFlux = fmt.Sprintf(scriptFmt, 98) + f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux, Status: &newStatus}) + if err != nil { + t.Fatal(err) + } + if f.Flux != newFlux { + t.Fatalf("flux unexpected updated: %s", f.Flux) + } + if f.Status != newStatus { + t.Fatalf("expected task status to be inactive, got %q", f.Status) + } // Delete task. if err := sys.ts.DeleteTask(sys.Ctx, origID); err != nil {