From f0ecc0e89d9e05c87464f33018f340b1e1ca492f Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Tue, 12 Nov 2019 17:13:56 -0800 Subject: [PATCH] refactor(tasks): use go Time for Task CreatedAt, UpdatedAt, LatestCompleted, Offset (#15672) --- crud_log.go | 34 ------ duration.go | 39 +++++++ http/task_service.go | 61 +++++++++- kv/task.go | 108 +++++++++++++----- kv/task_test.go | 2 +- task.go | 27 +---- task/backend/coordinator.go | 4 +- task/backend/coordinator/task_coordinator.go | 17 +-- .../coordinator/task_coordinator_test.go | 2 +- task/backend/coordinator_test.go | 5 +- task/backend/middleware/check_middleware.go | 4 +- .../middleware/check_middleware_test.go | 4 +- task/backend/middleware/middleware.go | 2 +- task/backend/middleware/middleware_test.go | 2 +- .../middleware/notification_middleware.go | 4 +- .../notification_middleware_test.go | 4 +- task/backend/schedulable_task_service_test.go | 7 +- task/backend/scheduler_test.go | 40 ++++--- task/mock/task_control_service.go | 46 +++----- task/servicetest/servicetest.go | 29 ++--- 20 files changed, 255 insertions(+), 186 deletions(-) create mode 100644 duration.go diff --git a/crud_log.go b/crud_log.go index b4574607c2..f77b27bf2b 100644 --- a/crud_log.go +++ b/crud_log.go @@ -1,8 +1,6 @@ package influxdb import ( - "encoding/json" - "errors" "time" ) @@ -41,35 +39,3 @@ type RealTimeGenerator struct{} func (g RealTimeGenerator) Now() time.Time { return time.Now() } - -// Duration is based on time.Duration to embed in any struct. -type Duration struct { - time.Duration -} - -// MarshalJSON implements json.Marshaler interface. -func (d Duration) MarshalJSON() ([]byte, error) { - return json.Marshal(d.String()) -} - -// UnmarshalJSON implements json.Unmarshaler interface. -func (d *Duration) UnmarshalJSON(b []byte) error { - var v interface{} - if err := json.Unmarshal(b, &v); err != nil { - return err - } - switch value := v.(type) { - case float64: - d.Duration = time.Duration(value) - return nil - case string: - var err error - d.Duration, err = time.ParseDuration(value) - if err != nil { - return err - } - return nil - default: - return errors.New("invalid duration") - } -} diff --git a/duration.go b/duration.go new file mode 100644 index 0000000000..de892e3f25 --- /dev/null +++ b/duration.go @@ -0,0 +1,39 @@ +package influxdb + +import ( + "encoding/json" + "errors" + "time" +) + +// Duration is based on time.Duration to embed in any struct. +type Duration struct { + time.Duration +} + +// MarshalJSON implements json.Marshaler interface. +func (d Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(d.String()) +} + +// UnmarshalJSON implements json.Unmarshaler interface. +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + if err := json.Unmarshal(b, &v); err != nil { + return err + } + switch value := v.(type) { + case float64: + d.Duration = time.Duration(value) + return nil + case string: + var err error + d.Duration, err = time.ParseDuration(value) + if err != nil { + return err + } + return nil + default: + return errors.New("invalid duration") + } +} diff --git a/http/task_service.go b/http/task_service.go index 2b441eda25..09fcc7dc3e 100644 --- a/http/task_service.go +++ b/http/task_service.go @@ -180,6 +180,23 @@ type taskResponse struct { // NewFrontEndTask converts a internal task type to a task that we want to display to users func NewFrontEndTask(t influxdb.Task) Task { + latestCompleted := "" + if !t.LatestCompleted.IsZero() { + latestCompleted = t.LatestCompleted.Format(time.RFC3339) + } + createdAt := "" + if !t.CreatedAt.IsZero() { + createdAt = t.CreatedAt.Format(time.RFC3339) + } + updatedAt := "" + if !t.UpdatedAt.IsZero() { + updatedAt = t.UpdatedAt.Format(time.RFC3339) + } + offset := "" + if t.Offset != 0*time.Second { + offset = customParseDuration(t.Offset) + } + return Task{ ID: t.ID, OrganizationID: t.OrganizationID, @@ -191,16 +208,52 @@ func NewFrontEndTask(t influxdb.Task) Task { Flux: t.Flux, Every: t.Every, Cron: t.Cron, - Offset: t.Offset, - LatestCompleted: t.LatestCompleted, + Offset: offset, + LatestCompleted: latestCompleted, LastRunStatus: t.LastRunStatus, LastRunError: t.LastRunError, - CreatedAt: t.CreatedAt, - UpdatedAt: t.UpdatedAt, + CreatedAt: createdAt, + UpdatedAt: updatedAt, Metadata: t.Metadata, } } +func customParseDuration(d time.Duration) string { + str := "" + if d < 0 { + str = "-" + d = d * -1 + } + + // parse hours + hours := d / time.Hour + if hours != 0 { + str = fmt.Sprintf("%s%dh", str, hours) + } + if d%time.Hour == 0 { + return str + } + // parse minutes + d = d - (time.Duration(hours) * time.Hour) + + min := d / time.Minute + if min != 0 { + str = fmt.Sprintf("%s%dm", str, min) + } + if d%time.Minute == 0 { + return str + } + + // parse seconds + d = d - time.Duration(min)*time.Minute + sec := d / time.Second + + if sec != 0 { + str = fmt.Sprintf("%s%ds", str, sec) + } + return str +} + func newTaskResponse(t influxdb.Task, labels []*influxdb.Label) taskResponse { response := taskResponse{ Links: map[string]string{ diff --git a/kv/task.go b/kv/task.go index 5c3d32850b..13822ef8e3 100644 --- a/kv/task.go +++ b/kv/task.go @@ -35,6 +35,52 @@ var ( var _ influxdb.TaskService = (*Service)(nil) var _ backend.TaskControlService = (*Service)(nil) +type kvTask struct { + ID influxdb.ID `json:"id"` + Type string `json:"type,omitempty"` + OrganizationID influxdb.ID `json:"orgID"` + Organization string `json:"org"` + OwnerID influxdb.ID `json:"ownerID"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Status string `json:"status"` + Flux string `json:"flux"` + Every string `json:"every,omitempty"` + Cron string `json:"cron,omitempty"` + LastRunStatus string `json:"lastRunStatus,omitempty"` + LastRunError string `json:"lastRunError,omitempty"` + Offset influxdb.Duration `json:"offset,omitempty"` + LatestCompleted time.Time `json:"latestCompleted,omitempty"` + LatestScheduled time.Time `json:"latestScheduled,omitempty"` + CreatedAt time.Time `json:"createdAt,omitempty"` + UpdatedAt time.Time `json:"updatedAt,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +func kvToInfluxTask(k *kvTask) *influxdb.Task { + return &influxdb.Task{ + ID: k.ID, + Type: k.Type, + OrganizationID: k.OrganizationID, + Organization: k.Organization, + OwnerID: k.OwnerID, + Name: k.Name, + Description: k.Description, + Status: k.Status, + Flux: k.Flux, + Every: k.Every, + Cron: k.Cron, + LastRunStatus: k.LastRunStatus, + LastRunError: k.LastRunError, + Offset: k.Offset.Duration, + LatestCompleted: k.LatestCompleted, + LatestScheduled: k.LatestScheduled, + CreatedAt: k.CreatedAt, + UpdatedAt: k.UpdatedAt, + Metadata: k.Metadata, + } +} + func (s *Service) initializeTasks(ctx context.Context, tx Tx) error { if _, err := tx.Bucket(taskBucket); err != nil { return err @@ -113,12 +159,14 @@ func (s *Service) findTaskByID(ctx context.Context, tx Tx, id influxdb.ID) (*inf if err != nil { return nil, err } - t := &influxdb.Task{} - if err := json.Unmarshal(v, t); err != nil { + kvTask := &kvTask{} + if err := json.Unmarshal(v, kvTask); err != nil { return nil, influxdb.ErrInternalTaskServiceError(err) } - if t.LatestCompleted == "" { + t := kvToInfluxTask(kvTask) + + if t.LatestCompleted.IsZero() { t.LatestCompleted = t.CreatedAt } @@ -470,11 +518,13 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF matchFn := newTaskMatchFn(filter, nil) for k != nil { - t := &influxdb.Task{} - if err := json.Unmarshal(v, t); err != nil { + kvTask := &kvTask{} + if err := json.Unmarshal(v, kvTask); err != nil { return nil, 0, influxdb.ErrInternalTaskServiceError(err) } + t := kvToInfluxTask(kvTask) + if matchFn == nil || matchFn(t) { ts = append(ts, t) @@ -553,7 +603,7 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) tc.Status = string(backend.TaskActive) } - createdAt := time.Now().UTC().Format(time.RFC3339) + createdAt := time.Now().Truncate(time.Second).UTC() task := &influxdb.Task{ ID: s.IDGenerator.ID(), Type: tc.Type, @@ -570,8 +620,14 @@ func (s *Service) createTask(ctx context.Context, tx Tx, tc influxdb.TaskCreate) CreatedAt: createdAt, LatestCompleted: createdAt, } + if opt.Offset != nil { - task.Offset = opt.Offset.String() + off, err := time.ParseDuration(opt.Offset.String()) + if err != nil { + return nil, influxdb.ErrTaskTimeParse(err) + } + task.Offset = off + } taskBucket, err := tx.Bucket(taskBucket) @@ -666,7 +722,7 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf return nil, err } - updatedAt := time.Now().UTC().Format(time.RFC3339) + updatedAt := time.Now().UTC() // update the flux script if !upd.Options.IsZero() || upd.Flux != nil { @@ -682,11 +738,15 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf task.Name = options.Name task.Every = options.Every.String() task.Cron = options.Cron - if options.Offset == nil { - task.Offset = "" - } else { - task.Offset = options.Offset.String() + + var off time.Duration + if options.Offset != nil { + off, err = time.ParseDuration(options.Offset.String()) + if err != nil { + return nil, influxdb.ErrTaskTimeParse(err) + } } + task.Offset = off task.UpdatedAt = updatedAt } @@ -709,8 +769,8 @@ func (s *Service) updateTask(ctx context.Context, tx Tx, id influxdb.ID, upd inf if upd.LatestCompleted != nil { // make sure we only update latest completed one way - tlc, _ := time.Parse(time.RFC3339, task.LatestCompleted) - ulc, _ := time.Parse(time.RFC3339, *upd.LatestCompleted) + tlc := task.LatestCompleted + ulc := *upd.LatestCompleted if !ulc.IsZero() && ulc.After(tlc) { task.LatestCompleted = *upd.LatestCompleted @@ -1298,7 +1358,7 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, nextScheduled := sch.Next(time.Unix(scheduledFor, 0)).UTC() offset := &options.Duration{} - if err := offset.Parse(task.Offset); err != nil { + if err := offset.Parse(task.Offset.String()); err != nil { return backend.RunCreation{}, influxdb.ErrTaskTimeParse(err) } nextDueAt, err := offset.Add(nextScheduled) @@ -1558,9 +1618,9 @@ func (s *Service) finishRun(ctx context.Context, tx Tx, taskID, runID influxdb.I } // tell task to update latest completed - scheduledStr := r.ScheduledFor.Format(time.RFC3339) + scheduled := r.ScheduledFor _, err = s.updateTask(ctx, tx, taskID, influxdb.TaskUpdate{ - LatestCompleted: &scheduledStr, + LatestCompleted: &scheduled, LastRunStatus: &r.Status, LastRunError: func() *string { if r.Status == "failed" { @@ -1659,7 +1719,7 @@ func (s *Service) nextDueRun(ctx context.Context, tx Tx, taskID influxdb.ID) (in nextScheduled := sch.Next(latestCompleted).UTC() offset := &options.Duration{} - if err := offset.Parse(task.Offset); err != nil { + if err := offset.Parse(task.Offset.String()); err != nil { return 0, 0, influxdb.ErrTaskTimeParse(err) } dueAt, err := offset.Add(nextScheduled) @@ -1777,16 +1837,10 @@ func (s *Service) findLatestScheduledTimeForTask(ctx context.Context, tx Tx, tas err error ) - if task.LatestCompleted == "" { - latestCompleted, err = time.Parse(time.RFC3339, task.CreatedAt) - if err != nil { - return time.Time{}, influxdb.ErrTaskTimeParse(err) - } + if task.LatestCompleted.IsZero() { + latestCompleted = task.CreatedAt } else { - latestCompleted, err = time.Parse(time.RFC3339, task.LatestCompleted) - if err != nil { - return time.Time{}, influxdb.ErrTaskTimeParse(err) - } + latestCompleted = task.LatestCompleted } // find out if we have a currently running schedule that is after the latest completed diff --git a/kv/task_test.go b/kv/task_test.go index 02df90fa90..e9e8ea313c 100644 --- a/kv/task_test.go +++ b/kv/task_test.go @@ -159,7 +159,7 @@ func TestNextRunDue(t *testing.T) { // +20 to account for the 20 second offset in the flux script oldNextDue := run.Created.Now - if task.Offset != "" { + if task.Offset != 0 { oldNextDue += 20 } if oldNextDue != nd { diff --git a/task.go b/task.go index 4f5de24482..082de2250c 100644 --- a/task.go +++ b/task.go @@ -42,13 +42,13 @@ type Task struct { Flux string `json:"flux"` Every string `json:"every,omitempty"` Cron string `json:"cron,omitempty"` - Offset string `json:"offset,omitempty"` - LatestCompleted string `json:"latestCompleted,omitempty"` + Offset time.Duration `json:"offset,omitempty"` + LatestCompleted time.Time `json:"latestCompleted,omitempty"` LatestScheduled time.Time `json:"latestScheduled,omitempty"` LastRunStatus string `json:"lastRunStatus,omitempty"` LastRunError string `json:"lastRunError,omitempty"` - CreatedAt string `json:"createdAt,omitempty"` - UpdatedAt string `json:"updatedAt,omitempty"` + CreatedAt time.Time `json:"createdAt,omitempty"` + UpdatedAt time.Time `json:"updatedAt,omitempty"` Metadata map[string]interface{} `json:"metadata,omitempty"` } @@ -67,23 +67,6 @@ func (t *Task) EffectiveCron() string { return "" } -// LatestCompletedTime gives the time.Time that the task was last queued to be run in RFC3339 format. -func (t *Task) LatestCompletedTime() (time.Time, error) { - tm := t.LatestCompleted - if tm == "" { - tm = t.CreatedAt - } - return time.Parse(time.RFC3339, tm) -} - -// OffsetDuration gives the time.Duration of the Task's Offset property, which represents a delay before execution -func (t *Task) OffsetDuration() (time.Duration, error) { - if t.Offset == "" { - return time.Duration(0), nil - } - return time.ParseDuration(t.Offset) -} - // Run is a record createId when a run of a task is scheduled. type Run struct { ID ID `json:"id,omitempty"` @@ -177,7 +160,7 @@ type TaskUpdate struct { Description *string `json:"description,omitempty"` // LatestCompleted us to set latest completed on startup to skip task catchup - LatestCompleted *string `json:"-"` + LatestCompleted *time.Time `json:"-"` LatestScheduled *time.Time `json:"-"` LastRunStatus *string `json:"-"` LastRunError *string `json:"-"` diff --git a/task/backend/coordinator.go b/task/backend/coordinator.go index c505ab7766..e7a01e396c 100644 --- a/task/backend/coordinator.go +++ b/task/backend/coordinator.go @@ -33,7 +33,7 @@ func NotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, coord Coor return err } - latestCompleted := now().Format(time.RFC3339) + latestCompleted := now() for len(tasks) > 0 { for _, task := range tasks { if task.Status != string(TaskActive) { @@ -74,7 +74,7 @@ func TaskNotifyCoordinatorOfExisting(ctx context.Context, ts TaskService, tcs Ta return err } - latestCompleted := now().Format(time.RFC3339) + latestCompleted := now() for len(tasks) > 0 { for _, task := range tasks { if task.Status != string(TaskActive) { diff --git a/task/backend/coordinator/task_coordinator.go b/task/backend/coordinator/task_coordinator.go index 1402a0409e..83217cfe74 100644 --- a/task/backend/coordinator/task_coordinator.go +++ b/task/backend/coordinator/task_coordinator.go @@ -53,8 +53,7 @@ func (t SchedulableTask) Schedule() scheduler.Schedule { // Offset returns a time.Duration for the Task's offset property func (t SchedulableTask) Offset() time.Duration { - offset, _ := t.OffsetDuration() - return offset + return t.Task.Offset } // LastScheduled parses the task's LatestCompleted value as a Time object @@ -62,13 +61,11 @@ func (t SchedulableTask) LastScheduled() time.Time { if !t.LatestScheduled.IsZero() { return t.LatestScheduled } - if t.LatestCompleted != "" { - latestCompleted, _ := t.LatestCompletedTime() - return latestCompleted + if !t.LatestCompleted.IsZero() { + return t.LatestCompleted } - createdAt, _ := time.Parse(time.RFC3339, t.CreatedAt) - return createdAt + return t.CreatedAt } func WithLimitOpt(i int) CoordinatorOption { @@ -79,13 +76,7 @@ func WithLimitOpt(i int) CoordinatorOption { // NewSchedulableTask transforms an influxdb task to a schedulable task type func NewSchedulableTask(task *influxdb.Task) (SchedulableTask, error) { - if offset, err := task.OffsetDuration(); offset != time.Duration(0) && err != nil { - return SchedulableTask{}, errors.New("could not create schedulable task: offset duration could not be parsed") - } - if _, err := task.LatestCompletedTime(); err != nil { - return SchedulableTask{}, errors.New("could not create schedulable task: latest completed time could not be parsed") - } if task.Cron == "" && task.Every == "" { return SchedulableTask{}, errors.New("invalid cron or every") } diff --git a/task/backend/coordinator/task_coordinator_test.go b/task/backend/coordinator/task_coordinator_test.go index 07afb0d009..a4d219572e 100644 --- a/task/backend/coordinator/task_coordinator_test.go +++ b/task/backend/coordinator/task_coordinator_test.go @@ -100,7 +100,7 @@ func Test_Coordinator_Scheduler_Methods(t *testing.T) { one = influxdb.ID(1) two = influxdb.ID(2) three = influxdb.ID(3) - now = time.Now().Format(time.RFC3339Nano) + now = time.Now().UTC() taskOne = &influxdb.Task{ID: one, CreatedAt: now, Cron: "* * * * *"} taskTwo = &influxdb.Task{ID: two, Status: "active", CreatedAt: now, Cron: "* * * * *"} diff --git a/task/backend/coordinator_test.go b/task/backend/coordinator_test.go index 6c9d4c8e99..b564c7dea9 100644 --- a/task/backend/coordinator_test.go +++ b/task/backend/coordinator_test.go @@ -16,8 +16,7 @@ var ( three = influxdb.ID(3) four = influxdb.ID(4) - aTime = time.Now() - aTimeStamp = aTime.Format(time.RFC3339) + aTime = time.Now().UTC() taskOne = &influxdb.Task{ID: one} taskTwo = &influxdb.Task{ID: two, Status: "active"} @@ -56,7 +55,7 @@ func Test_NotifyCoordinatorOfCreated(t *testing.T) { } if diff := cmp.Diff([]update{ - {two, influxdb.TaskUpdate{LatestCompleted: &aTimeStamp}}, + {two, influxdb.TaskUpdate{LatestCompleted: &aTime}}, }, tasks.updates); diff != "" { t.Errorf("unexpected updates to task service %v", diff) } diff --git a/task/backend/middleware/check_middleware.go b/task/backend/middleware/check_middleware.go index 7b13bc1f13..016a252168 100644 --- a/task/backend/middleware/check_middleware.go +++ b/task/backend/middleware/check_middleware.go @@ -81,7 +81,7 @@ func (cs *CoordinatingCheckService) UpdateCheck(ctx context.Context, id influxdb // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { - toTask.LatestCompleted = cs.Now().Format(time.RFC3339) + toTask.LatestCompleted = cs.Now() } return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask) @@ -112,7 +112,7 @@ func (cs *CoordinatingCheckService) PatchCheck(ctx context.Context, id influxdb. // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { - toTask.LatestCompleted = cs.Now().Format(time.RFC3339) + toTask.LatestCompleted = cs.Now() } return to, cs.coordinator.TaskUpdated(ctx, fromTask, toTask) diff --git a/task/backend/middleware/check_middleware_test.go b/task/backend/middleware/check_middleware_test.go index 5861bc32d8..7bd0fa377c 100644 --- a/task/backend/middleware/check_middleware_test.go +++ b/task/backend/middleware/check_middleware_test.go @@ -216,7 +216,7 @@ func TestCheckUpdateFromInactive(t *testing.T) { if task.ID != thecheck.GetTaskID() { t.Fatalf("task sent to coordinator doesn't match expected") } - if task.LatestCompleted != latest.Format(time.RFC3339) { + if task.LatestCompleted != latest { t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) } default: @@ -233,7 +233,7 @@ func TestCheckUpdateFromInactive(t *testing.T) { if task.ID != thecheck.GetTaskID() { t.Fatalf("task sent to coordinator doesn't match expected") } - if task.LatestCompleted != latest.Format(time.RFC3339) { + if task.LatestCompleted != latest { t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) } default: diff --git a/task/backend/middleware/middleware.go b/task/backend/middleware/middleware.go index 27d9296e0d..40e39c9582 100644 --- a/task/backend/middleware/middleware.go +++ b/task/backend/middleware/middleware.go @@ -76,7 +76,7 @@ func (s *CoordinatingTaskService) UpdateTask(ctx context.Context, id influxdb.ID if upd.Status != nil && *upd.Status == string(backend.TaskActive) { // confirm that it was inactive and this is an attempt to activate if from.Status == string(backend.TaskInactive) { - lc := s.now().Format(time.RFC3339) + lc := s.now() upd.LatestCompleted = &lc } } diff --git a/task/backend/middleware/middleware_test.go b/task/backend/middleware/middleware_test.go index 92f0a552e1..bee707b742 100644 --- a/task/backend/middleware/middleware_test.go +++ b/task/backend/middleware/middleware_test.go @@ -257,7 +257,7 @@ func TestCoordinatingTaskService_ClaimTaskUpdatesLatestCompleted(t *testing.T) { select { case claimedTask := <-cchan: - if claimedTask.LatestCompleted != latest.UTC().Format(time.RFC3339) { + if claimedTask.LatestCompleted != latest.UTC() { t.Fatal("failed up update latest completed in claimed task") } case <-time.After(time.Second): diff --git a/task/backend/middleware/notification_middleware.go b/task/backend/middleware/notification_middleware.go index 8dda611c4e..ad5f706d4c 100644 --- a/task/backend/middleware/notification_middleware.go +++ b/task/backend/middleware/notification_middleware.go @@ -80,7 +80,7 @@ func (ns *CoordinatingNotificationRuleStore) UpdateNotificationRule(ctx context. // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { - toTask.LatestCompleted = ns.Now().Format(time.RFC3339) + toTask.LatestCompleted = ns.Now() } return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask) @@ -111,7 +111,7 @@ func (ns *CoordinatingNotificationRuleStore) PatchNotificationRule(ctx context.C // if the update is to activate and the previous task was inactive we should add a "latest completed" update // this allows us to see not run the task for inactive time if fromTask.Status == string(backend.TaskInactive) && toTask.Status == string(backend.TaskActive) { - toTask.LatestCompleted = ns.Now().Format(time.RFC3339) + toTask.LatestCompleted = ns.Now() } return to, ns.coordinator.TaskUpdated(ctx, fromTask, toTask) diff --git a/task/backend/middleware/notification_middleware_test.go b/task/backend/middleware/notification_middleware_test.go index 2cc5e0397f..a654ca8e76 100644 --- a/task/backend/middleware/notification_middleware_test.go +++ b/task/backend/middleware/notification_middleware_test.go @@ -107,7 +107,7 @@ func TestNotificationRuleUpdateFromInactive(t *testing.T) { if task.ID != therule.GetTaskID() { t.Fatalf("task sent to coordinator doesn't match expected") } - if task.LatestCompleted != latest.Format(time.RFC3339) { + if task.LatestCompleted != latest { t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) } default: @@ -124,7 +124,7 @@ func TestNotificationRuleUpdateFromInactive(t *testing.T) { if task.ID != therule.GetTaskID() { t.Fatalf("task sent to coordinator doesn't match expected") } - if task.LatestCompleted != latest.Format(time.RFC3339) { + if task.LatestCompleted != latest { t.Fatalf("update returned incorrect LatestCompleted, expected %s got %s, or ", latest.Format(time.RFC3339), task.LatestCompleted) } default: diff --git a/task/backend/schedulable_task_service_test.go b/task/backend/schedulable_task_service_test.go index 5071223c74..e4e28ff388 100644 --- a/task/backend/schedulable_task_service_test.go +++ b/task/backend/schedulable_task_service_test.go @@ -10,13 +10,12 @@ import ( ) var ( - mockTaskID = influxdb.ID(1) - mockTimeNow = time.Now() - mockTimeNowStr = time.Now().Format(time.RFC3339Nano) + mockTaskID = influxdb.ID(1) + mockTimeNow = time.Now() ) func (m MockTaskService) UpdateTask(_ context.Context, id influxdb.ID, _ influxdb.TaskUpdate) (*influxdb.Task, error) { - return &influxdb.Task{ID: id, UpdatedAt: mockTimeNowStr}, nil + return &influxdb.Task{ID: id, UpdatedAt: mockTimeNow}, nil } type MockTaskService struct{} diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 6ac0c238da..7b432a507a 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -34,11 +34,13 @@ func TestScheduler_Cancelation(t *testing.T) { defer o.Stop() const orgID = 2 + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:04Z") + task := &platform.Task{ ID: platform.ID(1), OrganizationID: orgID, Every: "1s", - LatestCompleted: "1970-01-01T00:00:04Z", + LatestCompleted: latestCompleted, Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } tcs.SetTask(task) @@ -78,10 +80,11 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { o.Start(context.Background()) defer o.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:03Z") task := &platform.Task{ ID: platform.ID(1), Cron: "* * * * *", - LatestCompleted: "1970-01-01T00:00:03Z", + LatestCompleted: latestCompleted, Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -99,7 +102,7 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { task = &platform.Task{ ID: platform.ID(2), Every: "1s", - LatestCompleted: "1970-01-01T00:00:03Z", + LatestCompleted: latestCompleted, Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -145,11 +148,12 @@ func TestScheduler_DontRunInactiveTasks(t *testing.T) { o := backend.NewScheduler(tcs, e, 5) o.Start(context.Background()) defer o.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") task := &platform.Task{ ID: platform.ID(1), Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Status: "inactive", Flux: `option task = {concurrency: 2, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -177,11 +181,12 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) { o := backend.NewScheduler(tcs, e, 5) o.Start(context.Background()) defer o.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") task := &platform.Task{ ID: platform.ID(1), Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Flux: `option task = {concurrency: 2, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -252,12 +257,13 @@ func TestScheduler_LogStatisticsOnSuccess(t *testing.T) { const taskID = 0x12345 const orgID = 0x54321 + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") task := &platform.Task{ ID: taskID, OrganizationID: orgID, Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -315,11 +321,12 @@ func TestScheduler_Release(t *testing.T) { o := backend.NewScheduler(tcs, e, 5) o.Start(context.Background()) defer o.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") task := &platform.Task{ ID: platform.ID(1), Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -351,11 +358,12 @@ func TestScheduler_UpdateTask(t *testing.T) { s := backend.NewScheduler(tcs, e, 3059, backend.WithLogger(zaptest.NewLogger(t))) s.Start(context.Background()) defer s.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:50:00Z") task := &platform.Task{ ID: platform.ID(1), Cron: "* * * * *", - LatestCompleted: "1970-01-01T00:50:00Z", + LatestCompleted: latestCompleted, Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -402,11 +410,12 @@ func TestScheduler_Queue(t *testing.T) { o := backend.NewScheduler(tcs, e, 3059, backend.WithLogger(zaptest.NewLogger(t))) o.Start(context.Background()) defer o.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:50:00Z") task := &platform.Task{ ID: platform.ID(1), Cron: "* * * * *", - LatestCompleted: "1970-01-01T00:50:00Z", + LatestCompleted: latestCompleted, Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } t1, _ := time.Parse(time.RFC3339, "1970-01-01T00:02:00Z") @@ -639,13 +648,14 @@ func TestScheduler_RunStatus(t *testing.T) { s := backend.NewScheduler(rl, e, 5, backend.WithLogger(zaptest.NewLogger(t))) s.Start(context.Background()) defer s.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") // Claim a task that starts later. task := &platform.Task{ ID: platform.ID(1), OrganizationID: platform.ID(2), Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -739,12 +749,13 @@ func TestScheduler_RunFailureCleanup(t *testing.T) { s := backend.NewScheduler(ll, e, 5, backend.WithLogger(zaptest.NewLogger(t))) s.Start(context.Background()) defer s.Stop() + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") // Task with concurrency 1 should continue after one run fails. task := &platform.Task{ ID: platform.ID(1), Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -829,10 +840,11 @@ func TestScheduler_Metrics(t *testing.T) { reg.MustRegister(s.PrometheusCollectors()...) // Claim a task that starts later. + latestCompleted, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:05Z") task := &platform.Task{ ID: platform.ID(1), Every: "1s", - LatestCompleted: "1970-01-01T00:00:05Z", + LatestCompleted: latestCompleted, Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, } @@ -1008,12 +1020,12 @@ func TestScheduler_WithTicker(t *testing.T) { o.Start(ctx) defer o.Stop() - createdAt := time.Now() + createdAt := time.Now().UTC() task := &platform.Task{ ID: platform.ID(1), Every: "1s", Flux: `option task = {concurrency: 5, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`, - LatestCompleted: createdAt.Format(time.RFC3339Nano), + LatestCompleted: createdAt, } tcs.SetTask(task) diff --git a/task/mock/task_control_service.go b/task/mock/task_control_service.go index 5de85e92cc..e7093f8d72 100644 --- a/task/mock/task_control_service.go +++ b/task/mock/task_control_service.go @@ -112,11 +112,9 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back if err != nil { return backend.RunCreation{}, err } - latest := int64(0) - lt, err := time.Parse(time.RFC3339, task.LatestCompleted) - if err == nil { - latest = lt.Unix() - } + + latest := task.LatestCompleted.Unix() + for _, r := range t.runs[task.ID] { if r.ScheduledFor.Unix() > latest { latest = r.ScheduledFor.Unix() @@ -126,13 +124,12 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back nextScheduled := sch.Next(time.Unix(latest, 0)) nextScheduledUnix := nextScheduled.Unix() + offset := int64(0) - if task.Offset != "" { - toff, err := time.ParseDuration(task.Offset) - if err == nil { - offset = toff.Nanoseconds() - } + if task.Offset != 0 { + offset = task.Offset.Nanoseconds() } + if dueAt := nextScheduledUnix + int64(offset); dueAt > now { return backend.RunCreation{}, influxdb.ErrRunNotDueYet(dueAt) } @@ -201,19 +198,11 @@ func (d *TaskControlService) FinishRun(_ context.Context, taskID, runID influxdb r := d.runs[tid][rid] delete(d.runs[tid], rid) t := d.tasks[tid] - schedFor := r.ScheduledFor.Format(time.RFC3339) - if t.LatestCompleted != "" { - var latest time.Time - latest, err := time.Parse(time.RFC3339, t.LatestCompleted) - if err != nil { - return nil, err - } - - if r.ScheduledFor.After(latest) { - t.LatestCompleted = schedFor - } + if r.ScheduledFor.After(t.LatestCompleted) { + t.LatestCompleted = r.ScheduledFor } + d.finishedRuns[rid] = r delete(d.created, tid.String()+rid.String()) return r, nil @@ -254,11 +243,8 @@ func (d *TaskControlService) nextDueRun(ctx context.Context, taskID influxdb.ID) if err != nil { return 0, err } - latest := int64(0) - lt, err := time.Parse(time.RFC3339, task.LatestCompleted) - if err == nil { - latest = lt.Unix() - } + + latest := task.LatestCompleted.Unix() for _, r := range d.runs[task.ID] { if r.ScheduledFor.Unix() > latest { @@ -268,12 +254,10 @@ func (d *TaskControlService) nextDueRun(ctx context.Context, taskID influxdb.ID) nextScheduled := sch.Next(time.Unix(latest, 0)) nextScheduledUnix := nextScheduled.Unix() + offset := int64(0) - if task.Offset != "" { - toff, err := time.ParseDuration(task.Offset) - if err == nil { - offset = toff.Nanoseconds() - } + if task.Offset != 0 { + offset = task.Offset.Nanoseconds() } return nextScheduledUnix + int64(offset), nil diff --git a/task/servicetest/servicetest.go b/task/servicetest/servicetest.go index 1338b23393..a6b80319b5 100644 --- a/task/servicetest/servicetest.go +++ b/task/servicetest/servicetest.go @@ -249,7 +249,7 @@ func testTaskCRUD(t *testing.T, sys *System) { OwnerID: tsk.OwnerID, Name: "task #0", Cron: "* * * * *", - Offset: "5s", + Offset: 5 * time.Second, Status: string(backend.DefaultTaskStatus), Flux: fmt.Sprintf(scriptFmt, 0), Type: influxdb.TaskSystemType, @@ -563,7 +563,8 @@ from(bucket: "b") if err != nil { t.Fatal(err) } - if fNoOffset.Offset != "" { + var zero time.Duration + if fNoOffset.Offset != zero { t.Fatal("removing offset failed") } }) @@ -599,19 +600,13 @@ func testUpdate(t *testing.T, sys *System) { after := time.Now() latestCA := after.Add(time.Second) - ca, err := time.Parse(time.RFC3339, st.CreatedAt) - if err != nil { - t.Fatal(err) - } + ca := st.CreatedAt if earliestCA.After(ca) || latestCA.Before(ca) { t.Fatalf("createdAt not accurate, expected %s to be between %s and %s", ca, earliestCA, latestCA) } - ti, err := time.Parse(time.RFC3339, st.LatestCompleted) - if err != nil { - t.Fatal(err) - } + ti := st.LatestCompleted if now.Sub(ti) > 10*time.Second { t.Fatalf("latest completed not accurate, expected: ~%s, got %s", now, ti) @@ -641,7 +636,7 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } - if st2.LatestCompleted <= st.LatestCompleted { + if st2.LatestCompleted.Before(st.LatestCompleted) { t.Fatalf("executed task has not updated latest complete: expected %s > %s", st2.LatestCompleted, st.LatestCompleted) } @@ -683,7 +678,7 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } - if st3.LatestCompleted <= st2.LatestCompleted { + if st3.LatestCompleted.Before(st2.LatestCompleted) { t.Fatalf("executed task has not updated latest complete: expected %s > %s", st3.LatestCompleted, st2.LatestCompleted) } @@ -706,10 +701,7 @@ func testUpdate(t *testing.T, sys *System) { earliestUA := now.Add(-time.Second) latestUA := after.Add(time.Second) - ua, err := time.Parse(time.RFC3339, task.UpdatedAt) - if err != nil { - t.Fatal(err) - } + ua := task.UpdatedAt if earliestUA.After(ua) || latestUA.Before(ua) { t.Fatalf("updatedAt not accurate, expected %s to be between %s and %s", ua, earliestUA, latestUA) @@ -720,10 +712,7 @@ func testUpdate(t *testing.T, sys *System) { t.Fatal(err) } - ua, err = time.Parse(time.RFC3339, st.UpdatedAt) - if err != nil { - t.Fatal(err) - } + ua = st.UpdatedAt if earliestUA.After(ua) || latestUA.Before(ua) { t.Fatalf("updatedAt not accurate after pulling new task, expected %s to be between %s and %s", ua, earliestUA, latestUA)