diff --git a/cmd/influx/task.go b/cmd/influx/task.go index 315ddc1c98..5df4a33395 100644 --- a/cmd/influx/task.go +++ b/cmd/influx/task.go @@ -121,14 +121,13 @@ func taskCreateF(cmd *cobra.Command, args []string) error { "Cron", ) w.Write(map[string]interface{}{ - "ID": t.ID.String(), - "Name": t.Name, - "OrganizationID": t.OrganizationID.String(), - "Organization": t.Organization, - "AuthorizationID": t.AuthorizationID.String(), - "Status": t.Status, - "Every": t.Every, - "Cron": t.Cron, + "ID": t.ID.String(), + "Name": t.Name, + "OrganizationID": t.OrganizationID.String(), + "Organization": t.Organization, + "Status": t.Status, + "Every": t.Every, + "Cron": t.Cron, }) w.Flush() @@ -193,7 +192,7 @@ func taskFindF(cmd *cobra.Command, args []string) error { } filter.Limit = taskFindFlags.limit - var tasks []*platform.Task + var tasks []http.Task var err error if taskFindFlags.id != "" { @@ -207,7 +206,7 @@ func taskFindF(cmd *cobra.Command, args []string) error { return err } - tasks = append(tasks, task) + tasks = append(tasks, *task) } else { tasks, _, err = s.FindTasks(context.Background(), filter) if err != nil { @@ -228,14 +227,13 @@ func taskFindF(cmd *cobra.Command, args []string) error { ) for _, t := range tasks { w.Write(map[string]interface{}{ - "ID": t.ID.String(), - "Name": t.Name, - "OrganizationID": t.OrganizationID.String(), - "Organization": t.Organization, - "AuthorizationID": t.AuthorizationID.String(), - "Status": t.Status, - "Every": t.Every, - "Cron": t.Cron, + "ID": t.ID.String(), + "Name": t.Name, + "OrganizationID": t.OrganizationID.String(), + "Organization": t.Organization, + "Status": t.Status, + "Every": t.Every, + "Cron": t.Cron, }) } w.Flush() @@ -306,14 +304,13 @@ func taskUpdateF(cmd *cobra.Command, args []string) error { "Cron", ) w.Write(map[string]interface{}{ - "ID": t.ID.String(), - "Name": t.Name, - "OrganizationID": t.OrganizationID.String(), - "Organization": t.Organization, - "AuthorizationID": t.AuthorizationID.String(), - "Status": t.Status, - "Every": t.Every, - "Cron": t.Cron, + "ID": t.ID.String(), + "Name": t.Name, + "OrganizationID": t.OrganizationID.String(), + "Organization": t.Organization, + "Status": t.Status, + "Every": t.Every, + "Cron": t.Cron, }) w.Flush() @@ -374,14 +371,13 @@ func taskDeleteF(cmd *cobra.Command, args []string) error { "Cron", ) w.Write(map[string]interface{}{ - "ID": t.ID.String(), - "Name": t.Name, - "OrganizationID": t.OrganizationID.String(), - "Organization": t.Organization, - "AuthorizationID": t.AuthorizationID.String(), - "Status": t.Status, - "Every": t.Every, - "Cron": t.Cron, + "ID": t.ID.String(), + "Name": t.Name, + "OrganizationID": t.OrganizationID.String(), + "Organization": t.Organization, + "Status": t.Status, + "Every": t.Every, + "Cron": t.Cron, }) w.Flush() diff --git a/http/task_service.go b/http/task_service.go index d4eb1ffeac..2b441eda25 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -152,10 +152,53 @@ func NewTaskHandler(b *TaskBackend) *TaskHandler { return h } +type Task struct { + ID influxdb.ID `json:"id"` + OrganizationID influxdb.ID `json:"orgID"` + Organization string `json:"org"` + OwnerID influxdb.ID `json:"ownerID"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Status string `json:"status"` + Flux string `json:"flux"` + Every string `json:"every,omitempty"` + Cron string `json:"cron,omitempty"` + Offset string `json:"offset,omitempty"` + LatestCompleted string `json:"latestCompleted,omitempty"` + LastRunStatus string `json:"lastRunStatus,omitempty"` + LastRunError string `json:"lastRunError,omitempty"` + CreatedAt string `json:"createdAt,omitempty"` + UpdatedAt string `json:"updatedAt,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + type taskResponse struct { Links map[string]string `json:"links"` Labels []influxdb.Label `json:"labels"` - influxdb.Task + Task +} + +// NewFrontEndTask converts a internal task type to a task that we want to display to users +func NewFrontEndTask(t influxdb.Task) Task { + return Task{ + ID: t.ID, + OrganizationID: t.OrganizationID, + Organization: t.Organization, + OwnerID: t.OwnerID, + Name: t.Name, + Description: t.Description, + Status: t.Status, + Flux: t.Flux, + Every: t.Every, + Cron: t.Cron, + Offset: t.Offset, + LatestCompleted: t.LatestCompleted, + LastRunStatus: t.LastRunStatus, + LastRunError: t.LastRunError, + CreatedAt: t.CreatedAt, + UpdatedAt: t.UpdatedAt, + Metadata: t.Metadata, + } } func newTaskResponse(t influxdb.Task, labels []*influxdb.Label) taskResponse { @@ -168,7 +211,7 @@ func newTaskResponse(t influxdb.Task, labels []*influxdb.Label) taskResponse { "runs": fmt.Sprintf("/api/v2/tasks/%s/runs", t.ID), "logs": fmt.Sprintf("/api/v2/tasks/%s/logs", t.ID), }, - Task: t, + Task: NewFrontEndTask(t), Labels: []influxdb.Label{}, } @@ -1310,7 +1353,7 @@ type TaskService struct { } // FindTaskByID returns a single task -func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) { +func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1352,7 +1395,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxd // FindTasks returns a list of tasks that match a filter (limit 100) and the total count // of matching tasks. -func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) { +func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]Task, int, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1406,15 +1449,15 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) return nil, 0, err } - tasks := make([]*influxdb.Task, len(tr.Tasks)) + tasks := make([]Task, len(tr.Tasks)) for i := range tr.Tasks { - tasks[i] = &tr.Tasks[i].Task + tasks[i] = tr.Tasks[i].Task } return tasks, len(tasks), nil } // CreateTask creates a new task. -func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) { +func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -1456,7 +1499,7 @@ func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*i } // UpdateTask updates a single task with changeset. -func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) { +func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*Task, error) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() diff --git a/kv/task.go b/kv/task.go index 3d42e8c6cc..5c3d32850b 100644 --- a/kv/task.go +++ b/kv/task.go @@ -117,22 +117,6 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf if err := json.Unmarshal(v, t); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } - latestCompletedRun, err := s.findLatestCompleted(ctx, tx, t.ID) - if err != nil { - return nil, err - } - if latestCompletedRun != nil { - latestCompleted := latestCompletedRun.ScheduledFor - if t.LatestCompleted != "" { - tlc, err := time.Parse(time.RFC3339, t.LatestCompleted) - if err == nil && latestCompleted.After(tlc) { - t.LatestCompleted = latestCompleted.Format(time.RFC3339) - - } - } else { - t.LatestCompleted = latestCompleted.Format(time.RFC3339) - } - } if t.LatestCompleted == "" { t.LatestCompleted = t.CreatedAt @@ -492,16 +476,6 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF } if matchFn == nil || matchFn(t) { - latestCompleted, err := s.findLatestScheduledTimeForTask(ctx, tx, t) - if err != nil { - return nil, 0, err - } - if !latestCompleted.IsZero() { - t.LatestCompleted = latestCompleted.Format(time.RFC3339) - } else { - t.LatestCompleted = t.CreatedAt - } - ts = append(ts, t) if len(ts) >= filter.Limit { @@ -743,6 +717,13 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf } } + if upd.LatestScheduled != nil { + // make sure we only update latest scheduled one way + if upd.LatestScheduled.After(task.LatestScheduled) { + task.LatestScheduled = *upd.LatestScheduled + } + } + if upd.LastRunStatus != nil { task.LastRunStatus = *upd.LastRunStatus if *upd.LastRunStatus == "failed" && upd.LastRunError != nil { @@ -1783,31 +1764,6 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I return nil } -func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Run, error) { - bucket, err := tx.Bucket(taskRunBucket) - if err != nil { - return nil, influxdb.ErrUnexpectedTaskBucketErr(err) - } - key, err := taskLatestCompletedKey(id) - if err != nil { - return nil, err - } - - bytes, err := bucket.Get(key) - if err != nil { - if err == ErrKeyNotFound { - return nil, nil - } - return nil, influxdb.ErrUnexpectedTaskBucketErr(err) - } - - run := &influxdb.Run{} - if err = json.Unmarshal(bytes, run); err != nil { - return nil, influxdb.ErrInternalTaskServiceError(err) - } - - return run, nil -} func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, task *influxdb.Task) (time.Time, error) { // Get the latest completed time @@ -1833,19 +1789,6 @@ func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, tas } } - // look to see if we have a "latest completed run" - lRun, err := s.findLatestCompleted(ctx, tx, task.ID) - if err != nil { - return time.Time{}, err - } - - if lRun != nil { - runTime := lRun.ScheduledFor - if runTime.After(latestCompleted) { - latestCompleted = runTime - } - } - // find out if we have a currently running schedule that is after the latest completed currentRunning, err := s.currentlyRunning(ctx, tx, task.ID) if err != nil { diff --git a/task.go b/task.go index f60f99418c..c3ce788e2f 100644 --- a/task.go +++ b/task.go @@ -44,6 +44,7 @@ type Task struct { Cron string `json:"cron,omitempty"` Offset string `json:"offset,omitempty"` LatestCompleted string `json:"latestCompleted,omitempty"` + LatestScheduled time.Time `json:"latestScheduled,omitempty"` LastRunStatus string `json:"lastRunStatus,omitempty"` LastRunError string `json:"lastRunError,omitempty"` CreatedAt string `json:"createdAt,omitempty"` @@ -232,13 +233,13 @@ type TaskUpdate struct { // LatestCompleted us to set latest completed on startup to skip task catchup LatestCompleted *string `json:"-"` + LatestScheduled *time.Time `json:"-"` LastRunStatus *string `json:"-"` LastRunError *string `json:"-"` Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend. // Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status. Options options.Options // when we unmarshal this gets unmarshalled from flat key-values - } func (t *TaskUpdate) UnmarshalJSON(data []byte) error { diff --git a/task/backend/coordinator/task_coordinator.go b/task/backend/coordinator/task_coordinator.go index e0ecfebbb9..853b1f10cd 100644 --- a/task/backend/coordinator/task_coordinator.go +++ b/task/backend/coordinator/task_coordinator.go @@ -59,8 +59,16 @@ func (t SchedulableTask) Offset() time.Duration { // LastScheduled parses the task's LatestCompleted value as a Time object func (t SchedulableTask) LastScheduled() time.Time { - tm, _ := t.LatestCompletedTime() - return tm + if !t.LatestScheduled.IsZero() { + return t.LatestScheduled + } + if t.LatestCompleted != "" { + latestCompleted, _ := t.LatestCompletedTime() + return latestCompleted + } + + createdAt, _ := time.Parse(time.RFC3339, t.CreatedAt) + return createdAt } func WithLimitOpt(i int) CoordinatorOption { diff --git a/task/backend/schedulable_task_service.go b/task/backend/schedulable_task_service.go index c0c83a0745..4c6f291830 100644 --- a/task/backend/schedulable_task_service.go +++ b/task/backend/schedulable_task_service.go @@ -28,10 +28,8 @@ func NewSchedulableTaskService(ts UpdateTaskService) SchedulableTaskService { // UpdateLastScheduled uses the task service to store the latest time a task was scheduled to run func (s SchedulableTaskService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error { - tm := t.Format(time.RFC3339) - tid := influxdb.ID(id) - _, err := s.UpdateTask(ctx, tid, influxdb.TaskUpdate{ - LatestCompleted: &tm, + _, err := s.UpdateTask(ctx, influxdb.ID(id), influxdb.TaskUpdate{ + LatestScheduled: &t, }) if err != nil { diff --git a/task/backend/task_service_checkpointer.go b/task/backend/task_service_checkpointer.go deleted file mode 100644 index 7b156d94da..0000000000 --- a/task/backend/task_service_checkpointer.go +++ /dev/null @@ -1,47 +0,0 @@ -package backend - -import ( - "context" - "fmt" - "time" - - "github.com/influxdata/influxdb" -) - -// Checkpointer allows us to restart a service from the last time we executed. -type TaskServiceCheckpointer struct { - ts influxdb.TaskService -} - -func NewTaskServiceCheckpointer(ts influxdb.TaskService) *TaskServiceCheckpointer { - return &TaskServiceCheckpointer{ - ts: ts, - } -} - -// Checkpoint updates a task's LatestCompleted value with the given time -func (c *TaskServiceCheckpointer) Checkpoint(ctx context.Context, id influxdb.ID, t time.Time) error { - s := t.Format(time.RFC3339Nano) - _, err := c.ts.UpdateTask(ctx, id, influxdb.TaskUpdate{ - LatestCompleted: &s, - }) - - if err != nil { - return fmt.Errorf("could not update checkpoint for task: %v", err) - } - return nil -} - -// Last retrieves a task by its ID and returns its LatestCompleted value -func (c *TaskServiceCheckpointer) Last(ctx context.Context, id influxdb.ID) (time.Time, error) { - task, err := c.ts.FindTaskByID(ctx, id) - if err != nil { - return time.Time{}, fmt.Errorf("could not fetch task: %v", err) - } - - last, err := time.Parse(time.RFC3339Nano, task.LatestCompleted) - if err != nil { - return time.Time{}, fmt.Errorf("internal server error: corrupt LastCompleted format: %v", err) - } - return last, nil -} diff --git a/task/backend/task_service_checkpointer_test.go b/task/backend/task_service_checkpointer_test.go deleted file mode 100644 index bce265586b..0000000000 --- a/task/backend/task_service_checkpointer_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package backend_test - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/mock" - "github.com/influxdata/influxdb/task/backend" -) - -func TestCheckpoint(t *testing.T) { - mockService := &mock.TaskService{ - UpdateTaskFn: func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) { return nil, nil }, - } - - cp := backend.NewTaskServiceCheckpointer(mockService) - err := cp.Checkpoint(context.Background(), influxdb.ID(1), time.Now()) - if err != nil { - t.Fatal(err) - } -} - -func TestCheckpointError(t *testing.T) { - mockService := &mock.TaskService{ - UpdateTaskFn: func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) { - return nil, fmt.Errorf("error!") - }, - } - - cp := backend.NewTaskServiceCheckpointer(mockService) - err := cp.Checkpoint(context.Background(), influxdb.ID(1), time.Now()) - if err == nil { - t.Fatalf("Expected error") - } -} - -func TestLast(t *testing.T) { - want := time.Now() - s := want.Format(time.RFC3339Nano) - mockService := &mock.TaskService{ - FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) { - return &influxdb.Task{LatestCompleted: s}, nil - }, - } - - cp := backend.NewTaskServiceCheckpointer(mockService) - got, err := cp.Last(context.Background(), influxdb.ID(1)) - if err != nil { - t.Fatal(err) - } - if !got.Equal(want) { - t.Fatalf("wrong time, wanted: %v, got: %v", want, got) - } -} - -func TestLastFetchError(t *testing.T) { - mockService := &mock.TaskService{ - FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) { - return nil, fmt.Errorf("error 1") - }, - } - - cp := backend.NewTaskServiceCheckpointer(mockService) - _, err := cp.Last(context.Background(), influxdb.ID(1)) - if err == nil { - t.Fatal("expected error") - } -} - -func TestLastParseError(t *testing.T) { - mockService := &mock.TaskService{ - FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) { - return &influxdb.Task{LatestCompleted: "howdy"}, nil - }, - } - - cp := backend.NewTaskServiceCheckpointer(mockService) - _, err := cp.Last(context.Background(), influxdb.ID(1)) - if err == nil { - t.Fatalf("expected error parsing invalid time: %v", err) - } -} diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 60eeef3ef7..1338b23393 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -587,6 +587,10 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } + if !task.LatestScheduled.IsZero() { + t.Fatal("expected a zero LatestScheduled on created task") + } + st, err := sys.TaskService.FindTaskByID(sys.Ctx, task.ID) if err != nil { t.Fatal(err) @@ -724,6 +728,21 @@ func testUpdate(t *testing.T, sys *System) { if earliestUA.After(ua) || latestUA.Before(ua) { t.Fatalf("updatedAt not accurate after pulling new task, expected %s to be between %s and %s", ua, earliestUA, latestUA) } + + ls := time.Now().Round(time.Second) // round to remove monotonic clock + task, err = sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{LatestScheduled: &ls}) + if err != nil { + t.Fatal(err) + } + + st, err = sys.TaskService.FindTaskByID(sys.Ctx, task.ID) + if err != nil { + t.Fatal(err) + } + if !st.LatestScheduled.Equal(ls) { + t.Fatalf("expected latest scheduled to update, expected: %v, got: %v", ls, st.LatestScheduled) + } + } func testTaskRuns(t *testing.T, sys *System) {