refactor(task/backend): move the task/backend constants to the global package (#17133)

This moves a few types and constants to the global package so it can be
used without importing the `task/backend` package. These constants are
referenced in non tasks-specific code.

This is needed to break a dependency chain where the task backend will
call into the flux runtime to perform parsing or evaluation of a script
and to prevent the http package from inheriting that dependency.
pull/17109/head
Jonathan A. Sternberg 2020-03-06 16:19:32 -06:00 committed by GitHub
parent 39b7c2ab76
commit 834a8740e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 172 additions and 198 deletions

View File

@ -7,7 +7,6 @@ import (
"github.com/influxdata/influxdb"
platcontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/task/backend"
"go.uber.org/zap"
)
@ -273,7 +272,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID infl
return nil, err
}
if task.Status != string(backend.TaskActive) {
if task.Status != string(influxdb.TaskActive) {
return nil, ErrInactiveTask
}
@ -301,7 +300,7 @@ func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID influxdb.ID
return nil, err
}
if task.Status != string(backend.TaskActive) {
if task.Status != string(influxdb.TaskActive) {
return nil, ErrInactiveTask
}

View File

@ -13,7 +13,6 @@ import (
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
"github.com/pkg/errors"
"go.uber.org/zap/zaptest"
)
@ -54,7 +53,7 @@ func mockTaskService(orgID, taskID, runID influxdb.ID) influxdb.TaskService {
ID: taskID,
OrganizationID: orgID,
Name: "cows",
Status: string(backend.TaskActive),
Status: string(influxdb.TaskActive),
Flux: `option task = {
name: "my_task",
every: 1s,

View File

@ -17,7 +17,6 @@ import (
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/pkg/httpc"
"github.com/influxdata/influxdb/task/backend"
"go.uber.org/zap"
)
@ -1634,7 +1633,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (*
return nil, influxdb.ErrRunNotFound
}
// RequestStillQueuedError is also part of the contract.
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
if e := influxdb.ParseRequestStillQueuedError(err.Error()); e != nil {
return nil, *e
}
@ -1668,7 +1667,7 @@ func (t TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduled
}
// RequestStillQueuedError is also part of the contract.
if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil {
if e := influxdb.ParseRequestStillQueuedError(err.Error()); e != nil {
return nil, *e
}

View File

@ -20,7 +20,6 @@ import (
kithttp "github.com/influxdata/influxdb/kit/transport/http"
"github.com/influxdata/influxdb/mock"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
influxdbtesting "github.com/influxdata/influxdb/testing"
"go.uber.org/zap"
"go.uber.org/zap/zaptest"
@ -989,7 +988,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return nil, influxdb.ErrTaskNotFound
}
return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
return &influxdb.Run{ID: runID, TaskID: taskID, Status: influxdb.RunScheduled.String()}, nil
},
},
method: http.MethodPost,
@ -1009,7 +1008,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return nil, influxdb.ErrRunNotFound
}
return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
return &influxdb.Run{ID: runID, TaskID: taskID, Status: influxdb.RunScheduled.String()}, nil
},
},
method: http.MethodGet,
@ -1028,7 +1027,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
return nil, influxdb.ErrRunNotFound
}
return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil
return &influxdb.Run{ID: runID, TaskID: taskID, Status: influxdb.RunScheduled.String()}, nil
},
},
method: http.MethodPost,

View File

@ -6,11 +6,9 @@ import (
"strings"
"time"
"github.com/influxdata/influxdb/resource"
"github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/resource"
"github.com/influxdata/influxdb/task/options"
"go.uber.org/zap"
)
@ -34,7 +32,6 @@ var (
)
var _ influxdb.TaskService = (*Service)(nil)
var _ backend.TaskControlService = (*Service)(nil)
type kvTask struct {
ID influxdb.ID `json:"id"`
@ -600,7 +597,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate)
}
if tc.Status == "" {
tc.Status = string(backend.TaskActive)
tc.Status = string(influxdb.TaskActive)
}
createdAt := s.clock.Now().Truncate(time.Second).UTC()
@ -1163,7 +1160,7 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID
}
r.ID = s.IDGenerator.ID()
r.Status = backend.RunScheduled.String()
r.Status = influxdb.RunScheduled.String()
r.StartedAt = time.Time{}
r.FinishedAt = time.Time{}
r.RequestedAt = time.Time{}
@ -1231,7 +1228,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched
r := &influxdb.Run{
ID: s.IDGenerator.ID(),
TaskID: taskID,
Status: backend.RunScheduled.String(),
Status: influxdb.RunScheduled.String(),
RequestedAt: time.Now().UTC(),
ScheduledFor: t,
Log: []influxdb.Log{},
@ -1296,7 +1293,7 @@ func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, sche
TaskID: taskID,
ScheduledFor: t,
RunAt: runAt,
Status: backend.RunScheduled.String(),
Status: influxdb.RunScheduled.String(),
Log: []influxdb.Log{},
}
@ -1553,7 +1550,7 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I
}
// UpdateRunState sets the run state at the respective time.
func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error {
err := s.kv.Update(ctx, func(tx Tx) error {
err := s.updateRunState(ctx, tx, taskID, runID, when, state)
if err != nil {
@ -1564,7 +1561,7 @@ func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID,
return err
}
func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error {
// find run
run, err := s.findRunByID(ctx, tx, taskID, runID)
if err != nil {
@ -1574,9 +1571,9 @@ func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influ
// update state
run.Status = state.String()
switch state {
case backend.RunStarted:
case influxdb.RunStarted:
run.StartedAt = when
case backend.RunSuccess, backend.RunFail, backend.RunCanceled:
case influxdb.RunSuccess, influxdb.RunFail, influxdb.RunCanceled:
run.FinishedAt = when
}

View File

@ -13,7 +13,6 @@ import (
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/kv"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/servicetest"
"go.uber.org/zap/zaptest"
)
@ -127,7 +126,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
OrganizationID: ts.Org.ID,
OwnerID: ts.User.ID,
Status: string(backend.TaskActive),
Status: string(influxdb.TaskActive),
})
if err != nil {
t.Fatal(err)
@ -179,7 +178,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) {
}
// test status filter
active := string(backend.TaskActive)
active := string(influxdb.TaskActive)
tasksWithActiveFilter, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{Status: &active})
if err != nil {
t.Fatal("could not find tasks")
@ -205,7 +204,7 @@ func TestService_UpdateTask_InactiveToActive(t *testing.T) {
Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`,
OrganizationID: ts.Org.ID,
OwnerID: ts.User.ID,
Status: string(backend.TaskActive),
Status: string(influxdb.TaskActive),
})
if err != nil {
t.Fatal("CreateTask", err)
@ -304,7 +303,7 @@ func TestTaskRunCancellation(t *testing.T) {
t.Fatal(err)
}
if canceled.Status != backend.RunCanceled.String() {
if canceled.Status != influxdb.RunCanceled.String() {
t.Fatalf("expected task run to be cancelled")
}
}

View File

@ -135,7 +135,7 @@ type TaskControlService struct {
ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
FinishRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
UpdateRunStateFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error
UpdateRunStateFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error
AddRunLogFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
}
@ -154,7 +154,7 @@ func (tcs *TaskControlService) StartManualRun(ctx context.Context, taskID, runID
func (tcs *TaskControlService) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
return tcs.FinishRunFn(ctx, taskID, runID)
}
func (tcs *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
func (tcs *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error {
return tcs.UpdateRunStateFn(ctx, taskID, runID, when, state)
}
func (tcs *TaskControlService) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {

72
task.go
View File

@ -465,3 +465,75 @@ type LogFilter struct {
// The optional Run ID limits logs to a single run.
Run *ID
}
type TaskStatus string
const (
TaskActive TaskStatus = "active"
TaskInactive TaskStatus = "inactive"
DefaultTaskStatus TaskStatus = TaskActive
)
type RunStatus int
const (
RunStarted RunStatus = iota
RunSuccess
RunFail
RunCanceled
RunScheduled
)
func (r RunStatus) String() string {
switch r {
case RunStarted:
return "started"
case RunSuccess:
return "success"
case RunFail:
return "failed"
case RunCanceled:
return "canceled"
case RunScheduled:
return "scheduled"
}
panic(fmt.Sprintf("unknown RunStatus: %d", r))
}
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
const fmtRequestStillQueued = "previous retry for start=%s end=%s has not yet finished"
func (e RequestStillQueuedError) Error() string {
return fmt.Sprintf(fmtRequestStillQueued,
time.Unix(e.Start, 0).UTC().Format(time.RFC3339),
time.Unix(e.End, 0).UTC().Format(time.RFC3339),
)
}
// ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg.
// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError {
var s, e string
n, err := fmt.Sscanf(msg, fmtRequestStillQueued, &s, &e)
if err != nil || n != 2 {
return nil
}
start, err := time.Parse(time.RFC3339, s)
if err != nil {
return nil
}
end, err := time.Parse(time.RFC3339, e)
if err != nil {
return nil
}
return &RequestStillQueuedError{Start: start.Unix(), End: end.Unix()}
}

View File

@ -36,7 +36,7 @@ func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskSe
latestCompleted := now()
for len(tasks) > 0 {
for _, task := range tasks {
if task.Status != string(TaskActive) {
if task.Status != string(influxdb.TaskActive) {
continue
}
@ -78,7 +78,7 @@ func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs Ta
latestCompleted := now()
for len(tasks) > 0 {
for _, task := range tasks {
if task.Status != string(TaskActive) {
if task.Status != string(influxdb.TaskActive) {
continue
}

View File

@ -6,7 +6,6 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/task/backend/middleware"
"github.com/influxdata/influxdb/task/backend/scheduler"
@ -131,7 +130,7 @@ func (c *Coordinator) TaskUpdated(ctx context.Context, from, to *influxdb.Task)
}
// if disabling the task, release it before schedule update
if to.Status != from.Status && to.Status == string(backend.TaskInactive) {
if to.Status != from.Status && to.Status == string(influxdb.TaskInactive) {
if err := c.sch.Release(sid); err != nil && err != influxdb.ErrTaskNotClaimed {
return err
}

View File

@ -1,16 +0,0 @@
package backend_test
import (
"testing"
"github.com/influxdata/influxdb/task/backend"
)
func TestParseRequestStillQueuedError(t *testing.T) {
e := backend.RequestStillQueuedError{Start: 1000, End: 2000}
validMsg := e.Error()
if err := backend.ParseRequestStillQueuedError(validMsg); err == nil || *err != e {
t.Fatalf("%q should have parsed to %v, but got %v", validMsg, e, err)
}
}

View File

@ -279,7 +279,7 @@ func (w *worker) work() {
// If done the promise was canceled
case <-prom.ctx.Done():
w.e.tcs.AddRunLog(prom.ctx, prom.task.ID, prom.run.ID, time.Now().UTC(), "Run canceled")
w.e.tcs.UpdateRunState(prom.ctx, prom.task.ID, prom.run.ID, time.Now().UTC(), backend.RunCanceled)
w.e.tcs.UpdateRunState(prom.ctx, prom.task.ID, prom.run.ID, time.Now().UTC(), influxdb.RunCanceled)
prom.err = influxdb.ErrRunCanceled
close(prom.done)
return
@ -306,14 +306,14 @@ func (w *worker) start(p *promise) {
// add to run log
w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux))
// update run status
w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), backend.RunStarted)
w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), influxdb.RunStarted)
// add to metrics
w.e.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt))
p.startedAt = time.Now()
}
func (w *worker) finish(p *promise, rs backend.RunStatus, err error) {
func (w *worker) finish(p *promise, rs influxdb.RunStatus, err error) {
// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
@ -365,7 +365,7 @@ func (w *worker) executeQuery(p *promise) {
pkg, err := flux.Parse(p.task.Flux)
if err != nil {
w.finish(p, backend.RunFail, influxdb.ErrFluxParseError(err))
w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err))
return
}
@ -384,7 +384,7 @@ func (w *worker) executeQuery(p *promise) {
it, err := w.e.qs.Query(ctx, req)
if err != nil {
// Assume the error should not be part of the runResult.
w.finish(p, backend.RunFail, influxdb.ErrQueryError(err))
w.finish(p, influxdb.RunFail, influxdb.ErrQueryError(err))
return
}
@ -407,16 +407,16 @@ func (w *worker) executeQuery(p *promise) {
}
if runErr != nil {
w.finish(p, backend.RunFail, influxdb.ErrRunExecutionError(runErr))
w.finish(p, influxdb.RunFail, influxdb.ErrRunExecutionError(runErr))
return
}
if it.Err() != nil {
w.finish(p, backend.RunFail, influxdb.ErrResultIteratorError(it.Err()))
w.finish(p, influxdb.RunFail, influxdb.ErrResultIteratorError(it.Err()))
return
}
w.finish(p, backend.RunSuccess, nil)
w.finish(p, influxdb.RunSuccess, nil)
}
// RunsActive returns the current number of workers, which is equivalent to

View File

@ -4,7 +4,6 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
"github.com/prometheus/client_golang/prometheus"
)
@ -144,7 +143,7 @@ func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duratio
}
// FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID.
func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status backend.RunStatus, runDuration time.Duration) {
func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status influxdb.RunStatus, runDuration time.Duration) {
em.totalRunsComplete.WithLabelValues(task.Type, status.String()).Inc()
em.runDuration.WithLabelValues(task.Type, "all").Observe(runDuration.Seconds())

View File

@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb"
)
@ -80,7 +78,7 @@ func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
// this allows us to see not run the task for inactive time
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) {
toTask.LatestCompleted = cs.Now()
}
@ -111,7 +109,7 @@ func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb.
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
// this allows us to see not run the task for inactive time
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) {
toTask.LatestCompleted = cs.Now()
}

