feat(task): Add a latestScheduled field for tasks (#15785)

Previously we overwrote the tasks existing latestCompleted to be used for latestCompleted as well as latestScheduled.
For obvious reasons this is confusing and missleading. I believe by seperating the two fields we can have a clear seperation
of concerns.
pull/15790/head
Lyon Hill 2019-11-06 15:10:52 -07:00 committed by GitHub
parent a0b17a690b
commit bb6aa1df3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 121 additions and 245 deletions

View File

@ -121,14 +121,13 @@ func taskCreateF(cmd *cobra.Command, args []string) error {
"Cron",
)
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"AuthorizationID": t.AuthorizationID.String(),
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
})
w.Flush()
@ -193,7 +192,7 @@ func taskFindF(cmd *cobra.Command, args []string) error {
}
filter.Limit = taskFindFlags.limit
var tasks []*platform.Task
var tasks []http.Task
var err error
if taskFindFlags.id != "" {
@ -207,7 +206,7 @@ func taskFindF(cmd *cobra.Command, args []string) error {
return err
}
tasks = append(tasks, task)
tasks = append(tasks, *task)
} else {
tasks, _, err = s.FindTasks(context.Background(), filter)
if err != nil {
@ -228,14 +227,13 @@ func taskFindF(cmd *cobra.Command, args []string) error {
)
for _, t := range tasks {
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"AuthorizationID": t.AuthorizationID.String(),
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
})
}
w.Flush()
@ -306,14 +304,13 @@ func taskUpdateF(cmd *cobra.Command, args []string) error {
"Cron",
)
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"AuthorizationID": t.AuthorizationID.String(),
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
})
w.Flush()
@ -374,14 +371,13 @@ func taskDeleteF(cmd *cobra.Command, args []string) error {
"Cron",
)
w.Write(map[string]interface{}{
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"AuthorizationID": t.AuthorizationID.String(),
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
"ID": t.ID.String(),
"Name": t.Name,
"OrganizationID": t.OrganizationID.String(),
"Organization": t.Organization,
"Status": t.Status,
"Every": t.Every,
"Cron": t.Cron,
})
w.Flush()

View File

