feat(task): allow setting task status during creation

This renames TaskEnabled and TaskDisabled to TaskActive and
TaskInactive, to keep in line with the swagger and other parts of the
system. But I left the EnableTask and DisableTask methods on the Store
interface for now. I could see eliminating those methods if we adjust
the signature of the UpdateTask method.
pull/10616/head
Mark Rushakoff 2018-10-12 14:30:27 -07:00 committed by Mark Rushakoff
parent 56f32b4780
commit cb0b54cfd9
5 changed files with 127 additions and 81 deletions

View File

@ -169,14 +169,16 @@ func (s *Store) CreateTask(ctx context.Context, req backend.CreateTaskRequest) (
return err
}
// metadata
stm := backend.StoreTaskMeta{
MaxConcurrency: int32(o.Concurrency),
Status: string(backend.TaskEnabled),
Status: string(req.Status),
LatestCompleted: req.ScheduleAfter,
EffectiveCron: o.EffectiveCronString(),
Delay: int32(o.Delay / time.Second),
}
if stm.Status == "" {
stm.Status = string(backend.DefaultTaskStatus)
}
stmBytes, err := stm.Marshal()
if err != nil {
@ -517,7 +519,7 @@ func (s *Store) EnableTask(ctx context.Context, id platform.ID) error {
if err != nil {
return err
}
stm.Status = string(backend.TaskEnabled)
stm.Status = string(backend.TaskActive)
stmBytes, err = stm.Marshal()
if err != nil {
return err
@ -543,7 +545,7 @@ func (s *Store) DisableTask(ctx context.Context, id platform.ID) error {
if err != nil {
return err
}
stm.Status = string(backend.TaskDisabled)
stm.Status = string(backend.TaskInactive)
stmBytes, err = stm.Marshal()
if err != nil {
return err

View File

@ -58,13 +58,18 @@ func (s *inmem) CreateTask(_ context.Context, req CreateTaskRequest) (platform.I
defer s.mu.Unlock()
s.tasks = append(s.tasks, task)
s.runners[id.String()] = StoreTaskMeta{
stm := StoreTaskMeta{
MaxConcurrency: int32(o.Concurrency),
Status: string(TaskEnabled),
Status: string(req.Status),
LatestCompleted: req.ScheduleAfter,
EffectiveCron: o.EffectiveCronString(),
Delay: int32(o.Delay / time.Second),
}
if stm.Status == "" {
stm.Status = string(DefaultTaskStatus)
}
s.runners[id.String()] = stm
return id, nil
}
@ -196,7 +201,7 @@ func (s *inmem) EnableTask(ctx context.Context, id platform.ID) error {
if !ok {
return errors.New("task meta not found")
}
meta.Status = string(TaskEnabled)
meta.Status = string(TaskActive)
s.runners[strID] = meta
return nil
@ -211,7 +216,7 @@ func (s *inmem) DisableTask(ctx context.Context, id platform.ID) error {
if !ok {
return errors.New("task meta not found")
}
meta.Status = string(TaskDisabled)
meta.Status = string(TaskInactive)
s.runners[strID] = meta
return nil

View File

@ -21,17 +21,16 @@ var ErrUserNotFound = errors.New("user not found")
// ErrOrgNotFound is an error for when we can't find an org
var ErrOrgNotFound = errors.New("org not found")
// ErrTaskNameTaken is an error for when a task name is already taken
var ErrTaskNameTaken = errors.New("task name already in use by current user or target organization")
// ErrManualQueueFull is returned when a manual run request cannot be completed.
var ErrManualQueueFull = errors.New("manual queue at capacity")
type TaskStatus string
const (
TaskEnabled TaskStatus = "enabled"
TaskDisabled TaskStatus = "disabled"
TaskActive TaskStatus = "active"
TaskInactive TaskStatus = "inactive"
DefaultTaskStatus TaskStatus = TaskActive
)
type RunStatus int
@ -90,6 +89,10 @@ type CreateTaskRequest struct {
// The first run of the task will be run according to the earliest time after ScheduleAfter,
// matching the task's schedul via its cron or every option.
ScheduleAfter int64
// The initial task status.
// If empty, will be treated as DefaultTaskStatus.
Status TaskStatus
}
// Store is the interface around persisted tasks.
@ -115,10 +118,10 @@ type Store interface {
// FindTaskByIDWithMeta combines finding the task and the meta into a single call.
FindTaskByIDWithMeta(ctx context.Context, id platform.ID) (*StoreTask, *StoreTaskMeta, error)
// EnableTask updates task status to enabled.
// EnableTask updates task status to active.
EnableTask(ctx context.Context, id platform.ID) error
// disableTask updates task status to disabled.
// DisableTask updates task status to inactive.
DisableTask(ctx context.Context, id platform.ID) error
// DeleteTask returns whether an entry matching the given ID was deleted.
@ -253,7 +256,7 @@ type StoreValidation struct{}
// CreateArgs returns the script's parsed options,
// and an error if any of the provided fields are invalid for creating a task.
func (StoreValidation) CreateArgs(req CreateTaskRequest /*org, user platform.ID, script string*/) (options.Options, error) {
func (StoreValidation) CreateArgs(req CreateTaskRequest) (options.Options, error) {
var missing []string
var o options.Options
@ -278,6 +281,10 @@ func (StoreValidation) CreateArgs(req CreateTaskRequest /*org, user platform.ID,
return o, fmt.Errorf("missing required fields to create task: %s", strings.Join(missing, ", "))
}
if req.Status != "" && req.Status != TaskActive && req.Status != TaskInactive {
return o, fmt.Errorf("invalid status: %s", req.Status)
}
return o, nil
}

View File

@ -85,6 +85,7 @@ from(bucket:"test") |> range(start:-1h)`
caseName string
org, user platform.ID
name, script string
status backend.TaskStatus
noerr bool
}{
{caseName: "happy path", org: platform.ID(1), user: platform.ID(2), script: script, noerr: true},
@ -95,18 +96,22 @@ from(bucket:"test") |> range(start:-1h)`
{caseName: "repeated name and org", org: platform.ID(1), user: platform.ID(3), script: script, noerr: true},
{caseName: "repeated name and user", org: platform.ID(3), user: platform.ID(2), script: script, noerr: true},
{caseName: "repeated name, org, and user", org: platform.ID(1), user: platform.ID(2), script: script, noerr: true},
{caseName: "explicitly active", org: 1, user: 2, script: script, status: backend.TaskActive, noerr: true},
{caseName: "explicitly inactive", org: 1, user: 2, script: script, status: backend.TaskInactive, noerr: true},
{caseName: "invalid status", org: 1, user: 2, script: script, status: backend.TaskStatus("this is not a valid status")},
} {
t.Run(args.caseName, func(t *testing.T) {
req := backend.CreateTaskRequest{
Org: args.org,
User: args.user,
Script: args.script,
Status: args.status,
}
_, err := s.CreateTask(context.Background(), req)
if args.noerr && err != nil {
t.Fatalf("expected err!=nil but got nil instead")
} else if err == nil && !args.noerr {
t.Fatalf("expected nil error but got %v", err)
t.Fatalf("expected no err but got %v", err)
} else if !args.noerr && err == nil {
t.Fatal("expected error but got nil")
}
})
}
@ -427,74 +432,101 @@ func testStoreFindMeta(t *testing.T, create CreateStoreFunc, destroy DestroyStor
from(bucket:"test") |> range(start:-1h)`
s := create(t)
defer destroy(t, s)
t.Run("happy path", func(t *testing.T) {
s := create(t)
defer destroy(t, s)
org := platform.ID(1)
user := platform.ID(2)
org := platform.ID(1)
user := platform.ID(2)
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script, ScheduleAfter: 6000})
if err != nil {
t.Fatal(err)
}
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: org, User: user, Script: script, ScheduleAfter: 6000})
if err != nil {
t.Fatal(err)
}
meta, err := s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
meta, err := s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if meta.MaxConcurrency != 3 {
t.Fatal("failed to set max concurrency")
}
if meta.MaxConcurrency != 3 {
t.Fatal("failed to set max concurrency")
}
if meta.LatestCompleted != 6000 {
t.Fatalf("LatestCompleted should have been set to 6000, got %d", meta.LatestCompleted)
}
if meta.LatestCompleted != 6000 {
t.Fatalf("LatestCompleted should have been set to 6000, got %d", meta.LatestCompleted)
}
if meta.EffectiveCron != "* * * * *" {
t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron)
}
if meta.EffectiveCron != "* * * * *" {
t.Fatalf("unexpected cron stored in meta: %q", meta.EffectiveCron)
}
if time.Duration(meta.Delay)*time.Second != 5*time.Second {
t.Fatalf("unexpected delay stored in meta: %v", meta.Delay)
}
if time.Duration(meta.Delay)*time.Second != 5*time.Second {
t.Fatalf("unexpected delay stored in meta: %v", meta.Delay)
}
badID := platform.ID(0)
meta, err = s.FindTaskMetaByID(context.Background(), badID)
if err == nil {
t.Fatalf("failed to error on bad taskID")
}
if meta != nil {
t.Fatalf("expected nil meta when finding nonexistent ID, got %#v", meta)
}
if meta.Status != string(backend.DefaultTaskStatus) {
t.Fatalf("unexpected status: got %v, exp %v", meta.Status, backend.DefaultTaskStatus)
}
rc, err := s.CreateNextRun(context.Background(), id, 6065)
if err != nil {
t.Fatal(err)
}
badID := platform.ID(0)
meta, err = s.FindTaskMetaByID(context.Background(), badID)
if err == nil {
t.Fatalf("failed to error on bad taskID")
}
if meta != nil {
t.Fatalf("expected nil meta when finding nonexistent ID, got %#v", meta)
}
_, err = s.CreateNextRun(context.Background(), id, 6125)
if err != nil {
t.Fatal(err)
}
rc, err := s.CreateNextRun(context.Background(), id, 6065)
if err != nil {
t.Fatal(err)
}
err = s.FinishRun(context.Background(), id, rc.Created.RunID)
if err != nil {
t.Fatal(err)
}
_, err = s.CreateNextRun(context.Background(), id, 6125)
if err != nil {
t.Fatal(err)
}
meta, err = s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
err = s.FinishRun(context.Background(), id, rc.Created.RunID)
if err != nil {
t.Fatal(err)
}
if len(meta.CurrentlyRunning) != 1 {
t.Fatal("creating and finishing runs doesn't work")
}
meta, err = s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if meta.LatestCompleted != 6060 {
t.Fatalf("expected LatestCompleted to be updated by finished run, but it wasn't; LatestCompleted=%d", meta.LatestCompleted)
}
if len(meta.CurrentlyRunning) != 1 {
t.Fatal("creating and finishing runs doesn't work")
}
if meta.LatestCompleted != 6060 {
t.Fatalf("expected LatestCompleted to be updated by finished run, but it wasn't; LatestCompleted=%d", meta.LatestCompleted)
}
})
t.Run("explicit status", func(t *testing.T) {
s := create(t)
defer destroy(t, s)
for _, st := range []backend.TaskStatus{backend.TaskActive, backend.TaskInactive} {
id, err := s.CreateTask(context.Background(), backend.CreateTaskRequest{Org: 1, User: 2, Script: script, Status: st})
if err != nil {
t.Fatal(err)
}
meta, err := s.FindTaskMetaByID(context.Background(), id)
if err != nil {
t.Fatal(err)
}
if meta.Status != string(st) {
t.Fatalf("got status %v, exp %v", meta.Status, st)
}
}
})
}
func testStoreFindIDAndMeta(t *testing.T, create CreateStoreFunc, destroy DestroyStoreFunc) {
@ -589,8 +621,8 @@ func testStoreTaskEnableDisable(t *testing.T, create CreateStoreFunc, destroy De
t.Fatal(err)
}
if meta.Status != string(backend.TaskEnabled) {
t.Fatal("task status not set to enabled on create")
if meta.Status != string(backend.TaskActive) {
t.Fatal("task status not set to active on create")
}
if err := s.DisableTask(context.Background(), id); err != nil {
@ -602,8 +634,8 @@ func testStoreTaskEnableDisable(t *testing.T, create CreateStoreFunc, destroy De
t.Fatal(err)
}
if meta.Status != string(backend.TaskDisabled) {
t.Fatal("task status not set to enabled on create")
if meta.Status != string(backend.TaskInactive) {
t.Fatal("task status not set to inactive after disabling")
}
if err := s.EnableTask(context.Background(), id); err != nil {
@ -615,8 +647,8 @@ func testStoreTaskEnableDisable(t *testing.T, create CreateStoreFunc, destroy De
t.Fatal(err)
}
if meta.Status != string(backend.TaskEnabled) {
t.Fatal("task status not set to enabled on create")
if meta.Status != string(backend.TaskActive) {
t.Fatal("task status not set to active after enabling")
}
}

View File

@ -116,9 +116,9 @@ func (p pAdapter) UpdateTask(ctx context.Context, id platform.ID, upd platform.T
if upd.Status != nil {
var err error
switch *upd.Status {
case string(backend.TaskEnabled):
case string(backend.TaskActive):
err = p.s.EnableTask(ctx, id)
case string(backend.TaskDisabled):
case string(backend.TaskInactive):
err = p.s.DisableTask(ctx, id)
default:
err = fmt.Errorf("invalid status: %s", *upd.Status)