View File

@ -6,8 +6,6 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/notification/check"
@ -192,9 +190,9 @@ func TestCheckUpdateFromInactive(t *testing.T) {
mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id == 1 {
return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil
return &influxdb.Task{ID: id, Status: string(influxdb.TaskInactive)}, nil
} else if id == 10 {
return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil
return &influxdb.Task{ID: id, Status: string(influxdb.TaskActive)}, nil
}
return &influxdb.Task{ID: id}, nil
}

View File

@ -43,7 +43,7 @@ func inmemTaskService() influxdb.TaskService {
id := gen.ID()
task := &influxdb.Task{ID: id, Flux: tc.Flux, Cron: "* * * * *", Status: tc.Status, OrganizationID: tc.OrganizationID, Organization: tc.Organization}
if task.Status == "" {
task.Status = string(backend.TaskActive)
task.Status = string(influxdb.TaskActive)
}
tasks[id] = task
@ -152,7 +152,7 @@ func TestCoordinatingTaskService(t *testing.T) {
t.Fatal(err)
}
inactive := string(backend.TaskInactive)
inactive := string(influxdb.TaskInactive)
res, err := middleware.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{Status: &inactive})
if err != nil {
t.Fatal(err)
@ -172,7 +172,7 @@ func TestCoordinatingTaskService(t *testing.T) {
t.Fatal("task sent to scheduler doesnt match task created")
}
active := string(backend.TaskActive)
active := string(influxdb.TaskActive)
if _, err := middleware.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{Status: &active}); err != nil {
t.Fatal(err)
}