@ -152,10 +152,53 @@ func NewTaskHandler(b *TaskBackend) *TaskHandler {
return h
}
type Task struct {
ID influxdb.ID `json:"id"`
OrganizationID influxdb.ID `json:"orgID"`
Organization string `json:"org"`
OwnerID influxdb.ID `json:"ownerID"`
Name string `json:"name"`
Description string `json:"description,omitempty"`
Status string `json:"status"`
Flux string `json:"flux"`
Every string `json:"every,omitempty"`
Cron string `json:"cron,omitempty"`
Offset string `json:"offset,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
UpdatedAt string `json:"updatedAt,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
type taskResponse struct {
Links map[string]string `json:"links"`
Labels []influxdb.Label `json:"labels"`
influxdb.Task
Task
}
// NewFrontEndTask converts a internal task type to a task that we want to display to users
func NewFrontEndTask(t influxdb.Task) Task {
return Task{
ID: t.ID,
OrganizationID: t.OrganizationID,
Organization: t.Organization,
OwnerID: t.OwnerID,
Name: t.Name,
Description: t.Description,
Status: t.Status,
Flux: t.Flux,
Every: t.Every,
Cron: t.Cron,
Offset: t.Offset,
LatestCompleted: t.LatestCompleted,
LastRunStatus: t.LastRunStatus,
LastRunError: t.LastRunError,
CreatedAt: t.CreatedAt,
UpdatedAt: t.UpdatedAt,
Metadata: t.Metadata,
}
}
func newTaskResponse(t influxdb.Task, labels []*influxdb.Label) taskResponse {
@ -168,7 +211,7 @@ func newTaskResponse(t influxdb.Task, labels []*influxdb.Label) taskResponse {
"runs": fmt.Sprintf("/api/v2/tasks/%s/runs", t.ID),
"logs": fmt.Sprintf("/api/v2/tasks/%s/logs", t.ID),
},
Task: t,
Task: NewFrontEndTask(t),
Labels: []influxdb.Label{},
}
@ -1310,7 +1353,7 @@ type TaskService struct {
}
// FindTaskByID returns a single task
func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxdb.Task, error) {
func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1352,7 +1395,7 @@ func (t TaskService) FindTaskByID(ctx context.Context, id influxdb.ID) (*influxd
// FindTasks returns a list of tasks that match a filter (limit 100) and the total count
// of matching tasks.
func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]*influxdb.Task, int, error) {
func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter) ([]Task, int, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1406,15 +1449,15 @@ func (t TaskService) FindTasks(ctx context.Context, filter influxdb.TaskFilter)
return nil, 0, err
}
tasks := make([]*influxdb.Task, len(tr.Tasks))
tasks := make([]Task, len(tr.Tasks))
for i := range tr.Tasks {
tasks[i] = &tr.Tasks[i].Task
tasks[i] = tr.Tasks[i].Task
}
return tasks, len(tasks), nil
}
// CreateTask creates a new task.
func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*influxdb.Task, error) {
func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()
@ -1456,7 +1499,7 @@ func (t TaskService) CreateTask(ctx context.Context, tc influxdb.TaskCreate) (*i
}
// UpdateTask updates a single task with changeset.
func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*influxdb.Task, error) {
func (t TaskService) UpdateTask(ctx context.Context, id influxdb.ID, upd influxdb.TaskUpdate) (*Task, error) {
span, _ := tracing.StartSpanFromContext(ctx)
defer span.Finish()

View File

@ -117,22 +117,6 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf
if err := json.Unmarshal(v, t); err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
latestCompletedRun, err := s.findLatestCompleted(ctx, tx, t.ID)
if err != nil {
return nil, err
}
if latestCompletedRun != nil {
latestCompleted := latestCompletedRun.ScheduledFor
if t.LatestCompleted != "" {
tlc, err := time.Parse(time.RFC3339, t.LatestCompleted)
if err == nil && latestCompleted.After(tlc) {
t.LatestCompleted = latestCompleted.Format(time.RFC3339)
}
} else {
t.LatestCompleted = latestCompleted.Format(time.RFC3339)
}
}
if t.LatestCompleted == "" {
t.LatestCompleted = t.CreatedAt
@ -492,16 +476,6 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
}
if matchFn == nil || matchFn(t) {
latestCompleted, err := s.findLatestScheduledTimeForTask(ctx, tx, t)
if err != nil {
return nil, 0, err
}
if !latestCompleted.IsZero() {
t.LatestCompleted = latestCompleted.Format(time.RFC3339)
} else {
t.LatestCompleted = t.CreatedAt
}
ts = append(ts, t)
if len(ts) >= filter.Limit {
@ -743,6 +717,13 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf
}
}
if upd.LatestScheduled != nil {
// make sure we only update latest scheduled one way
if upd.LatestScheduled.After(task.LatestScheduled) {
task.LatestScheduled = *upd.LatestScheduled
}
}
if upd.LastRunStatus != nil {
task.LastRunStatus = *upd.LastRunStatus
if *upd.LastRunStatus == "failed" && upd.LastRunError != nil {
@ -1783,31 +1764,6 @@ func (s *Service) addRunLog(ctx context.Context, tx Tx, taskID, runID influxdb.I
return nil
}
func (s *Service) findLatestCompleted(ctx context.Context, tx Tx, id influxdb.ID) (*influxdb.Run, error) {
bucket, err := tx.Bucket(taskRunBucket)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
key, err := taskLatestCompletedKey(id)
if err != nil {
return nil, err
}
bytes, err := bucket.Get(key)
if err != nil {
if err == ErrKeyNotFound {
return nil, nil
}
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
run := &influxdb.Run{}
if err = json.Unmarshal(bytes, run); err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
return run, nil
}
func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, task *influxdb.Task) (time.Time, error) {
// Get the latest completed time
@ -1833,19 +1789,6 @@ func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, tas
}
}
// look to see if we have a "latest completed run"
lRun, err := s.findLatestCompleted(ctx, tx, task.ID)
if err != nil {
return time.Time{}, err
}
if lRun != nil {
runTime := lRun.ScheduledFor
if runTime.After(latestCompleted) {
latestCompleted = runTime
}
}
// find out if we have a currently running schedule that is after the latest completed
currentRunning, err := s.currentlyRunning(ctx, tx, task.ID)
if err != nil {

View File

@ -44,6 +44,7 @@ type Task struct {
Cron string `json:"cron,omitempty"`
Offset string `json:"offset,omitempty"`
LatestCompleted string `json:"latestCompleted,omitempty"`
LatestScheduled time.Time `json:"latestScheduled,omitempty"`
LastRunStatus string `json:"lastRunStatus,omitempty"`
LastRunError string `json:"lastRunError,omitempty"`
CreatedAt string `json:"createdAt,omitempty"`
@ -232,13 +233,13 @@ type TaskUpdate struct {
// LatestCompleted us to set latest completed on startup to skip task catchup
LatestCompleted *string `json:"-"`
LatestScheduled *time.Time `json:"-"`
LastRunStatus *string `json:"-"`
LastRunError *string `json:"-"`
Metadata map[string]interface{} `json:"-"` // not to be set through a web request but rather used by a http service using tasks backend.
// Options gets unmarshalled from json as if it was flat, with the same level as Flux and Status.
Options options.Options // when we unmarshal this gets unmarshalled from flat key-values
}
func (t *TaskUpdate) UnmarshalJSON(data []byte) error {

View File

@ -59,8 +59,16 @@ func (t SchedulableTask) Offset() time.Duration {
// LastScheduled parses the task's LatestCompleted value as a Time object
func (t SchedulableTask) LastScheduled() time.Time {
tm, _ := t.LatestCompletedTime()
return tm
if !t.LatestScheduled.IsZero() {
return t.LatestScheduled
}
if t.LatestCompleted != "" {
latestCompleted, _ := t.LatestCompletedTime()
return latestCompleted
}
createdAt, _ := time.Parse(time.RFC3339, t.CreatedAt)
return createdAt
}
func WithLimitOpt(i int) CoordinatorOption {

View File

@ -28,10 +28,8 @@ func NewSchedulableTaskService(ts UpdateTaskService) SchedulableTaskService {
// UpdateLastScheduled uses the task service to store the latest time a task was scheduled to run
func (s SchedulableTaskService) UpdateLastScheduled(ctx context.Context, id scheduler.ID, t time.Time) error {
tm := t.Format(time.RFC3339)
tid := influxdb.ID(id)
_, err := s.UpdateTask(ctx, tid, influxdb.TaskUpdate{
LatestCompleted: &tm,
_, err := s.UpdateTask(ctx, influxdb.ID(id), influxdb.TaskUpdate{
LatestScheduled: &t,
})
if err != nil {

View File

@ -1,47 +0,0 @@
package backend
import (
"context"
"fmt"
"time"
"github.com/influxdata/influxdb"
)
// Checkpointer allows us to restart a service from the last time we executed.
type TaskServiceCheckpointer struct {
ts influxdb.TaskService
}
func NewTaskServiceCheckpointer(ts influxdb.TaskService) *TaskServiceCheckpointer {
return &TaskServiceCheckpointer{
ts: ts,
}
}
// Checkpoint updates a task's LatestCompleted value with the given time
func (c *TaskServiceCheckpointer) Checkpoint(ctx context.Context, id influxdb.ID, t time.Time) error {
s := t.Format(time.RFC3339Nano)
_, err := c.ts.UpdateTask(ctx, id, influxdb.TaskUpdate{
LatestCompleted: &s,
})
if err != nil {
return fmt.Errorf("could not update checkpoint for task: %v", err)
}
return nil
}
// Last retrieves a task by its ID and returns its LatestCompleted value
func (c *TaskServiceCheckpointer) Last(ctx context.Context, id influxdb.ID) (time.Time, error) {
task, err := c.ts.FindTaskByID(ctx, id)
if err != nil {
return time.Time{}, fmt.Errorf("could not fetch task: %v", err)
}
last, err := time.Parse(time.RFC3339Nano, task.LatestCompleted)
if err != nil {
return time.Time{}, fmt.Errorf("internal server error: corrupt LastCompleted format: %v", err)
}
return last, nil
}

View File

@ -1,85 +0,0 @@
package backend_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/mock"
"github.com/influxdata/influxdb/task/backend"
)
func TestCheckpoint(t *testing.T) {
mockService := &mock.TaskService{
UpdateTaskFn: func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) { return nil, nil },
}
cp := backend.NewTaskServiceCheckpointer(mockService)
err := cp.Checkpoint(context.Background(), influxdb.ID(1), time.Now())
if err != nil {
t.Fatal(err)
}
}
func TestCheckpointError(t *testing.T) {
mockService := &mock.TaskService{
UpdateTaskFn: func(context.Context, influxdb.ID, influxdb.TaskUpdate) (*influxdb.Task, error) {
return nil, fmt.Errorf("error!")
},
}
cp := backend.NewTaskServiceCheckpointer(mockService)
err := cp.Checkpoint(context.Background(), influxdb.ID(1), time.Now())
if err == nil {
t.Fatalf("Expected error")
}
}
func TestLast(t *testing.T) {
want := time.Now()
s := want.Format(time.RFC3339Nano)
mockService := &mock.TaskService{
FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) {
return &influxdb.Task{LatestCompleted: s}, nil
},
}
cp := backend.NewTaskServiceCheckpointer(mockService)
got, err := cp.Last(context.Background(), influxdb.ID(1))
if err != nil {
t.Fatal(err)
}
if !got.Equal(want) {
t.Fatalf("wrong time, wanted: %v, got: %v", want, got)
}
}
func TestLastFetchError(t *testing.T) {
mockService := &mock.TaskService{
FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) {
return nil, fmt.Errorf("error 1")
},
}
cp := backend.NewTaskServiceCheckpointer(mockService)
_, err := cp.Last(context.Background(), influxdb.ID(1))
if err == nil {
t.Fatal("expected error")
}
}
func TestLastParseError(t *testing.T) {
mockService := &mock.TaskService{
FindTaskByIDFn: func(context.Context, influxdb.ID) (*influxdb.Task, error) {
return &influxdb.Task{LatestCompleted: "howdy"}, nil
},
}
cp := backend.NewTaskServiceCheckpointer(mockService)
_, err := cp.Last(context.Background(), influxdb.ID(1))
if err == nil {
t.Fatalf("expected error parsing invalid time: %v", err)
}
}

View File

@ -587,6 +587,10 @@ func testUpdate(t *testing.T, sys *System) {
t.Fatal(err)
}
if !task.LatestScheduled.IsZero() {
t.Fatal("expected a zero LatestScheduled on created task")
}
st, err := sys.TaskService.FindTaskByID(sys.Ctx, task.ID)
if err != nil {
t.Fatal(err)
@ -724,6 +728,21 @@ func testUpdate(t *testing.T, sys *System) {
if earliestUA.After(ua) || latestUA.Before(ua) {
t.Fatalf("updatedAt not accurate after pulling new task, expected %s to be between %s and %s", ua, earliestUA, latestUA)
}
ls := time.Now().Round(time.Second) // round to remove monotonic clock
task, err = sys.TaskService.UpdateTask(authorizedCtx, task.ID, influxdb.TaskUpdate{LatestScheduled: &ls})
if err != nil {
t.Fatal(err)
}
st, err = sys.TaskService.FindTaskByID(sys.Ctx, task.ID)
if err != nil {
t.Fatal(err)
}
if !st.LatestScheduled.Equal(ls) {
t.Fatalf("expected latest scheduled to update, expected: %v, got: %v", ls, st.LatestScheduled)
}
}
func testTaskRuns(t *testing.T, sys *System) {