From 834a8740e00226deeb73090f1b3b3f85b116a3a8 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 6 Mar 2020 16:19:32 -0600 Subject: [PATCH] refactor(task/backend): move the task/backend constants to the global package (#17133) This moves a few types and constants to the global package so it can be used without importing the `task/backend` package. These constants are referenced in non tasks-specific code. This is needed to break a dependency chain where the task backend will call into the flux runtime to perform parsing or evaluation of a script and to prevent the http package from inheriting that dependency. --- authorizer/task.go | 5 +- authorizer/task_test.go | 3 +- http/task_service.go | 5 +- http/task_service_test.go | 7 +- kv/task.go | 21 +++-- kv/task_test.go | 9 +-- mock/task_service.go | 4 +- task.go | 72 +++++++++++++++++ task/backend/coordinator.go | 4 +- task/backend/coordinator/coordinator.go | 3 +- task/backend/error_test.go | 16 ---- task/backend/executor/executor.go | 16 ++-- task/backend/executor/executor_metrics.go | 3 +- task/backend/middleware/check_middleware.go | 6 +- .../middleware/check_middleware_test.go | 6 +- task/backend/middleware/middleware_test.go | 6 +- .../middleware/notification_middleware.go | 6 +- .../notification_middleware_test.go | 6 +- task/backend/task.go | 75 +---------------- task/mock/task_control_service.go | 8 +- task/servicetest/servicetest.go | 80 +++++++++---------- task_test.go | 9 +++ 22 files changed, 172 insertions(+), 198 deletions(-) delete mode 100644 task/backend/error_test.go diff --git a/authorizer/task.go b/authorizer/task.go index ea8ad050ab..b5ef7a46c7 100644 --- a/authorizer/task.go +++ b/authorizer/task.go @@ -7,7 +7,6 @@ import ( "github.com/influxdata/influxdb" platcontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/kit/tracing" - "github.com/influxdata/influxdb/task/backend" "go.uber.org/zap" ) @@ -273,7 +272,7 @@ func (ts *taskServiceValidator) RetryRun(ctx context.Context, taskID, runID infl return nil, err } - if task.Status != string(backend.TaskActive) { + if task.Status != string(influxdb.TaskActive) { return nil, ErrInactiveTask } @@ -301,7 +300,7 @@ func (ts *taskServiceValidator) ForceRun(ctx context.Context, taskID influxdb.ID return nil, err } - if task.Status != string(backend.TaskActive) { + if task.Status != string(influxdb.TaskActive) { return nil, ErrInactiveTask } diff --git a/authorizer/task_test.go b/authorizer/task_test.go index 0b589fa74c..4dfc80333d 100644 --- a/authorizer/task_test.go +++ b/authorizer/task_test.go @@ -13,7 +13,6 @@ import ( "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/mock" _ "github.com/influxdata/influxdb/query/builtin" - "github.com/influxdata/influxdb/task/backend" "github.com/pkg/errors" "go.uber.org/zap/zaptest" ) @@ -54,7 +53,7 @@ func mockTaskService(orgID, taskID, runID influxdb.ID) influxdb.TaskService { ID: taskID, OrganizationID: orgID, Name: "cows", - Status: string(backend.TaskActive), + Status: string(influxdb.TaskActive), Flux: `option task = { name: "my_task", every: 1s, diff --git a/http/task_service.go b/http/task_service.go index 5b336055b0..fd7d306884 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -17,7 +17,6 @@ import ( "github.com/influxdata/influxdb/kit/tracing" "github.com/influxdata/influxdb/kv" "github.com/influxdata/influxdb/pkg/httpc" - "github.com/influxdata/influxdb/task/backend" "go.uber.org/zap" ) @@ -1634,7 +1633,7 @@ func (t TaskService) RetryRun(ctx context.Context, taskID, runID influxdb.ID) (* return nil, influxdb.ErrRunNotFound } // RequestStillQueuedError is also part of the contract. - if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil { + if e := influxdb.ParseRequestStillQueuedError(err.Error()); e != nil { return nil, *e } @@ -1668,7 +1667,7 @@ func (t TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, scheduled } // RequestStillQueuedError is also part of the contract. - if e := backend.ParseRequestStillQueuedError(err.Error()); e != nil { + if e := influxdb.ParseRequestStillQueuedError(err.Error()); e != nil { return nil, *e } diff --git a/http/task_service_test.go b/http/task_service_test.go index bbeeb29e82..5a19fbc2fd 100644 --- a/http/task_service_test.go +++ b/http/task_service_test.go @@ -20,7 +20,6 @@ import ( kithttp "github.com/influxdata/influxdb/kit/transport/http" "github.com/influxdata/influxdb/mock" _ "github.com/influxdata/influxdb/query/builtin" - "github.com/influxdata/influxdb/task/backend" influxdbtesting "github.com/influxdata/influxdb/testing" "go.uber.org/zap" "go.uber.org/zap/zaptest" @@ -989,7 +988,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return nil, influxdb.ErrTaskNotFound } - return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil + return &influxdb.Run{ID: runID, TaskID: taskID, Status: influxdb.RunScheduled.String()}, nil }, }, method: http.MethodPost, @@ -1009,7 +1008,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return nil, influxdb.ErrRunNotFound } - return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil + return &influxdb.Run{ID: runID, TaskID: taskID, Status: influxdb.RunScheduled.String()}, nil }, }, method: http.MethodGet, @@ -1028,7 +1027,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) { return nil, influxdb.ErrRunNotFound } - return &influxdb.Run{ID: runID, TaskID: taskID, Status: backend.RunScheduled.String()}, nil + return &influxdb.Run{ID: runID, TaskID: taskID, Status: influxdb.RunScheduled.String()}, nil }, }, method: http.MethodPost, diff --git a/kv/task.go b/kv/task.go index dbf16f3163..b08b5d8d35 100644 --- a/kv/task.go +++ b/kv/task.go @@ -6,11 +6,9 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/resource" - "github.com/influxdata/influxdb" icontext "github.com/influxdata/influxdb/context" - "github.com/influxdata/influxdb/task/backend" + "github.com/influxdata/influxdb/resource" "github.com/influxdata/influxdb/task/options" "go.uber.org/zap" ) @@ -34,7 +32,6 @@ var ( ) var _ influxdb.TaskService = (*Service)(nil) -var _ backend.TaskControlService = (*Service)(nil) type kvTask struct { ID influxdb.ID `json:"id"` @@ -600,7 +597,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) } if tc.Status == "" { - tc.Status = string(backend.TaskActive) + tc.Status = string(influxdb.TaskActive) } createdAt := s.clock.Now().Truncate(time.Second).UTC() @@ -1163,7 +1160,7 @@ func (s *Service) retryRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID } r.ID = s.IDGenerator.ID() - r.Status = backend.RunScheduled.String() + r.Status = influxdb.RunScheduled.String() r.StartedAt = time.Time{} r.FinishedAt = time.Time{} r.RequestedAt = time.Time{} @@ -1231,7 +1228,7 @@ func (s *Service) forceRun(ctx context.Context, tx Tx, taskID influxdb.ID, sched r := &influxdb.Run{ ID: s.IDGenerator.ID(), TaskID: taskID, - Status: backend.RunScheduled.String(), + Status: influxdb.RunScheduled.String(), RequestedAt: time.Now().UTC(), ScheduledFor: t, Log: []influxdb.Log{}, @@ -1296,7 +1293,7 @@ func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, sche TaskID: taskID, ScheduledFor: t, RunAt: runAt, - Status: backend.RunScheduled.String(), + Status: influxdb.RunScheduled.String(), Log: []influxdb.Log{}, } @@ -1553,7 +1550,7 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I } // UpdateRunState sets the run state at the respective time. -func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { +func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error { err := s.kv.Update(ctx, func(tx Tx) error { err := s.updateRunState(ctx, tx, taskID, runID, when, state) if err != nil { @@ -1564,7 +1561,7 @@ func (s *Service) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, return err } -func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { +func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error { // find run run, err := s.findRunByID(ctx, tx, taskID, runID) if err != nil { @@ -1574,9 +1571,9 @@ func (s *Service) updateRunState(ctx context.Context, tx Tx, taskID, runID influ // update state run.Status = state.String() switch state { - case backend.RunStarted: + case influxdb.RunStarted: run.StartedAt = when - case backend.RunSuccess, backend.RunFail, backend.RunCanceled: + case influxdb.RunSuccess, influxdb.RunFail, influxdb.RunCanceled: run.FinishedAt = when } diff --git a/kv/task_test.go b/kv/task_test.go index dbde31f131..0239561fb3 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -13,7 +13,6 @@ import ( icontext "github.com/influxdata/influxdb/context" "github.com/influxdata/influxdb/kv" _ "github.com/influxdata/influxdb/query/builtin" - "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/servicetest" "go.uber.org/zap/zaptest" ) @@ -127,7 +126,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) { Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`, OrganizationID: ts.Org.ID, OwnerID: ts.User.ID, - Status: string(backend.TaskActive), + Status: string(influxdb.TaskActive), }) if err != nil { t.Fatal(err) @@ -179,7 +178,7 @@ func TestRetrieveTaskWithBadAuth(t *testing.T) { } // test status filter - active := string(backend.TaskActive) + active := string(influxdb.TaskActive) tasksWithActiveFilter, _, err := ts.Service.FindTasks(ctx, influxdb.TaskFilter{Status: &active}) if err != nil { t.Fatal("could not find tasks") @@ -205,7 +204,7 @@ func TestService_UpdateTask_InactiveToActive(t *testing.T) { Flux: `option task = {name: "a task",every: 1h} from(bucket:"test") |> range(start:-1h)`, OrganizationID: ts.Org.ID, OwnerID: ts.User.ID, - Status: string(backend.TaskActive), + Status: string(influxdb.TaskActive), }) if err != nil { t.Fatal("CreateTask", err) @@ -304,7 +303,7 @@ func TestTaskRunCancellation(t *testing.T) { t.Fatal(err) } - if canceled.Status != backend.RunCanceled.String() { + if canceled.Status != influxdb.RunCanceled.String() { t.Fatalf("expected task run to be cancelled") } } diff --git a/mock/task_service.go b/mock/task_service.go index 3007c9cff6..4b44d91936 100644 --- a/mock/task_service.go +++ b/mock/task_service.go @@ -135,7 +135,7 @@ type TaskControlService struct { ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) FinishRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) - UpdateRunStateFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error + UpdateRunStateFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error AddRunLogFn func(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error } @@ -154,7 +154,7 @@ func (tcs *TaskControlService) StartManualRun(ctx context.Context, taskID, runID func (tcs *TaskControlService) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) { return tcs.FinishRunFn(ctx, taskID, runID) } -func (tcs *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { +func (tcs *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error { return tcs.UpdateRunStateFn(ctx, taskID, runID, when, state) } func (tcs *TaskControlService) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error { diff --git a/task.go b/task.go index 868944bc53..2aad09b64b 100644 --- a/task.go +++ b/task.go @@ -465,3 +465,75 @@ type LogFilter struct { // The optional Run ID limits logs to a single run. Run *ID } + +type TaskStatus string + +const ( + TaskActive TaskStatus = "active" + TaskInactive TaskStatus = "inactive" + + DefaultTaskStatus TaskStatus = TaskActive +) + +type RunStatus int + +const ( + RunStarted RunStatus = iota + RunSuccess + RunFail + RunCanceled + RunScheduled +) + +func (r RunStatus) String() string { + switch r { + case RunStarted: + return "started" + case RunSuccess: + return "success" + case RunFail: + return "failed" + case RunCanceled: + return "canceled" + case RunScheduled: + return "scheduled" + } + panic(fmt.Sprintf("unknown RunStatus: %d", r)) +} + +// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed. +type RequestStillQueuedError struct { + // Unix timestamps matching existing request's start and end. + Start, End int64 +} + +const fmtRequestStillQueued = "previous retry for start=%s end=%s has not yet finished" + +func (e RequestStillQueuedError) Error() string { + return fmt.Sprintf(fmtRequestStillQueued, + time.Unix(e.Start, 0).UTC().Format(time.RFC3339), + time.Unix(e.End, 0).UTC().Format(time.RFC3339), + ) +} + +// ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. +// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil. +func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError { + var s, e string + n, err := fmt.Sscanf(msg, fmtRequestStillQueued, &s, &e) + if err != nil || n != 2 { + return nil + } + + start, err := time.Parse(time.RFC3339, s) + if err != nil { + return nil + } + + end, err := time.Parse(time.RFC3339, e) + if err != nil { + return nil + } + + return &RequestStillQueuedError{Start: start.Unix(), End: end.Unix()} +} diff --git a/task/backend/coordinator.go b/task/backend/coordinator.go index 146c71a2c0..a4b7707530 100644 --- a/task/backend/coordinator.go +++ b/task/backend/coordinator.go @@ -36,7 +36,7 @@ func NotifyCoordinatorOfExisting(ctx context.Context, log *zap.Logger, ts TaskSe latestCompleted := now() for len(tasks) > 0 { for _, task := range tasks { - if task.Status != string(TaskActive) { + if task.Status != string(influxdb.TaskActive) { continue } @@ -78,7 +78,7 @@ func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs Ta latestCompleted := now() for len(tasks) > 0 { for _, task := range tasks { - if task.Status != string(TaskActive) { + if task.Status != string(influxdb.TaskActive) { continue } diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index 7c9060a6cc..c885bfd27b 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -6,7 +6,6 @@ import ( "time" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/task/backend" "github.com/influxdata/influxdb/task/backend/executor" "github.com/influxdata/influxdb/task/backend/middleware" "github.com/influxdata/influxdb/task/backend/scheduler" @@ -131,7 +130,7 @@ func (c *Coordinator) TaskUpdated(ctx context.Context, from, to *influxdb.Task) } // if disabling the task, release it before schedule update - if to.Status != from.Status && to.Status == string(backend.TaskInactive) { + if to.Status != from.Status && to.Status == string(influxdb.TaskInactive) { if err := c.sch.Release(sid); err != nil && err != influxdb.ErrTaskNotClaimed { return err } diff --git a/task/backend/error_test.go b/task/backend/error_test.go deleted file mode 100644 index 71024f7e15..0000000000 --- a/task/backend/error_test.go +++ /dev/null @@ -1,16 +0,0 @@ -package backend_test - -import ( - "testing" - - "github.com/influxdata/influxdb/task/backend" -) - -func TestParseRequestStillQueuedError(t *testing.T) { - e := backend.RequestStillQueuedError{Start: 1000, End: 2000} - validMsg := e.Error() - - if err := backend.ParseRequestStillQueuedError(validMsg); err == nil || *err != e { - t.Fatalf("%q should have parsed to %v, but got %v", validMsg, e, err) - } -} diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 64e4e1b04b..b2b012010c 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -279,7 +279,7 @@ func (w *worker) work() { // If done the promise was canceled case <-prom.ctx.Done(): w.e.tcs.AddRunLog(prom.ctx, prom.task.ID, prom.run.ID, time.Now().UTC(), "Run canceled") - w.e.tcs.UpdateRunState(prom.ctx, prom.task.ID, prom.run.ID, time.Now().UTC(), backend.RunCanceled) + w.e.tcs.UpdateRunState(prom.ctx, prom.task.ID, prom.run.ID, time.Now().UTC(), influxdb.RunCanceled) prom.err = influxdb.ErrRunCanceled close(prom.done) return @@ -306,14 +306,14 @@ func (w *worker) start(p *promise) { // add to run log w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux)) // update run status - w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), backend.RunStarted) + w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), influxdb.RunStarted) // add to metrics w.e.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt)) p.startedAt = time.Now() } -func (w *worker) finish(p *promise, rs backend.RunStatus, err error) { +func (w *worker) finish(p *promise, rs influxdb.RunStatus, err error) { // trace span, ctx := tracing.StartSpanFromContext(p.ctx) @@ -365,7 +365,7 @@ func (w *worker) executeQuery(p *promise) { pkg, err := flux.Parse(p.task.Flux) if err != nil { - w.finish(p, backend.RunFail, influxdb.ErrFluxParseError(err)) + w.finish(p, influxdb.RunFail, influxdb.ErrFluxParseError(err)) return } @@ -384,7 +384,7 @@ func (w *worker) executeQuery(p *promise) { it, err := w.e.qs.Query(ctx, req) if err != nil { // Assume the error should not be part of the runResult. - w.finish(p, backend.RunFail, influxdb.ErrQueryError(err)) + w.finish(p, influxdb.RunFail, influxdb.ErrQueryError(err)) return } @@ -407,16 +407,16 @@ func (w *worker) executeQuery(p *promise) { } if runErr != nil { - w.finish(p, backend.RunFail, influxdb.ErrRunExecutionError(runErr)) + w.finish(p, influxdb.RunFail, influxdb.ErrRunExecutionError(runErr)) return } if it.Err() != nil { - w.finish(p, backend.RunFail, influxdb.ErrResultIteratorError(it.Err())) + w.finish(p, influxdb.RunFail, influxdb.ErrResultIteratorError(it.Err())) return } - w.finish(p, backend.RunSuccess, nil) + w.finish(p, influxdb.RunSuccess, nil) } // RunsActive returns the current number of workers, which is equivalent to diff --git a/task/backend/executor/executor_metrics.go b/task/backend/executor/executor_metrics.go index 982139a788..46f0096fe6 100644 --- a/task/backend/executor/executor_metrics.go +++ b/task/backend/executor/executor_metrics.go @@ -4,7 +4,6 @@ import ( "time" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/task/backend" "github.com/prometheus/client_golang/prometheus" ) @@ -144,7 +143,7 @@ func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duratio } // FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID. -func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status backend.RunStatus, runDuration time.Duration) { +func (em *ExecutorMetrics) FinishRun(task *influxdb.Task, status influxdb.RunStatus, runDuration time.Duration) { em.totalRunsComplete.WithLabelValues(task.Type, status.String()).Inc() em.runDuration.WithLabelValues(task.Type, "all").Observe(runDuration.Seconds()) diff --git a/task/backend/middleware/check_middleware.go b/task/backend/middleware/check_middleware.go index 016a252168..3c9198bd12 100644 --- a/task/backend/middleware/check_middleware.go +++ b/task/backend/middleware/check_middleware.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/task/backend" - "github.com/influxdata/influxdb" ) @@ -80,7 +78,7 @@ func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time - if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) { toTask.LatestCompleted = cs.Now() } @@ -111,7 +109,7 @@ func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb. // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time - if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) { toTask.LatestCompleted = cs.Now() } diff --git a/task/backend/middleware/check_middleware_test.go b/task/backend/middleware/check_middleware_test.go index d76e2ee9f4..4f22c65bae 100644 --- a/task/backend/middleware/check_middleware_test.go +++ b/task/backend/middleware/check_middleware_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/task/backend" - "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/mock" "github.com/influxdata/influxdb/notification/check" @@ -192,9 +190,9 @@ func TestCheckUpdateFromInactive(t *testing.T) { mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { if id == 1 { - return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil + return &influxdb.Task{ID: id, Status: string(influxdb.TaskInactive)}, nil } else if id == 10 { - return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil + return &influxdb.Task{ID: id, Status: string(influxdb.TaskActive)}, nil } return &influxdb.Task{ID: id}, nil } diff --git a/task/backend/middleware/middleware_test.go b/task/backend/middleware/middleware_test.go index bca3568a3d..695a7da50c 100644 --- a/task/backend/middleware/middleware_test.go +++ b/task/backend/middleware/middleware_test.go @@ -43,7 +43,7 @@ func inmemTaskService() influxdb.TaskService { id := gen.ID() task := &influxdb.Task{ID: id, Flux: tc.Flux, Cron: "* * * * *", Status: tc.Status, OrganizationID: tc.OrganizationID, Organization: tc.Organization} if task.Status == "" { - task.Status = string(backend.TaskActive) + task.Status = string(influxdb.TaskActive) } tasks[id] = task @@ -152,7 +152,7 @@ func TestCoordinatingTaskService(t *testing.T) { t.Fatal(err) } - inactive := string(backend.TaskInactive) + inactive := string(influxdb.TaskInactive) res, err := middleware.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{Status: &inactive}) if err != nil { t.Fatal(err) @@ -172,7 +172,7 @@ func TestCoordinatingTaskService(t *testing.T) { t.Fatal("task sent to scheduler doesnt match task created") } - active := string(backend.TaskActive) + active := string(influxdb.TaskActive) if _, err := middleware.UpdateTask(context.Background(), task.ID, influxdb.TaskUpdate{Status: &active}); err != nil { t.Fatal(err) } diff --git a/task/backend/middleware/notification_middleware.go b/task/backend/middleware/notification_middleware.go index ad5f706d4c..0f16574899 100644 --- a/task/backend/middleware/notification_middleware.go +++ b/task/backend/middleware/notification_middleware.go @@ -5,8 +5,6 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/task/backend" - "github.com/influxdata/influxdb" ) @@ -79,7 +77,7 @@ func (ns *CoordinatingNotificationRuleStore) UpdateNotificationRule(ctx context. } // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time - if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) { toTask.LatestCompleted = ns.Now() } @@ -110,7 +108,7 @@ func (ns *CoordinatingNotificationRuleStore) PatchNotificationRule(ctx context.C // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time - if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { + if fromTask.Status == string(influxdb.TaskInactive) && toTask.Status == string(influxdb.TaskActive) { toTask.LatestCompleted = ns.Now() } diff --git a/task/backend/middleware/notification_middleware_test.go b/task/backend/middleware/notification_middleware_test.go index a654ca8e76..aff28630f5 100644 --- a/task/backend/middleware/notification_middleware_test.go +++ b/task/backend/middleware/notification_middleware_test.go @@ -6,8 +6,6 @@ import ( "testing" "time" - "github.com/influxdata/influxdb/task/backend" - "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/notification/rule" "github.com/influxdata/influxdb/task/backend/middleware" @@ -83,9 +81,9 @@ func TestNotificationRuleUpdateFromInactive(t *testing.T) { mocks.taskSvc.FindTaskByIDFn = func(_ context.Context, id influxdb.ID) (*influxdb.Task, error) { if id == 1 { - return &influxdb.Task{ID: id, Status: string(backend.TaskInactive)}, nil + return &influxdb.Task{ID: id, Status: string(influxdb.TaskInactive)}, nil } else if id == 10 { - return &influxdb.Task{ID: id, Status: string(backend.TaskActive)}, nil + return &influxdb.Task{ID: id, Status: string(influxdb.TaskActive)}, nil } return &influxdb.Task{ID: id}, nil } diff --git a/task/backend/task.go b/task/backend/task.go index 11770ff466..f99c2df686 100644 --- a/task/backend/task.go +++ b/task/backend/task.go @@ -2,7 +2,6 @@ package backend import ( "context" - "fmt" "time" "github.com/influxdata/influxdb" @@ -25,80 +24,8 @@ type TaskControlService interface { FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) // UpdateRunState sets the run state at the respective time. - UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state RunStatus) error + UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error // AddRunLog adds a log line to the run. AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error } - -type TaskStatus string - -const ( - TaskActive TaskStatus = "active" - TaskInactive TaskStatus = "inactive" - - DefaultTaskStatus TaskStatus = TaskActive -) - -type RunStatus int - -const ( - RunStarted RunStatus = iota - RunSuccess - RunFail - RunCanceled - RunScheduled -) - -func (r RunStatus) String() string { - switch r { - case RunStarted: - return "started" - case RunSuccess: - return "success" - case RunFail: - return "failed" - case RunCanceled: - return "canceled" - case RunScheduled: - return "scheduled" - } - panic(fmt.Sprintf("unknown RunStatus: %d", r)) -} - -// RequestStillQueuedError is returned when attempting to retry a run which has not yet completed. -type RequestStillQueuedError struct { - // Unix timestamps matching existing request's start and end. - Start, End int64 -} - -const fmtRequestStillQueued = "previous retry for start=%s end=%s has not yet finished" - -func (e RequestStillQueuedError) Error() string { - return fmt.Sprintf(fmtRequestStillQueued, - time.Unix(e.Start, 0).UTC().Format(time.RFC3339), - time.Unix(e.End, 0).UTC().Format(time.RFC3339), - ) -} - -// ParseRequestStillQueuedError attempts to parse a RequestStillQueuedError from msg. -// If msg is formatted correctly, the resultant error is returned; otherwise it returns nil. -func ParseRequestStillQueuedError(msg string) *RequestStillQueuedError { - var s, e string - n, err := fmt.Sscanf(msg, fmtRequestStillQueued, &s, &e) - if err != nil || n != 2 { - return nil - } - - start, err := time.Parse(time.RFC3339, s) - if err != nil { - return nil - } - - end, err := time.Parse(time.RFC3339, e) - if err != nil { - return nil - } - - return &RequestStillQueuedError{Start: start.Unix(), End: end.Unix()} -} diff --git a/task/mock/task_control_service.go b/task/mock/task_control_service.go index eee57095c4..348d93b8bf 100644 --- a/task/mock/task_control_service.go +++ b/task/mock/task_control_service.go @@ -130,7 +130,7 @@ func (t *TaskControlService) ManualRuns(ctx context.Context, taskID influxdb.ID) } // UpdateRunState sets the run state at the respective time. -func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error { +func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state influxdb.RunStatus) error { d.mu.Lock() defer d.mu.Unlock() @@ -139,11 +139,11 @@ func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID i panic("run state called without a run") } switch state { - case backend.RunStarted: + case influxdb.RunStarted: run.StartedAt = when - case backend.RunSuccess, backend.RunFail, backend.RunCanceled: + case influxdb.RunSuccess, influxdb.RunFail, influxdb.RunCanceled: run.FinishedAt = when - case backend.RunScheduled: + case influxdb.RunScheduled: // nothing default: panic("invalid status") diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 38eb48a2e2..a0f2f01cba 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -276,7 +276,7 @@ func testTaskCRUD(t *testing.T, sys *System) { Name: "task #0", Cron: "* * * * *", Offset: 5 * time.Second, - Status: string(backend.DefaultTaskStatus), + Status: string(influxdb.DefaultTaskStatus), Flux: fmt.Sprintf(scriptFmt, 0), Type: influxdb.TaskSystemType, } @@ -292,7 +292,7 @@ func testTaskCRUD(t *testing.T, sys *System) { OrganizationID: cr.OrgID, Flux: fmt.Sprintf(scriptFmt, 1), OwnerID: cr.UserID, - Status: string(backend.TaskInactive), + Status: string(influxdb.TaskInactive), } if _, err := sys.TaskService.CreateTask(authorizedCtx, tc2); err != nil { @@ -327,24 +327,24 @@ func testTaskCRUD(t *testing.T, sys *System) { } // Check task status filter - active := string(backend.TaskActive) + active := string(influxdb.TaskActive) fs, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{Status: &active}) if err != nil { t.Fatal(err) } - activeTasks := findTasksByStatus(fs, string(backend.TaskActive)) + activeTasks := findTasksByStatus(fs, string(influxdb.TaskActive)) if len(fs) != len(activeTasks) { t.Fatalf("expected to find %d active tasks, found: %d", len(activeTasks), len(fs)) } - inactive := string(backend.TaskInactive) + inactive := string(influxdb.TaskInactive) fs, _, err = sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{Status: &inactive}) if err != nil { t.Fatal(err) } - inactiveTasks := findTasksByStatus(fs, string(backend.TaskInactive)) + inactiveTasks := findTasksByStatus(fs, string(influxdb.TaskInactive)) if len(fs) != len(inactiveTasks) { t.Fatalf("expected to find %d inactive tasks, found: %d", len(inactiveTasks), len(fs)) } @@ -363,12 +363,12 @@ func testTaskCRUD(t *testing.T, sys *System) { if f.Flux != newFlux { t.Fatalf("wrong flux from update; want %q, got %q", newFlux, f.Flux) } - if f.Status != string(backend.TaskActive) { + if f.Status != string(influxdb.TaskActive) { t.Fatalf("expected task to be created active, got %q", f.Status) } // Update task: status only. - newStatus := string(backend.TaskInactive) + newStatus := string(influxdb.TaskInactive) f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Status: &newStatus}) if err != nil { t.Fatal(err) @@ -381,7 +381,7 @@ func testTaskCRUD(t *testing.T, sys *System) { } // Update task: reactivate status and update script. - newStatus = string(backend.TaskActive) + newStatus = string(influxdb.TaskActive) newFlux = fmt.Sprintf(scriptFmt, 98) f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Flux: &newFlux, Status: &newStatus}) if err != nil { @@ -395,7 +395,7 @@ func testTaskCRUD(t *testing.T, sys *System) { } // Update task: just update an option. - newStatus = string(backend.TaskActive) + newStatus = string(influxdb.TaskActive) newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tcron: \"* * * * *\",\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")" f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Name: "task-changed #98"}}) if err != nil { @@ -410,7 +410,7 @@ func testTaskCRUD(t *testing.T, sys *System) { } // Update task: switch to every. - newStatus = string(backend.TaskActive) + newStatus = string(influxdb.TaskActive) newFlux = "option task = {\n\tname: \"task-changed #98\",\n\tevery: 30s,\n\toffset: 5s,\n\tconcurrency: 100,\n}\n\nfrom(bucket: \"b\")\n\t|> to(bucket: \"two\", orgID: \"000000000000000\")" f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Every: *(options.MustParseDuration("30s"))}}) if err != nil { @@ -425,7 +425,7 @@ func testTaskCRUD(t *testing.T, sys *System) { } // Update task: just cron. - newStatus = string(backend.TaskActive) + newStatus = string(influxdb.TaskActive) newFlux = fmt.Sprintf(scriptDifferentName, 98) f, err = sys.TaskService.UpdateTask(authorizedCtx, origID, influxdb.TaskUpdate{Options: options.Options{Cron: "* * * * *"}}) if err != nil { @@ -748,11 +748,11 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), influxdb.RunStarted); err != nil { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), backend.RunSuccess); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, time.Now(), influxdb.RunSuccess); err != nil { t.Fatal(err) } @@ -782,7 +782,7 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), influxdb.RunStarted); err != nil { t.Fatal(err) } @@ -790,7 +790,7 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), backend.RunFail); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), influxdb.RunFail); err != nil { t.Fatal(err) } @@ -905,7 +905,7 @@ func testTaskRuns(t *testing.T, sys *System) { startedAt := time.Now().UTC() // Update the run state to Started; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, influxdb.RunStarted); err != nil { t.Fatal(err) } @@ -918,7 +918,7 @@ func testTaskRuns(t *testing.T, sys *System) { } // Update the run state to Started; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, influxdb.RunStarted); err != nil { t.Fatal(err) } @@ -932,7 +932,7 @@ func testTaskRuns(t *testing.T, sys *System) { } // Mark the second run finished. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), backend.RunSuccess); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), influxdb.RunSuccess); err != nil { t.Fatal(err) } @@ -954,8 +954,8 @@ func testTaskRuns(t *testing.T, sys *System) { if runs[0].StartedAt != startedAt { t.Fatalf("unexpectedStartedAt; want %s, got %s", startedAt, runs[0].StartedAt) } - if runs[0].Status != backend.RunStarted.String() { - t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status) + if runs[0].Status != influxdb.RunStarted.String() { + t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunStarted.String(), runs[0].Status) } if !runs[0].FinishedAt.IsZero() { @@ -1035,7 +1035,7 @@ func testTaskRuns(t *testing.T, sys *System) { if err != nil { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, time.Now(), backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, time.Now(), influxdb.RunStarted); err != nil { t.Fatal(err) } @@ -1043,7 +1043,7 @@ func testTaskRuns(t *testing.T, sys *System) { if err != nil { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, time.Now(), influxdb.RunStarted); err != nil { t.Fatal(err) } // Add a log for the first run. @@ -1350,7 +1350,7 @@ func testRunStorage(t *testing.T, sys *System) { startedAt := time.Now().UTC().Add(time.Second * -10) // Update the run state to Started; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, influxdb.RunStarted); err != nil { t.Fatal(err) } @@ -1363,12 +1363,12 @@ func testRunStorage(t *testing.T, sys *System) { } // Update the run state to Started; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), influxdb.RunStarted); err != nil { t.Fatal(err) } // Mark the second run finished. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second*2), backend.RunFail); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second*2), influxdb.RunFail); err != nil { t.Fatal(err) } @@ -1390,8 +1390,8 @@ func testRunStorage(t *testing.T, sys *System) { if runs[0].StartedAt != startedAt { t.Fatalf("unexpectedStartedAt; want %s, got %s", startedAt, runs[0].StartedAt) } - if runs[0].Status != backend.RunStarted.String() { - t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status) + if runs[0].Status != influxdb.RunStarted.String() { + t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunStarted.String(), runs[0].Status) } if !runs[0].FinishedAt.IsZero() { @@ -1403,11 +1403,11 @@ func testRunStorage(t *testing.T, sys *System) { if err != nil { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*3), backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*3), influxdb.RunStarted); err != nil { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*4), backend.RunSuccess); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.ID, startedAt.Add(time.Second*4), influxdb.RunSuccess); err != nil { t.Fatal(err) } if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc2.ID); err != nil { @@ -1440,8 +1440,8 @@ func testRunStorage(t *testing.T, sys *System) { if runs[0].StartedAt != startedAt { t.Fatalf("unexpectedStartedAt; want %s, got %s", startedAt, runs[0].StartedAt) } - if runs[0].Status != backend.RunStarted.String() { - t.Fatalf("unexpected run status; want %s, got %s", backend.RunStarted.String(), runs[0].Status) + if runs[0].Status != influxdb.RunStarted.String() { + t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunStarted.String(), runs[0].Status) } // TODO (al): handle empty finishedAt // if runs[0].FinishedAt != "" { @@ -1455,8 +1455,8 @@ func testRunStorage(t *testing.T, sys *System) { if exp := startedAt.Add(time.Second); runs[2].StartedAt != exp { t.Fatalf("unexpected StartedAt; want %s, got %s", exp, runs[2].StartedAt) } - if runs[2].Status != backend.RunFail.String() { - t.Fatalf("unexpected run status; want %s, got %s", backend.RunSuccess.String(), runs[2].Status) + if runs[2].Status != influxdb.RunFail.String() { + t.Fatalf("unexpected run status; want %s, got %s", influxdb.RunSuccess.String(), runs[2].Status) } if exp := startedAt.Add(time.Second * 2); runs[2].FinishedAt != exp { t.Fatalf("unexpected FinishedAt; want %s, got %s", exp, runs[2].FinishedAt) @@ -1525,10 +1525,10 @@ func testRetryAcrossStorage(t *testing.T, sys *System) { startedAt := time.Now().UTC() // Update the run state to Started then Failed; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt, backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt, influxdb.RunStarted); err != nil { t.Fatal(err) } - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt.Add(time.Second), backend.RunFail); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc.ID, startedAt.Add(time.Second), influxdb.RunFail); err != nil { t.Fatal(err) } if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc.ID); err != nil { @@ -1551,7 +1551,7 @@ func testRetryAcrossStorage(t *testing.T, sys *System) { t.Fatalf("wrong scheduledFor on task: got %s, want %s", m.ScheduledFor, rc.ScheduledFor) } - exp := backend.RequestStillQueuedError{Start: rc.ScheduledFor.Unix(), End: rc.ScheduledFor.Unix()} + exp := influxdb.RequestStillQueuedError{Start: rc.ScheduledFor.Unix(), End: rc.ScheduledFor.Unix()} // Retrying a run which has been queued but not started, should be rejected. if _, err = sys.TaskService.RetryRun(sys.Ctx, task.ID, rc.ID); err != exp && err.Error() != "run already queued" { @@ -1587,7 +1587,7 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { startedAt := time.Now().UTC() // Update the run state to Started; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.ID, startedAt, influxdb.RunStarted); err != nil { t.Fatal(err) } @@ -1600,12 +1600,12 @@ func testLogsAcrossStorage(t *testing.T, sys *System) { } // Update the run state to Started; normally the scheduler would do this. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, backend.RunStarted); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt, influxdb.RunStarted); err != nil { t.Fatal(err) } // Mark the second run finished. - if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), backend.RunSuccess); err != nil { + if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.ID, startedAt.Add(time.Second), influxdb.RunSuccess); err != nil { t.Fatal(err) } diff --git a/task_test.go b/task_test.go index c0353c0531..a42f773df2 100644 --- a/task_test.go +++ b/task_test.go @@ -156,3 +156,12 @@ from(bucket: "x") }) } + +func TestParseRequestStillQueuedError(t *testing.T) { + e := platform.RequestStillQueuedError{Start: 1000, End: 2000} + validMsg := e.Error() + + if err := platform.ParseRequestStillQueuedError(validMsg); err == nil || *err != e { + t.Fatalf("%q should have parsed to %v, but got %v", validMsg, e, err) + } +}