View File

@ -5,8 +5,6 @@ import (
"fmt"
"time"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb"
)
@ -79,7 +77,7 @@ func (ns *CoordinatingNotificationRuleStore) UpdateNotificationRule(ctx context.
}
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
// this allows us to see not run the task for inactive time
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) {
toTask.LatestCompleted = ns.Now()
}
@ -110,7 +108,7 @@ func (ns *CoordinatingNotificationRuleStore) PatchNotificationRule(ctx context.C
// if the update is to activate and the previous task was inactive we should add a "latest completed" update
// this allows us to see not run the task for inactive time
if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) {
if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) {
toTask.LatestCompleted = ns.Now()
}

View File

@ -6,8 +6,6 @@ import (
"testing"
"time"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/notification/rule"
"github.com/influxdata/influxdb/task/backend/middleware"
@ -83,9 +81,9 @@ func TestNotificationRuleUpdateFromInactive(t *testing.T) {
mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) {
if id == 1 {
return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil
return &influxdb.Task{ID: id, Status: string(influxdb.TaskInactive)}, nil
} else if id == 10 {
return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil
return &influxdb.Task{ID: id, Status: string(influxdb.TaskActive)}, nil
}
return &influxdb.Task{ID: id}, nil
}

View File

@ -2,7 +2,6 @@ package backend
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb"
@ -25,80 +24,8 @@ type TaskControlService interface {
FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
// UpdateRunState sets the run state at the respective time.
UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state RunStatus) error
UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error
// AddRunLog adds a log line to the run.
AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
}
type TaskStatus string
const (
TaskActive TaskStatus = "active"
TaskInactive TaskStatus = "inactive"
DefaultTaskStatus TaskStatus = TaskActive
)
type RunStatus int
const (
RunStarted RunStatus = iota
RunSuccess
RunFail
RunCanceled
RunScheduled
)
func (r RunStatus) String() string {
switch r {
case RunStarted:
return "started"
case RunSuccess:
return "success"
case RunFail:
return "failed"
case RunCanceled:
return "canceled"
case RunScheduled:
return "scheduled"
}
panic(fmt.Sprintf("unknown RunStatus: %d", r))
}
// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed.
type RequestStillQueuedError struct {
// Unix timestamps matching existing request's start and end.
Start, End int64
}
const fmtRequestStillQueued = "previous retry for start=%s end=%s has not yet finished"
func (e RequestStillQueuedError) Error() string {
return fmt.Sprintf(fmtRequestStillQueued,
time.Unix(e.Start, 0).UTC().Format(time.RFC3339),
time.Unix(e.End, 0).UTC().Format(time.RFC3339),
)
}
// ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg.
// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil.
func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError {
var s, e string
n, err := fmt.Sscanf(msg, fmtRequestStillQueued, &s, &e)
if err != nil || n != 2 {
return nil
}
start, err := time.Parse(time.RFC3339, s)
if err != nil {
return nil
}
end, err := time.Parse(time.RFC3339, e)
if err != nil {
return nil
}
return &RequestStillQueuedError{Start: start.Unix(), End: end.Unix()}
}

