fix(task): propagate status updates when modifying tasks

This also changes the backend.Store API to remove the EnableTask and
DisableTask methods, merging their functionality into ModifyTask, which
has been named to UpdateTask to keep closer to the CRUD acronym.
pull/10616/head
Mark Rushakoff 2018-10-15 17:00:42 -07:00 committed by Mark Rushakoff
parent c168cd1cee
commit 8c0aec7975
10 changed files with 374 additions and 286 deletions

View File

@ -25,6 +25,7 @@ import (
bolt "github.com/coreos/bbolt"
"github.com/influxdata/platform"
"github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/options"
)
// ErrDBReadOnly is an error for when the database is set to read only.
@ -195,31 +196,86 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (
return id, nil
}
// ModifyTask changes a task with a new script, it should error if the task does not exist.
func (s *Store) ModifyTask(ctx context.Context, id platform.ID, newScript string) error {
op, err := backend.StoreValidator.ModifyArgs(id, newScript)
func (s *Store) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (backend.UpdateTaskResult, error) {
var res backend.UpdateTaskResult
op, err := backend.StoreValidator.UpdateArgs(req)
if err != nil {
return err
return res, err
}
return s.db.Update(func(tx *bolt.Tx) error {
encodedID, err := req.ID.Encode()
if err != nil {
return res, err
}
err = s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket)
bt := b.Bucket(tasksPath)
encodedID, err := id.Encode()
if err != nil {
v := bt.Get(encodedID)
if v == nil {
return backend.ErrTaskNotFound
}
res.OldScript = string(v)
newScript := req.Script
if req.Script == "" {
// Need to build op from existing script.
op, err = options.FromScript(string(v))
if err != nil {
return err
}
newScript = string(v)
} else {
if err := bt.Put(encodedID, []byte(req.Script)); err != nil {
return err
}
if err := b.Bucket(nameByTaskID).Put(encodedID, []byte(op.Name)); err != nil {
return err
}
}
var userID, orgID platform.ID
if err := userID.Decode(b.Bucket(userByTaskID).Get(encodedID)); err != nil {
return err
}
if v := bt.Get(encodedID); v == nil {
return backend.ErrTaskNotFound
}
err = bt.Put(encodedID, []byte(newScript))
if err != nil {
if err := orgID.Decode(b.Bucket(orgByTaskID).Get(encodedID)); err != nil {
return err
}
return b.Bucket(nameByTaskID).Put(encodedID, []byte(op.Name))
stmBytes := b.Bucket(taskMetaPath).Get(encodedID)
if stmBytes == nil {
return backend.ErrTaskNotFound
}
var stm backend.StoreTaskMeta
if err := stm.Unmarshal(stmBytes); err != nil {
return err
}
res.OldStatus = backend.TaskStatus(stm.Status)
if req.Status != "" {
stm.Status = string(req.Status)
stmBytes, err = stm.Marshal()
if err != nil {
return err
}
if err := b.Bucket(taskMetaPath).Put(encodedID, stmBytes); err != nil {
return err
}
}
res.NewMeta = stm
res.NewTask = backend.StoreTask{
ID: req.ID,
Org: orgID,
User: userID,
Name: op.Name,
Script: newScript,
}
return nil
})
return res, err
}
// ListTasks lists the tasks based on a filter.
@ -499,58 +555,6 @@ func (s *Store) FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*back
}, &stm, nil
}
func (s *Store) EnableTask(ctx context.Context, id platform.ID) error {
encodedID, err := id.Encode()
if err != nil {
return err
}
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket).Bucket(taskMetaPath)
stmBytes := b.Get(encodedID)
if stmBytes == nil {
return errors.New("task meta not found")
}
stm := backend.StoreTaskMeta{}
err := stm.Unmarshal(stmBytes)
if err != nil {
return err
}
stm.Status = string(backend.TaskActive)
stmBytes, err = stm.Marshal()
if err != nil {
return err
}
return b.Put(encodedID, stmBytes)
})
}
func (s *Store) DisableTask(ctx context.Context, id platform.ID) error {
encodedID, err := id.Encode()
if err != nil {
return err
}
return s.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(s.bucket).Bucket(taskMetaPath)
stmBytes := b.Get(encodedID)
if stmBytes == nil {
return errors.New("task meta not found")
}
stm := backend.StoreTaskMeta{}
err := stm.Unmarshal(stmBytes)
if err != nil {
return err
}
stm.Status = string(backend.TaskInactive)
stmBytes, err = stm.Marshal()
if err != nil {
return err
}
return b.Put(encodedID, stmBytes)
})
}
// DeleteTask deletes the task.
func (s *Store) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) {
encodedID, err := id.Encode()

View File

@ -60,50 +60,40 @@ func (c *Coordinator) CreateTask(ctx context.Context, req backend.CreateTaskRequ
return id, nil
}
func (c *Coordinator) ModifyTask(ctx context.Context, id platform.ID, newScript string) error {
if err := c.Store.ModifyTask(ctx, id, newScript); err != nil {
return err
}
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, id)
func (c *Coordinator) UpdateTask(ctx context.Context, req backend.UpdateTaskRequest) (backend.UpdateTaskResult, error) {
res, err := c.Store.UpdateTask(ctx, req)
if err != nil {
return err
return res, err
}
if err := c.sch.UpdateTask(task, meta); err != nil {
return err
}
return nil
}
func (c *Coordinator) EnableTask(ctx context.Context, id platform.ID) error {
if err := c.Store.EnableTask(ctx, id); err != nil {
return err
}
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, id)
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, req.ID)
if err != nil {
return err
return res, err
}
if err := c.sch.ClaimTask(task, meta); err != nil {
return err
// If disabling the task, do so before modifying the script.
if req.Status == backend.TaskInactive && res.OldStatus != backend.TaskInactive {
if err := c.sch.ReleaseTask(req.ID); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
}
}
return nil
}
func (c *Coordinator) DisableTask(ctx context.Context, id platform.ID) error {
if err := c.Store.DisableTask(ctx, id); err != nil {
return err
if err := c.sch.UpdateTask(task, meta); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
}
return c.sch.ReleaseTask(id)
// If enabling the task, claim it after modifying the script.
if req.Status == backend.TaskActive {
if err := c.sch.ClaimTask(task, meta); err != nil && err != backend.ErrTaskAlreadyClaimed {
return res, err
}
}
return res, nil
}
func (c *Coordinator) DeleteTask(ctx context.Context, id platform.ID) (deleted bool, err error) {
if err := c.sch.ReleaseTask(id); err != nil {
if err := c.sch.ReleaseTask(id); err != nil && err != backend.ErrTaskNotClaimed {
return false, err
}

View File

@ -22,6 +22,8 @@ func timeoutSelector(ch <-chan *mock.Task) (*mock.Task, error) {
}
}
const script = `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)`
func TestCoordinator(t *testing.T) {
st := backend.NewInMemStore()
sched := mock.NewScheduler()
@ -33,7 +35,6 @@ func TestCoordinator(t *testing.T) {
orgID := platformtesting.MustIDBase16("69746f7175650d0a")
usrID := platformtesting.MustIDBase16("6c61757320657420")
script := `option task = {name: "a task",cron: "* * * * *"} from(bucket:"test") |> range(start:-1h)`
id, err := coord.CreateTask(context.Background(), backend.CreateTaskRequest{Org: orgID, User: usrID, Script: script})
if err != nil {
t.Fatal(err)
@ -76,10 +77,23 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
err = coord.DisableTask(context.Background(), id)
res, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive})
if err != nil {
t.Fatal(err)
}
// Only validating res on the first update.
if res.NewTask.ID != id {
t.Fatalf("unexpected ID on update result: got %s, want %s", res.NewTask.ID.String(), id.String())
}
if res.NewTask.Script != script {
t.Fatalf("unexpected script on update result: got %q, want %q", res.NewTask.Script, script)
}
if res.NewMeta.Status != string(backend.TaskInactive) {
t.Fatalf("unexpected meta status on update result: got %q, want %q", res.NewMeta.Status, backend.TaskInactive)
}
if res.OldStatus != backend.TaskActive {
t.Fatalf("unexpected old status on update result: got %q, want %q", res.OldStatus, backend.TaskActive)
}
task, err = timeoutSelector(releaseChan)
if err != nil {
@ -90,8 +104,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal("task sent to scheduler doesnt match task created")
}
err = coord.EnableTask(context.Background(), id)
if err != nil {
if _, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskActive}); err != nil {
t.Fatal(err)
}
@ -105,8 +118,7 @@ func TestCoordinator(t *testing.T) {
}
newScript := `option task = {name: "a task",cron: "1 * * * *"} from(bucket:"test") |> range(start:-2h)`
err = coord.ModifyTask(context.Background(), id, newScript)
if err != nil {
if _, err := coord.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Script: newScript}); err != nil {
t.Fatal(err)
}
@ -119,3 +131,25 @@ func TestCoordinator(t *testing.T) {
t.Fatal("task sent to scheduler doesnt match task created")
}
}
func TestCoordinator_DeleteUnclaimedTask(t *testing.T) {
st := backend.NewInMemStore()
sched := mock.NewScheduler()
coord := coordinator.New(sched, st)
// Create an isolated task directly through the store so the coordinator doesn't know about it.
id, err := st.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script})
if err != nil {
t.Fatal(err)
}
// Deleting the task through the coordinator should succeed.
if _, err := coord.DeleteTask(context.Background(), id); err != nil {
t.Fatal(err)
}
if _, err := st.FindTaskByID(context.Background(), id); err != backend.ErrTaskNotFound {
t.Fatalf("expected deleted task not to be found; got %v", err)
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/influxdata/platform"
"github.com/influxdata/platform/snowflake"
"github.com/influxdata/platform/task/options"
)
var _ Store = (*inmem)(nil)
@ -74,27 +75,58 @@ func (s *inmem) CreateTask(_ context.Context, req CreateTaskRequest) (platform.I
return id, nil
}
func (s *inmem) ModifyTask(_ context.Context, id platform.ID, script string) error {
op, err := StoreValidator.ModifyArgs(id, script)
func (s *inmem) UpdateTask(_ context.Context, req UpdateTaskRequest) (UpdateTaskResult, error) {
var res UpdateTaskResult
op, err := StoreValidator.UpdateArgs(req)
if err != nil {
return err
return res, err
}
idStr := req.ID.String()
s.mu.Lock()
defer s.mu.Unlock()
found := false
for n, t := range s.tasks {
if t.ID != id {
if t.ID != req.ID {
continue
}
found = true
res.OldScript = t.Script
if req.Script == "" {
op, err = options.FromScript(t.Script)
if err != nil {
return res, err
}
} else {
t.Script = req.Script
}
t.Name = op.Name
t.Script = script
s.tasks[n] = t
return nil
res.NewTask = t
break
}
if !found {
return res, fmt.Errorf("ModifyTask: record not found for %s", idStr)
}
return fmt.Errorf("ModifyTask: record not found for %s", id)
stm, ok := s.runners[idStr]
if !ok {
panic("inmem store: had task without runner for task ID " + idStr)
}
res.OldStatus = TaskStatus(stm.Status)
if req.Status != "" {
// Changing the status.
stm.Status = string(req.Status)
s.runners[idStr] = stm
}
res.NewMeta = stm
return res, nil
}
func (s *inmem) ListTasks(_ context.Context, params TaskSearchParams) ([]StoreTask, error) {
@ -195,37 +227,6 @@ func (s *inmem) FindTaskByIDWithMeta(_ context.Context, id platform.ID) (*StoreT
return task, &meta, nil
}
func (s *inmem) EnableTask(ctx context.Context, id platform.ID) error {
s.mu.Lock()
defer s.mu.Unlock()
strID := id.String()
meta, ok := s.runners[strID]
if !ok {
return errors.New("task meta not found")
}
meta.Status = string(TaskActive)
s.runners[strID] = meta
return nil
}
func (s *inmem) DisableTask(ctx context.Context, id platform.ID) error {
s.mu.Lock()
defer s.mu.Unlock()
strID := id.String()
meta, ok := s.runners[strID]
if !ok {
return errors.New("task meta not found")
}
meta.Status = string(TaskInactive)
s.runners[strID] = meta
return nil
}
func (s *inmem) FindTaskMetaByID(ctx context.Context, id platform.ID) (*StoreTaskMeta, error) {
s.mu.RLock()
defer s.mu.RUnlock()

View File

@ -14,8 +14,15 @@ import (
"go.uber.org/zap"
)
var ErrRunCanceled = errors.New("run canceled")
var ErrTaskNotClaimed = errors.New("task not claimed")
var (
ErrRunCanceled = errors.New("run canceled")
// ErrTaskNotClaimed is returned when attempting to operate against a task that must be claimed but is not.
ErrTaskNotClaimed = errors.New("task not claimed")
// ErrTaskAlreadyClaimed is returned when attempting to operate against a task that must not be claimed but is.
ErrTaskAlreadyClaimed = errors.New("task already claimed")
)
// DesiredState persists the desired state of a run.
type DesiredState interface {
@ -259,7 +266,7 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err
tid := task.ID.String()
_, ok := s.taskSchedulers[tid]
if ok {
return errors.New("task has already been claimed")
return ErrTaskAlreadyClaimed
}
s.taskSchedulers[tid] = ts

View File

@ -38,6 +38,19 @@ const (
DefaultTaskStatus TaskStatus = TaskActive
)
// validate returns an error if s is not a known task status.
func (s TaskStatus) validate(allowEmpty bool) error {
if allowEmpty && s == "" {
return nil
}
if s == TaskActive || s == TaskInactive {
return nil
}
return fmt.Errorf("invalid task status: %q", s)
}
type RunStatus int
const (
@ -83,6 +96,7 @@ type RunCreation struct {
HasQueue bool
}
// CreateTaskRequest encapsulates state of a new task to be created.
type CreateTaskRequest struct {
// Owners.
Org, User platform.ID
@ -100,21 +114,47 @@ type CreateTaskRequest struct {
Status TaskStatus
}
// UpdateTaskRequest encapsulates requested changes to a task.
type UpdateTaskRequest struct {
// ID of the task.
ID platform.ID
// New script content of the task.
// If empty, do not modify the existing script.
Script string
// The new desired task status.
// If empty, do not modify the existing status.
Status TaskStatus
}
// UpdateTaskResult describes the result of modifying a single task.
// Having the content returned from ModifyTask makes it much simpler for callers
// to decide how to notify on status changes, etc.
type UpdateTaskResult struct {
OldScript string
OldStatus TaskStatus
NewTask StoreTask
NewMeta StoreTaskMeta
}
// Store is the interface around persisted tasks.
type Store interface {
// CreateTask creates a task with from the given CreateTaskRequest.
// If the task is created successfully, the ID of the new task is returned.
CreateTask(ctx context.Context, req CreateTaskRequest) (platform.ID, error)
// ModifyTask updates the script of an existing task.
// UpdateTask updates an existing task.
// It returns an error if there was no task matching the given ID.
ModifyTask(ctx context.Context, id platform.ID, newScript string) error
// If the returned error is not nil, the returned result should not be inspected.
UpdateTask(ctx context.Context, req UpdateTaskRequest) (UpdateTaskResult, error)
// ListTasks lists the tasks in the store that match the search params.
ListTasks(ctx context.Context, params TaskSearchParams) ([]StoreTask, error)
// FindTaskByID returns the task with the given ID.
// If no task matches the ID, the returned task is nil.
// If no task matches the ID, the returned task is nil and ErrTaskNotFound is returned.
FindTaskByID(ctx context.Context, id platform.ID) (*StoreTask, error)
// FindTaskMetaByID returns the metadata about a task.
@ -123,12 +163,6 @@ type Store interface {
// FindTaskByIDWithMeta combines finding the task and the meta into a single call.
FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error)
// EnableTask updates task status to active.
EnableTask(ctx context.Context, id platform.ID) error
// DisableTask updates task status to inactive.
DisableTask(ctx context.Context, id platform.ID) error
// DeleteTask returns whether an entry matching the given ID was deleted.
// If err is non-nil, deleted is false.
// If err is nil, deleted is false if no entry matched the ID,
@ -286,30 +320,36 @@ func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error
return o, fmt.Errorf("missing required fields to create task: %s", strings.Join(missing, ", "))
}
if req.Status != "" && req.Status != TaskActive && req.Status != TaskInactive {
return o, fmt.Errorf("invalid status: %s", req.Status)
if err := req.Status.validate(true); err != nil {
return o, err
}
return o, nil
}
// ModifyArgs returns the script's parsed options,
// and an error if any of the provided fields are invalid for modifying a task.
func (StoreValidation) ModifyArgs(taskID platform.ID, script string) (options.Options, error) {
// UpdateArgs validates the UpdateTaskRequest.
// If the update only includes a new status (i.e. req.Script is empty), the returned options are zero.
// If the update contains neither a new script nor a new status, or if the script is invalid, an error is returned.
func (StoreValidation) UpdateArgs(req UpdateTaskRequest) (options.Options, error) {
var missing []string
var o options.Options
if script == "" {
missing = append(missing, "script")
if req.Script == "" && req.Status == "" {
missing = append(missing, "script or status")
} else {
var err error
o, err = options.FromScript(script)
if err != nil {
if req.Script != "" {
var err error
o, err = options.FromScript(req.Script)
if err != nil {
return o, err
}
}
if err := req.Status.validate(true); err != nil {
return o, err
}
}
if !taskID.Valid() {
if !req.ID.Valid() {
missing = append(missing, "task ID")
}

View File

@ -24,12 +24,11 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName
if len(funcNames) == 0 {
funcNames = []string{
"CreateTask",
"ModifyTask",
"UpdateTask",
"ListTasks",
"FindTask",
"FindMeta",
"FindTaskByIDWithMeta",
"EnableDisableTask",
"DeleteTask",
"CreateNextRun",
"FinishRun",
@ -38,12 +37,11 @@ func NewStoreTest(name string, cf CreateStoreFunc, df DestroyStoreFunc, funcName
}
availableFuncs := map[string]TestFunc{
"CreateTask": testStoreCreate,
"ModifyTask": testStoreModify,
"UpdateTask": testStoreUpdate,
"ListTasks": testStoreListTasks,
"FindTask": testStoreFindTask,
"FindMeta": testStoreFindMeta,
"FindTaskByIDWithMeta": testStoreFindByIDWithMeta,
"EnableDisableTask": testStoreTaskEnableDisable,
"DeleteTask": testStoreDelete,
"CreateNextRun": testStoreCreateNextRun,
"FinishRun": testStoreFinishRun,
@ -117,7 +115,7 @@ from(bucket:"test") |> range(start:-1h)`
}
}
func testStoreModify(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
func testStoreUpdate(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
const script = `option task = {
name: "a task",
cron: "* * * * *",
@ -151,37 +149,96 @@ from(bucket:"y") |> range(start:-1h)`
if err != nil {
t.Fatal(err)
}
if err := s.ModifyTask(context.Background(), id, script2); err != nil {
t.Fatal(err)
}
task, err := s.FindTaskByID(context.Background(), id)
// Modify just the script.
res, err := s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Script: script2})
if err != nil {
t.Fatal(err)
}
task := res.NewTask
meta := res.NewMeta
if task.Script != script2 {
t.Fatalf("Task didnt update: %s", task.Script)
}
if task.Name != "a task2" {
t.Fatalf("Task didn't update name, expected 'a task2' but got '%s' for task %v", task.Name, task)
}
if meta.Status != string(backend.TaskActive) {
// Other tests explicitly check the initial status against DefaultTaskStatus,
// but in this case we need to be certain of the initial status so we can toggle it correctly.
t.Fatalf("expected task to be created active, got %q", meta.Status)
}
// Modify just the status.
res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive})
if err != nil {
t.Fatal(err)
}
task = res.NewTask
meta = res.NewMeta
if task.Script != script2 {
t.Fatalf("Task script unexpectedly updated: %s", task.Script)
}
if task.Name != "a task2" {
t.Fatalf("Task name unexpectedly updated: %q", task.Name)
}
if meta.Status != string(backend.TaskInactive) {
t.Fatalf("expected task status to be inactive, got %q", meta.Status)
}
// Modify the status to active, and change the script.
res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskActive, Script: script})
if err != nil {
t.Fatal(err)
}
task = res.NewTask
meta = res.NewMeta
if task.Script != script {
t.Fatalf("Task script did not update: %s", task.Script)
}
if task.Name != "a task" {
t.Fatalf("Task name did not update: %s", task.Name)
}
if meta.Status != string(backend.TaskActive) {
t.Fatalf("expected task status to be active, got %q", meta.Status)
}
// Modify the status to inactive, and change the script again.
res, err = s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id, Status: backend.TaskInactive, Script: script2})
if err != nil {
t.Fatal(err)
}
task = res.NewTask
meta = res.NewMeta
if task.Script != script2 {
t.Fatalf("Task script did not update: %s", task.Script)
}
if task.Name != "a task2" {
t.Fatalf("Task name did not update: %s", task.Name)
}
if meta.Status != string(backend.TaskInactive) {
t.Fatalf("expected task status to be inactive, got %q", meta.Status)
}
})
for _, args := range []struct {
caseName string
id platform.ID
script string
req backend.UpdateTaskRequest
}{
{caseName: "missing id", id: platform.ID(0), script: script},
{caseName: "not found", id: platform.ID(7123), script: script},
{caseName: "missing script", id: platform.ID(1), script: ""},
{caseName: "missing name", id: platform.ID(1), script: scriptNoName},
{caseName: "missing id", req: backend.UpdateTaskRequest{Script: script}},
{caseName: "not found", req: backend.UpdateTaskRequest{ID: platform.ID(7123), Script: script}},
{caseName: "missing script and status", req: backend.UpdateTaskRequest{ID: platform.ID(1)}},
{caseName: "missing name", req: backend.UpdateTaskRequest{ID: platform.ID(1), Script: scriptNoName}},
} {
t.Run(args.caseName, func(t *testing.T) {
s := create(t)
defer destroy(t, s)
if err := s.ModifyTask(context.Background(), args.id, args.script); err == nil {
if _, err := s.UpdateTask(context.Background(), args.req); err == nil {
t.Fatal("expected error but did not receive one")
}
})
@ -197,7 +254,7 @@ from(bucket:"y") |> range(start:-1h)`
if err != nil {
t.Fatal(err)
}
if err := s.ModifyTask(context.Background(), id1, script2); err != nil {
if _, err := s.UpdateTask(context.Background(), backend.UpdateTaskRequest{ID: id1, Script: script2}); err != nil {
t.Fatalf("expected to be allowed to reuse name when modifying task, but got %v", err)
}
})
@ -388,7 +445,7 @@ from(bucket:"test") |> range(start:-1h)`
t.Fatal(err)
}
task, err := s.FindTaskByID(context.Background(), id)
task, meta, err := s.FindTaskByIDWithMeta(context.Background(), id)
if err != nil {
t.Fatal(err)
}
@ -408,6 +465,9 @@ from(bucket:"test") |> range(start:-1h)`
if task.Script != script {
t.Fatalf("unexpected script %q", task.Script)
}
if meta.Status != string(backend.DefaultTaskStatus) {
t.Fatalf("unexpected default status: got %q, exp %q", meta.Status, backend.DefaultTaskStatus)
}
badID := id + 1
@ -596,71 +656,6 @@ from(bucket:"test") |> range(start:-1h)`
}
}
func testStoreTaskEnableDisable(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
const script = `option task = {
name: "a task",
cron: "* * * * *",
}
from(bucket:"test") |> range(start:-1h)`
s := create(t)
defer destroy(t, s)
org := platform.ID(1)
user := platform.ID(2)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script})
if err != nil {
t.Fatal(err)
}
meta, err := s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if meta.Status != string(backend.TaskActive) {
t.Fatal("task status not set to active on create")
}
if err := s.DisableTask(context.Background(), id); err != nil {
t.Fatal(err)
}
meta, err = s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if meta.Status != string(backend.TaskInactive) {
t.Fatal("task status not set to inactive after disabling")
}
if err := s.EnableTask(context.Background(), id); err != nil {
t.Fatal(err)
}
meta, err = s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if meta.Status != string(backend.TaskActive) {
t.Fatal("task status not set to active after enabling")
}
badID := id + 1
meta, err = s.FindTaskMetaByID(context.Background(), badID)
if err != backend.ErrTaskNotFound {
t.Fatalf("expected %v when task not found, got %v", backend.ErrTaskNotFound, err)
}
if meta != nil {
t.Fatal("expected nil meta when task not found, got non-nil")
}
}
func testStoreDelete(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
const script = `option task = {
name: "a task",

View File

@ -3,7 +3,6 @@ package mock
import (
"context"
"errors"
"fmt"
"strings"
"sync"
@ -69,7 +68,7 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMe
_, ok := s.claims[task.ID.String()]
if ok {
return errors.New("task already in list")
return backend.ErrTaskAlreadyClaimed
}
s.meta[task.ID.String()] = *meta
@ -90,7 +89,7 @@ func (s *Scheduler) UpdateTask(task *backend.StoreTask, meta *backend.StoreTaskM
_, ok := s.claims[task.ID.String()]
if !ok {
return errors.New("task not in list")
return backend.ErrTaskNotClaimed
}
s.meta[task.ID.String()] = *meta
@ -116,7 +115,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
t, ok := s.claims[taskID.String()]
if !ok {
return errors.New("task not in list")
return backend.ErrTaskNotClaimed
}
if s.releaseChan != nil {
s.releaseChan <- t

View File

@ -3,7 +3,6 @@ package task
import (
"context"
"errors"
"fmt"
"time"
"github.com/influxdata/platform"
@ -92,42 +91,31 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T
return nil, errors.New("cannot update task without content")
}
req := backend.UpdateTaskRequest{ID: id}
if upd.Flux != nil {
req.Script = *upd.Flux
}
if upd.Status != nil {
req.Status = backend.TaskStatus(*upd.Status)
}
res, err := p.s.UpdateTask(ctx, req)
if err != nil {
return nil, err
}
opts, err := options.FromScript(res.NewTask.Script)
if err != nil {
return nil, err
}
task := &platform.Task{
ID: id,
Name: "TODO",
Status: "TODO",
Owner: platform.User{}, // TODO(mr): populate from context?
}
if upd.Flux != nil {
task.Flux = *upd.Flux
opts, err := options.FromScript(task.Flux)
if err != nil {
return nil, err
}
task.Every = opts.Every.String()
task.Cron = opts.Cron
if err := p.s.ModifyTask(ctx, id, task.Flux); err != nil {
return nil, err
}
}
if upd.Status != nil {
var err error
switch *upd.Status {
case string(backend.TaskActive):
err = p.s.EnableTask(ctx, id)
case string(backend.TaskInactive):
err = p.s.DisableTask(ctx, id)
default:
err = fmt.Errorf("invalid status: %s", *upd.Status)
}
if err != nil {
return nil, err
}
task.Status = *upd.Status
Name: opts.Name,
Status: res.NewMeta.Status,
Owner: platform.User{},
Flux: res.NewTask.Script,
Every: opts.Every.String(),
Cron: opts.Cron,
}
return task, nil

View File

@ -155,7 +155,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
}
// Update task.
// Update task: script only.
newFlux := fmt.Sprintf(scriptFmt, 99)
origID := f.ID
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux})
@ -169,6 +169,36 @@ func testTaskCRUD(t *testing.T, sys *System) {
if f.Flux != newFlux {
t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux)
}
if f.Status != string(backend.TaskActive) {
t.Fatalf("expected task to be created active, got %q", f.Status)
}
// Update task: status only.
newStatus := string(backend.TaskInactive)
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Status: &newStatus})
if err != nil {
t.Fatal(err)
}
if f.Flux != newFlux {
t.Fatalf("flux unexpected updated: %s", f.Flux)
}
if f.Status != newStatus {
t.Fatalf("expected task status to be inactive, got %q", f.Status)
}
// Update task: reactivate status and update script.
newStatus = string(backend.TaskActive)
newFlux = fmt.Sprintf(scriptFmt, 98)
f, err = sys.ts.UpdateTask(sys.Ctx, origID, platform.TaskUpdate{Flux: &newFlux, Status: &newStatus})
if err != nil {
t.Fatal(err)
}
if f.Flux != newFlux {
t.Fatalf("flux unexpected updated: %s", f.Flux)
}
if f.Status != newStatus {
t.Fatalf("expected task status to be inactive, got %q", f.Status)
}
// Delete task.
if err := sys.ts.DeleteTask(sys.Ctx, origID); err != nil {