From b5ccad3c07a1d57d997f12fa7850872afd5c7e5b Mon Sep 17 00:00:00 2001 From: Alirie Gray Date: Wed, 11 Dec 2019 14:50:32 -0800 Subject: [PATCH] feat(metrics): add run latency to executor metrics (#16190) --- cmd/influxd/launcher/launcher.go | 4 +-- kv/task.go | 7 +++-- mock/task_service.go | 6 ++-- task.go | 3 +- task/backend/analytical_storage.go | 2 +- task/backend/executor/executor_metrics.go | 14 ++++++++- task/backend/executor/limits_test.go | 6 ++-- task/backend/executor/task_executor.go | 18 ++++++------ task/backend/executor/task_executor_test.go | 29 +++++++++++++------ task/backend/scheduler/scheduler.go | 2 +- task/backend/scheduler/scheduler_test.go | 32 ++++++++++----------- task/backend/scheduler/treescheduler.go | 4 +-- task/backend/task.go | 4 +-- task/mock/task_control_service.go | 2 +- 14 files changed, 80 insertions(+), 53 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 37ae49f70d..c07bf70fa4 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -647,11 +647,11 @@ func (m *Launcher) run(ctx context.Context) (err error) { sch, sm, err := scheduler.NewScheduler( executor, taskbackend.NewSchedulableTaskService(m.kvService), - scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) { + scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledFor time.Time, err error) { schLogger.Info( "error in scheduler run", zap.String("taskID", platform.ID(taskID).String()), - zap.Time("scheduledAt", scheduledAt), + zap.Time("scheduledFor", scheduledFor), zap.Error(err)) }), ) diff --git a/kv/task.go b/kv/task.go index 6cceb264be..fe863a94b8 100644 --- a/kv/task.go +++ b/kv/task.go @@ -1360,10 +1360,10 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID, } // CreateRun creates a run with a scheduledFor time as now. -func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { +func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { var r *influxdb.Run err := s.kv.Update(ctx, func(tx Tx) error { - run, err := s.createRun(ctx, tx, taskID, scheduledFor) + run, err := s.createRun(ctx, tx, taskID, scheduledFor, runAt) if err != nil { return err } @@ -1372,13 +1372,14 @@ func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFo }) return r, err } -func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { +func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { id := s.IDGenerator.ID() run := influxdb.Run{ ID: id, TaskID: taskID, ScheduledFor: scheduledFor, + RunAt: runAt, Status: backend.RunScheduled.String(), Log: []influxdb.Log{}, } diff --git a/mock/task_service.go b/mock/task_service.go index 5586e60f6b..42f1440432 100644 --- a/mock/task_service.go +++ b/mock/task_service.go @@ -72,7 +72,7 @@ func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, schedule type TaskControlService struct { CreateNextRunFn func(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) NextDueRunFn func(ctx context.Context, taskID influxdb.ID) (int64, error) - CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) + CreateRunFn func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) CurrentlyRunningFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) ManualRunsFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) StartManualRunFn func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) @@ -87,8 +87,8 @@ func (tcs *TaskControlService) CreateNextRun(ctx context.Context, taskID influxd func (tcs *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) { return tcs.NextDueRunFn(ctx, taskID) } -func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { - return tcs.CreateRunFn(ctx, taskID, scheduledFor) +func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { + return tcs.CreateRunFn(ctx, taskID, scheduledFor, runAt) } func (tcs *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) { return tcs.CurrentlyRunningFn(ctx, taskID) diff --git a/task.go b/task.go index a6f3c4e27d..7453446962 100644 --- a/task.go +++ b/task.go @@ -84,7 +84,8 @@ type Run struct { ID ID `json:"id,omitempty"` TaskID ID `json:"taskID"` Status string `json:"status"` - ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the time the task is scheduled to run at + ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the Now time used in the task's query + RunAt time.Time `json:"runAt"` // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset StartedAt time.Time `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task FinishedAt time.Time `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task RequestedAt time.Time `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task diff --git a/task/backend/analytical_storage.go b/task/backend/analytical_storage.go index e3501b4ae0..44f1205022 100644 --- a/task/backend/analytical_storage.go +++ b/task/backend/analytical_storage.go @@ -386,7 +386,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error { case scheduledForField: scheduled, err := time.Parse(time.RFC3339, cr.Strings(j).ValueString(i)) if err != nil { - re.log.Info("Failed to parse scheduledAt time", zap.Error(err)) + re.log.Info("Failed to parse scheduledFor time", zap.Error(err)) continue } r.ScheduledFor = scheduled.UTC() diff --git a/task/backend/executor/executor_metrics.go b/task/backend/executor/executor_metrics.go index 92ff7d8293..4e345c77cb 100644 --- a/task/backend/executor/executor_metrics.go +++ b/task/backend/executor/executor_metrics.go @@ -17,6 +17,7 @@ type ExecutorMetrics struct { manualRunsCounter *prometheus.CounterVec resumeRunsCounter *prometheus.CounterVec unrecoverableCounter *prometheus.CounterVec + runLatency *prometheus.HistogramVec } type runCollector struct { @@ -83,6 +84,13 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics { Name: "resume_runs_counter", Help: "Total number of runs resumed by task ID", }, []string{"taskID"}), + + runLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: namespace, + Subsystem: subsystem, + Name: "run_latency_seconds", + Help: "Records the latency between the time the run was due to run and the time the task started execution, by task type", + }, []string{"task_type"}), } } @@ -122,13 +130,17 @@ func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector { em.manualRunsCounter, em.resumeRunsCounter, em.unrecoverableCounter, + em.runLatency, } } // StartRun store the delta time between when a run is due to start and actually starting. -func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration) { +func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration, runLatency time.Duration) { em.queueDelta.WithLabelValues(task.Type, "all").Observe(queueDelta.Seconds()) em.queueDelta.WithLabelValues("", task.ID.String()).Observe(queueDelta.Seconds()) + + // schedule interval duration = (time task was scheduled to run) - (time it actually ran) + em.runLatency.WithLabelValues(task.Type).Observe(runLatency.Seconds()) } // FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID. diff --git a/task/backend/executor/limits_test.go b/task/backend/executor/limits_test.go index e9af710ea6..dd8dd1228f 100644 --- a/task/backend/executor/limits_test.go +++ b/task/backend/executor/limits_test.go @@ -16,15 +16,15 @@ var ( func TestTaskConcurrency(t *testing.T) { tes := taskExecutorSystem(t) te := tes.ex - r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second)) + r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second), time.Now()) if err != nil { t.Fatal(err) } - r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second)) + r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second), time.Now()) if err != nil { t.Fatal(err) } - r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second)) + r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second), time.Now()) if err != nil { t.Fatal(err) } diff --git a/task/backend/executor/task_executor.go b/task/backend/executor/task_executor.go index 725e078317..b423260723 100644 --- a/task/backend/executor/task_executor.go +++ b/task/backend/executor/task_executor.go @@ -97,20 +97,20 @@ func (e *TaskExecutor) SetLimitFunc(l LimitFunc) { } // Execute is a executor to satisfy the needs of tasks -func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error { - _, err := e.PromisedExecute(ctx, id, scheduledAt) +func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) error { + _, err := e.PromisedExecute(ctx, id, scheduledFor, runAt) return err } -// PromisedExecute begins execution for the tasks id with a specific scheduledAt time. -// When we execute we will first build a run for the scheduledAt time, +// PromisedExecute begins execution for the tasks id with a specific scheduledFor time. +// When we execute we will first build a run for the scheduledFor time, // We then want to add to the queue anything that was manually queued to run. // If the queue is full the call to execute should hang and apply back pressure to the caller // We then start a worker to work the newly queued jobs. -func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) (Promise, error) { +func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) (Promise, error) { iid := influxdb.ID(id) // create a run - p, err := e.createRun(ctx, iid, scheduledAt) + p, err := e.createRun(ctx, iid, scheduledFor, runAt) if err != nil { return nil, err } @@ -154,8 +154,8 @@ func (e *TaskExecutor) ResumeCurrentRun(ctx context.Context, id influxdb.ID, run return nil, influxdb.ErrRunNotFound } -func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*promise, error) { - r, err := e.tcs.CreateRun(ctx, id, scheduledAt.UTC()) +func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledFor time.Time, runAt time.Time) (*promise, error) { + r, err := e.tcs.CreateRun(ctx, id, scheduledFor.UTC(), runAt.UTC()) if err != nil { return nil, err } @@ -310,7 +310,7 @@ func (w *worker) start(p *promise) { w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), backend.RunStarted) // add to metrics - w.te.metrics.StartRun(p.task, time.Since(p.createdAt)) + w.te.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt)) p.startedAt = time.Now() } diff --git a/task/backend/executor/task_executor_test.go b/task/backend/executor/task_executor_test.go index 7b797924f3..c275b23e8a 100644 --- a/task/backend/executor/task_executor_test.go +++ b/task/backend/executor/task_executor_test.go @@ -71,7 +71,7 @@ func testQuerySuccess(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -86,6 +86,10 @@ func testQuerySuccess(t *testing.T) { t.Fatal("promise and run dont match") } + if run.RunAt != time.Unix(126, 0).UTC() { + t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt) + } + tes.svc.WaitForQueryLive(t, script) tes.svc.SucceedQuery(script) @@ -113,7 +117,7 @@ func testQueryFailure(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -196,7 +200,7 @@ func testResumingRun(t *testing.T) { t.Fatal(err) } - stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0)) + stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -239,7 +243,7 @@ func testWorkerLimit(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -281,7 +285,7 @@ func testLimitFunc(t *testing.T) { return nil }) - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -317,7 +321,7 @@ func testMetrics(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -383,6 +387,15 @@ func testMetrics(t *testing.T) { t.Fatalf("expected 1 manual run, got %v", got) } + m = promtest.MustFindMetric(t, mg, "task_executor_run_latency_seconds", map[string]string{"task_type": ""}) + if got := *m.Histogram.SampleCount; got != 1 { + t.Fatalf("expected to count 1 run latency metric, got %v", got) + } + + if got := *m.Histogram.SampleSum; got <= 100 { + t.Fatalf("expected run latency metric to be very large, got %v", got) + } + } func testIteratorFailure(t *testing.T) { @@ -403,7 +416,7 @@ func testIteratorFailure(t *testing.T) { t.Fatal(err) } - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } @@ -447,7 +460,7 @@ func testErrorHandling(t *testing.T) { forcedErr := errors.New("could not find bucket") tes.svc.FailNextQuery(forcedErr) - promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0)) + promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0)) if err != nil { t.Fatal(err) } diff --git a/task/backend/scheduler/scheduler.go b/task/backend/scheduler/scheduler.go index 9fe6febf3f..53aa22fc10 100644 --- a/task/backend/scheduler/scheduler.go +++ b/task/backend/scheduler/scheduler.go @@ -19,7 +19,7 @@ type Executor interface { // Errors returned from the execute request imply that this attempt has failed and // should be put back in scheduler and re executed at a alter time. We will add scheduler specific errors // so the time can be configurable. - Execute(ctx context.Context, id ID, scheduledAt time.Time) error + Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error } // Schedulable is the interface that encapsulates work that diff --git a/task/backend/scheduler/scheduler_test.go b/task/backend/scheduler/scheduler_test.go index fa73a9b922..04fff141b1 100644 --- a/task/backend/scheduler/scheduler_test.go +++ b/task/backend/scheduler/scheduler_test.go @@ -11,7 +11,7 @@ import ( type mockExecutor struct { sync.Mutex - fn func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) + fn func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) Err error } @@ -36,12 +36,12 @@ func (s mockSchedulable) LastScheduled() time.Time { return s.lastScheduled } -func (e *mockExecutor) Execute(ctx context.Context, id ID, scheduledAt time.Time) error { +func (e *mockExecutor) Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error { done := make(chan struct{}, 1) select { case <-ctx.Done(): default: - e.fn(&sync.Mutex{}, ctx, id, scheduledAt) + e.fn(&sync.Mutex{}, ctx, id, scheduledFor) done <- struct{}{} } return nil @@ -60,11 +60,11 @@ func TestSchedule_Next(t *testing.T) { t.Run("fires properly with non-mocked time", func(t *testing.T) { now := time.Now() c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: default: t.Errorf("called the executor too many times") } @@ -99,11 +99,11 @@ func TestSchedule_Next(t *testing.T) { mockTime := clock.NewMock() mockTime.Set(time.Now()) c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: default: t.Errorf("called the executor too many times") } @@ -144,11 +144,11 @@ func TestSchedule_Next(t *testing.T) { t.Run("fires the correct number of times for the interval with a single schedulable", func(t *testing.T) { c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: } }} mockTime := clock.NewMock() @@ -216,7 +216,7 @@ func TestSchedule_Next(t *testing.T) { ts time.Time id ID }, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") @@ -224,7 +224,7 @@ func TestSchedule_Next(t *testing.T) { ts time.Time id ID }{ - ts: scheduledAt, + ts: scheduledFor, id: id, }: } @@ -303,7 +303,7 @@ func TestTreeScheduler_Stop(t *testing.T) { now := time.Now().Add(-20 * time.Second) mockTime := clock.NewMock() mockTime.Set(now) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {}} + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {}} sch, _, err := NewScheduler(exe, &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error { return nil }}, @@ -322,7 +322,7 @@ func TestSchedule_panic(t *testing.T) { err error }, 1) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { panic("yikes oh no!") }} @@ -370,7 +370,7 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) { mockTime := clock.NewMock() mockTime.Set(now) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") @@ -423,11 +423,11 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) { func TestTreeScheduler_Release(t *testing.T) { c := make(chan time.Time, 100) - exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) { + exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) { select { case <-ctx.Done(): t.Log("ctx done") - case c <- scheduledAt: + case c <- scheduledFor: } }} mockTime := clock.NewMock() diff --git a/task/backend/scheduler/treescheduler.go b/task/backend/scheduler/treescheduler.go index e2e4fb1ed3..e816ab9718 100644 --- a/task/backend/scheduler/treescheduler.go +++ b/task/backend/scheduler/treescheduler.go @@ -78,7 +78,7 @@ type TreeScheduler struct { } // ErrorFunc is a function for error handling. It is a good way to inject logging into a TreeScheduler. -type ErrorFunc func(ctx context.Context, taskID ID, scheduledAt time.Time, err error) +type ErrorFunc func(ctx context.Context, taskID ID, scheduledFor time.Time, err error) type treeSchedulerOptFunc func(t *TreeScheduler) error @@ -318,7 +318,7 @@ func (s *TreeScheduler) work(ctx context.Context, ch chan Item) { s.sm.reportScheduleDelay(time.Since(it.Next())) preExec := time.Now() // execute - err = s.executor.Execute(ctx, it.id, t) + err = s.executor.Execute(ctx, it.id, t, it.when()) // report how long execution took s.sm.reportExecution(err, time.Since(preExec)) return err diff --git a/task/backend/task.go b/task/backend/task.go index 326e52b07c..b497746497 100644 --- a/task/backend/task.go +++ b/task/backend/task.go @@ -22,10 +22,10 @@ type TaskControlService interface { NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) // CreateRun creates a run with a schedule for time. - // This differes from CreateNextRun in that it should not to use some scheduling system to determin when the run + // This differs from CreateNextRun in that it should not to use some scheduling system to determine when the run // should happen. // TODO(lh): remove comment once we no longer need create next run. - CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) + CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) diff --git a/task/mock/task_control_service.go b/task/mock/task_control_service.go index e7093f8d72..3cec8d7912 100644 --- a/task/mock/task_control_service.go +++ b/task/mock/task_control_service.go @@ -155,7 +155,7 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back }, nil } -func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) { +func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) { t.mu.Lock() defer t.mu.Unlock()