View File

@ -130,7 +130,7 @@ func (t *TaskControlService) ManualRuns(ctx context.Context, taskID influxdb.ID)
}
// UpdateRunState sets the run state at the respective time.
func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error {
d.mu.Lock()
defer d.mu.Unlock()
@ -139,11 +139,11 @@ func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID i
panic("run state called without a run")
}
switch state {
case backend.RunStarted:
case influxdb.RunStarted:
run.StartedAt = when
case backend.RunSuccess, backend.RunFail, backend.RunCanceled:
case influxdb.RunSuccess, influxdb.RunFail, influxdb.RunCanceled:
run.FinishedAt = when
case backend.RunScheduled:
case influxdb.RunScheduled:
// nothing
default:
panic("invalid status")

View File

@ -276,7 +276,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
Name: "task #0",
Cron: "* * * * *",
Offset: 5 * time.Second,
Status: string(backend.DefaultTaskStatus),
Status: string(influxdb.DefaultTaskStatus),
Flux: fmt.Sprintf(scriptFmt, 0),
Type: influxdb.TaskSystemType,
}
@ -292,7 +292,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 1),
OwnerID: cr.UserID,
Status: string(backend.TaskInactive),
Status: string(influxdb.TaskInactive),
}
if _, err := sys.TaskService.CreateTask(authorizedCtx, tc2); err != nil {
@ -327,24 +327,24 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
// Check task status filter
active := string(backend.TaskActive)
active := string(influxdb.TaskActive)
fs, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{Status: &active})
if err != nil {
t.Fatal(err)
}
activeTasks := findTasksByStatus(fs, string(backend.TaskActive))
activeTasks := findTasksByStatus(fs, string(influxdb.TaskActive))
if len(fs) != len(activeTasks) {
t.Fatalf("expected to find %d active tasks, found: %d", len(activeTasks), len(fs))
}
inactive := string(backend.TaskInactive)
inactive := string(influxdb.TaskInactive)
fs, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{Status: &inactive})
if err != nil {
t.Fatal(err)
}
inactiveTasks := findTasksByStatus(fs, string(backend.TaskInactive))
inactiveTasks := findTasksByStatus(fs, string(influxdb.TaskInactive))
if len(fs) != len(inactiveTasks) {
t.Fatalf("expected to find %d inactive tasks, found: %d", len(inactiveTasks), len(fs))
}
@ -363,12 +363,12 @@ func testTaskCRUD(t *testing.T, sys *System) {
if f.Flux != newFlux {
t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux)
}
if f.Status != string(backend.TaskActive) {
if f.Status != string(influxdb.TaskActive) {
t.Fatalf("expected task to be created active, got %q", f.Status)
}
// Update task: status only.
newStatus := string(backend.TaskInactive)
newStatus := string(influxdb.TaskInactive)
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Status: &newStatus})
if err != nil {
t.Fatal(err)
@ -381,7 +381,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
// Update task: reactivate status and update script.
newStatus = string(backend.TaskActive)
newStatus = string(influxdb.TaskActive)
newFlux = fmt.Sprintf(scriptFmt, 98)
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Flux: &newFlux, Status: &newStatus})
if err != nil {
@ -395,7 +395,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
// Update task: just update an option.
newStatus = string(backend.TaskActive)
newStatus = string(influxdb.TaskActive)
newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tcron: \"* * * * *\",\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")"
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Name: "task-changed #98"}})
if err != nil {
@ -410,7 +410,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
// Update task: switch to every.
newStatus = string(backend.TaskActive)
newStatus = string(influxdb.TaskActive)
newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")"
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}})
if err != nil {
@ -425,7 +425,7 @@ func testTaskCRUD(t *testing.T, sys *System) {
}
// Update task: just cron.
newStatus = string(backend.TaskActive)
newStatus = string(influxdb.TaskActive)
newFlux = fmt.Sprintf(scriptDifferentName, 98)
f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Cron: "* * * * *"}})
if err != nil {
@ -748,11 +748,11 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), influxdb.RunStarted); err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), backend.RunSuccess); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), influxdb.RunSuccess); err != nil {
t.Fatal(err)
}
@ -782,7 +782,7 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), influxdb.RunStarted); err != nil {
t.Fatal(err)
}
@ -790,7 +790,7 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), backend.RunFail); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), influxdb.RunFail); err != nil {
t.Fatal(err)
}
@ -905,7 +905,7 @@ func testTaskRuns(t *testing.T, sys *System) {
startedAt := time.Now().UTC()
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, influxdb.RunStarted); err != nil {
t.Fatal(err)
}
@ -918,7 +918,7 @@ func testTaskRuns(t *testing.T, sys *System) {
}
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, influxdb.RunStarted); err != nil {
t.Fatal(err)
}
@ -932,7 +932,7 @@ func testTaskRuns(t *testing.T, sys *System) {
}
// Mark the second run finished.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), backend.RunSuccess); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), influxdb.RunSuccess); err != nil {
t.Fatal(err)
}
@ -954,8 +954,8 @@ func testTaskRuns(t *testing.T, sys *System) {
if runs[0].StartedAt != startedAt {
t.Fatalf("unexpectedStartedAt; want %s, got %s", startedAt, runs[0].StartedAt)
}
if runs[0].Status != backend.RunStarted.String() {
t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status)
if runs[0].Status != influxdb.RunStarted.String() {
t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunStarted.String(), runs[0].Status)
}
if !runs[0].FinishedAt.IsZero() {
@ -1035,7 +1035,7 @@ func testTaskRuns(t *testing.T, sys *System) {
if err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, time.Now(), backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, time.Now(), influxdb.RunStarted); err != nil {
t.Fatal(err)
}
@ -1043,7 +1043,7 @@ func testTaskRuns(t *testing.T, sys *System) {
if err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), influxdb.RunStarted); err != nil {
t.Fatal(err)
}
// Add a log for the first run.
@ -1350,7 +1350,7 @@ func testRunStorage(t *testing.T, sys *System) {
startedAt := time.Now().UTC().Add(time.Second * -10)
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, influxdb.RunStarted); err != nil {
t.Fatal(err)
}
@ -1363,12 +1363,12 @@ func testRunStorage(t *testing.T, sys *System) {
}
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), influxdb.RunStarted); err != nil {
t.Fatal(err)
}
// Mark the second run finished.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second*2), backend.RunFail); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second*2), influxdb.RunFail); err != nil {
t.Fatal(err)
}
@ -1390,8 +1390,8 @@ func testRunStorage(t *testing.T, sys *System) {
if runs[0].StartedAt != startedAt {
t.Fatalf("unexpectedStartedAt; want %s, got %s", startedAt, runs[0].StartedAt)
}
if runs[0].Status != backend.RunStarted.String() {
t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status)
if runs[0].Status != influxdb.RunStarted.String() {
t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunStarted.String(), runs[0].Status)
}
if !runs[0].FinishedAt.IsZero() {
@ -1403,11 +1403,11 @@ func testRunStorage(t *testing.T, sys *System) {
if err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*3), backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*3), influxdb.RunStarted); err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*4), backend.RunSuccess); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*4), influxdb.RunSuccess); err != nil {
t.Fatal(err)
}
if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc2.ID); err != nil {
@ -1440,8 +1440,8 @@ func testRunStorage(t *testing.T, sys *System) {
if runs[0].StartedAt != startedAt {
t.Fatalf("unexpectedStartedAt; want %s, got %s", startedAt, runs[0].StartedAt)
}
if runs[0].Status != backend.RunStarted.String() {
t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status)
if runs[0].Status != influxdb.RunStarted.String() {
t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunStarted.String(), runs[0].Status)
}
// TODO (al): handle empty finishedAt
// if runs[0].FinishedAt != "" {
@ -1455,8 +1455,8 @@ func testRunStorage(t *testing.T, sys *System) {
if exp := startedAt.Add(time.Second); runs[2].StartedAt != exp {
t.Fatalf("unexpected StartedAt; want %s, got %s", exp, runs[2].StartedAt)
}
if runs[2].Status != backend.RunFail.String() {
t.Fatalf("unexpected run status; want %s, got %s", backend.RunSuccess.String(), runs[2].Status)
if runs[2].Status != influxdb.RunFail.String() {
t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunSuccess.String(), runs[2].Status)
}
if exp := startedAt.Add(time.Second * 2); runs[2].FinishedAt != exp {
t.Fatalf("unexpected FinishedAt; want %s, got %s", exp, runs[2].FinishedAt)
@ -1525,10 +1525,10 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
startedAt := time.Now().UTC()
// Update the run state to Started then Failed; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt, influxdb.RunStarted); err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt.Add(time.Second), backend.RunFail); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt.Add(time.Second), influxdb.RunFail); err != nil {
t.Fatal(err)
}
if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc.ID); err != nil {
@ -1551,7 +1551,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) {
t.Fatalf("wrong scheduledFor on task: got %s, want %s", m.ScheduledFor, rc.ScheduledFor)
}
exp := backend.RequestStillQueuedError{Start: rc.ScheduledFor.Unix(), End: rc.ScheduledFor.Unix()}
exp := influxdb.RequestStillQueuedError{Start: rc.ScheduledFor.Unix(), End: rc.ScheduledFor.Unix()}
// Retrying a run which has been queued but not started, should be rejected.
if _, err = sys.TaskService.RetryRun(sys.Ctx, task.ID, rc.ID); err != exp && err.Error() != "run already queued" {
@ -1587,7 +1587,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) {
startedAt := time.Now().UTC()
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, influxdb.RunStarted); err != nil {
t.Fatal(err)
}
@ -1600,12 +1600,12 @@ func testLogsAcrossStorage(t *testing.T, sys *System) {
}
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, influxdb.RunStarted); err != nil {
t.Fatal(err)
}
// Mark the second run finished.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), backend.RunSuccess); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), influxdb.RunSuccess); err != nil {
t.Fatal(err)
}

View File

@ -156,3 +156,12 @@ from(bucket: "x")
})
}
func TestParseRequestStillQueuedError(t *testing.T) {
e := platform.RequestStillQueuedError{Start: 1000, End: 2000}
validMsg := e.Error()
if err := platform.ParseRequestStillQueuedError(validMsg); err == nil || *err != e {
t.Fatalf("%q should have parsed to %v, but got %v", validMsg, e, err)
}
}