feat(metrics): add run latency to executor metrics (#16190)

Alirie Gray 2019-12-11 14:50:32 -08:00 committed by GitHub
parent efdc6e592b
commit b5ccad3c07
14 changed files with 80 additions and 53 deletions
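Taken together, the diffs below thread a new runAt timestamp from the scheduler down through the executor and task control service, and record how late each run actually started relative to runAt as a Prometheus histogram. As a rough, standalone sketch of the metric being added (the namespace and subsystem values are inferred from the name the tests assert, task_executor_run_latency_seconds; this program is illustrative, not code from the commit):

package main

import (
    "fmt"
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

func main() {
    // Same shape as the histogram added in this commit.
    runLatency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
        Namespace: "task",     // inferred from the asserted metric name
        Subsystem: "executor", // task_executor_run_latency_seconds
        Name:      "run_latency_seconds",
        Help:      "Records the latency between the time the run was due to run and the time the task started execution, by task type",
    }, []string{"task_type"})

    reg := prometheus.NewRegistry()
    reg.MustRegister(runLatency)

    // The executor observes how late a run started relative to its RunAt time.
    // "system" is just an example task_type label value.
    runAt := time.Now().Add(-4 * time.Second)
    runLatency.WithLabelValues("system").Observe(time.Since(runAt).Seconds())

    mfs, _ := reg.Gather()
    for _, mf := range mfs {
        fmt.Println(mf.GetName(), "sample count:", mf.GetMetric()[0].GetHistogram().GetSampleCount())
    }
}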

View File

@@ -647,11 +647,11 @@ func (m *Launcher) run(ctx context.Context) (err error) {
     sch, sm, err := scheduler.NewScheduler(
         executor,
         taskbackend.NewSchedulableTaskService(m.kvService),
-        scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) {
+        scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledFor time.Time, err error) {
             schLogger.Info(
                 "error in scheduler run",
                 zap.String("taskID", platform.ID(taskID).String()),
-                zap.Time("scheduledAt", scheduledAt),
+                zap.Time("scheduledFor", scheduledFor),
                 zap.Error(err))
         }),
     )

View File

@@ -1360,10 +1360,10 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
 }
 // CreateRun creates a run with a scheduledFor time as now.
-func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
+func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
     var r *influxdb.Run
     err := s.kv.Update(ctx, func(tx Tx) error {
-        run, err := s.createRun(ctx, tx, taskID, scheduledFor)
+        run, err := s.createRun(ctx, tx, taskID, scheduledFor, runAt)
         if err != nil {
             return err
         }
@@ -1372,13 +1372,14 @@ func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFo
     })
     return r, err
 }
-func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
+func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
     id := s.IDGenerator.ID()
     run := influxdb.Run{
         ID:           id,
         TaskID:       taskID,
         ScheduledFor: scheduledFor,
+        RunAt:        runAt,
         Status:       backend.RunScheduled.String(),
         Log:          []influxdb.Log{},
     }

View File

@@ -72,7 +72,7 @@ func (s *TaskService) ForceRun(ctx context.Context, taskID influxdb.ID, schedule
 type TaskControlService struct {
     CreateNextRunFn    func(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error)
     NextDueRunFn       func(ctx context.Context, taskID influxdb.ID) (int64, error)
-    CreateRunFn        func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error)
+    CreateRunFn        func(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error)
     CurrentlyRunningFn func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
     ManualRunsFn       func(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
     StartManualRunFn   func(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
@@ -87,8 +87,8 @@ func (tcs *TaskControlService) CreateNextRun(ctx context.Context, taskID influxd
 func (tcs *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
     return tcs.NextDueRunFn(ctx, taskID)
 }
-func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
-    return tcs.CreateRunFn(ctx, taskID, scheduledFor)
+func (tcs *TaskControlService) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
+    return tcs.CreateRunFn(ctx, taskID, scheduledFor, runAt)
 }
 func (tcs *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
     return tcs.CurrentlyRunningFn(ctx, taskID)

View File

@@ -84,7 +84,8 @@ type Run struct {
     ID           ID        `json:"id,omitempty"`
     TaskID       ID        `json:"taskID"`
     Status       string    `json:"status"`
-    ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the time the task is scheduled to run at
+    ScheduledFor time.Time `json:"scheduledFor"` // ScheduledFor is the Now time used in the task's query
+    RunAt        time.Time `json:"runAt"` // RunAt is the time the task is scheduled to be run, which is ScheduledFor + Offset
     StartedAt    time.Time `json:"startedAt,omitempty"` // StartedAt is the time the executor begins running the task
     FinishedAt   time.Time `json:"finishedAt,omitempty"` // FinishedAt is the time the executor finishes running the task
     RequestedAt  time.Time `json:"requestedAt,omitempty"` // RequestedAt is the time the coordinator told the scheduler to schedule the task
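As the new field comments spell out, RunAt is just ScheduledFor shifted by the task's offset. A minimal sketch of that relationship, assuming the offset is available as a time.Duration (the runAtFor helper is illustrative, not part of this commit):

package main

import (
    "fmt"
    "time"
)

// runAtFor mirrors the documented relationship RunAt = ScheduledFor + Offset:
// scheduledFor is the logical "now" used in the task's query, and the offset
// delays actual execution.
func runAtFor(scheduledFor time.Time, offset time.Duration) time.Time {
    return scheduledFor.Add(offset)
}

func main() {
    scheduledFor := time.Unix(123, 0).UTC()
    offset := 3 * time.Second
    // Matches the values the executor tests below use: scheduledFor at 123s, runAt at 126s.
    fmt.Println(runAtFor(scheduledFor, offset).Unix()) // 126
}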

View File

@@ -386,7 +386,7 @@ func (re *runReader) readRuns(cr flux.ColReader) error {
     case scheduledForField:
         scheduled, err := time.Parse(time.RFC3339, cr.Strings(j).ValueString(i))
         if err != nil {
-            re.log.Info("Failed to parse scheduledAt time", zap.Error(err))
+            re.log.Info("Failed to parse scheduledFor time", zap.Error(err))
             continue
         }
         r.ScheduledFor = scheduled.UTC()

View File

@@ -17,6 +17,7 @@ type ExecutorMetrics struct {
     manualRunsCounter    *prometheus.CounterVec
     resumeRunsCounter    *prometheus.CounterVec
     unrecoverableCounter *prometheus.CounterVec
+    runLatency           *prometheus.HistogramVec
 }
 type runCollector struct {
@@ -83,6 +84,13 @@ func NewExecutorMetrics(te *TaskExecutor) *ExecutorMetrics {
         Name: "resume_runs_counter",
         Help: "Total number of runs resumed by task ID",
     }, []string{"taskID"}),
+    runLatency: prometheus.NewHistogramVec(prometheus.HistogramOpts{
+        Namespace: namespace,
+        Subsystem: subsystem,
+        Name:      "run_latency_seconds",
+        Help:      "Records the latency between the time the run was due to run and the time the task started execution, by task type",
+    }, []string{"task_type"}),
 }
 }
@@ -122,13 +130,17 @@ func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector {
     em.manualRunsCounter,
     em.resumeRunsCounter,
     em.unrecoverableCounter,
+    em.runLatency,
 }
 }
 // StartRun store the delta time between when a run is due to start and actually starting.
-func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration) {
+func (em *ExecutorMetrics) StartRun(task *influxdb.Task, queueDelta time.Duration, runLatency time.Duration) {
     em.queueDelta.WithLabelValues(task.Type, "all").Observe(queueDelta.Seconds())
     em.queueDelta.WithLabelValues("", task.ID.String()).Observe(queueDelta.Seconds())
+    // schedule interval duration = (time task was scheduled to run) - (time it actually ran)
+    em.runLatency.WithLabelValues(task.Type).Observe(runLatency.Seconds())
 }
 // FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID.
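StartRun now takes two durations: queueDelta, how long the promise waited in the executor, and runLatency, how late execution started relative to the run's RunAt time. The worker change further down supplies them as time.Since(p.createdAt) and time.Since(p.run.RunAt). A hedged sketch of how the two values relate (the variable names here are illustrative, not from the commit):

package main

import (
    "fmt"
    "time"
)

func main() {
    // createdAt: when the executor created the run's promise.
    // runAt: when the run was due to start (ScheduledFor + Offset).
    createdAt := time.Now().Add(-2 * time.Second)
    runAt := time.Now().Add(-5 * time.Second)

    queueDelta := time.Since(createdAt) // ~2s, observed by the existing queue_delta histogram
    runLatency := time.Since(runAt)     // ~5s, observed by the new run_latency_seconds histogram

    fmt.Printf("queue_delta=%.1fs run_latency=%.1fs\n", queueDelta.Seconds(), runLatency.Seconds())
}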

View File

@@ -16,15 +16,15 @@ var (
 func TestTaskConcurrency(t *testing.T) {
     tes := taskExecutorSystem(t)
     te := tes.ex
-    r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second))
+    r1, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-4*time.Second), time.Now())
     if err != nil {
         t.Fatal(err)
     }
-    r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second))
+    r2, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-3*time.Second), time.Now())
     if err != nil {
         t.Fatal(err)
     }
-    r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second))
+    r3, err := te.tcs.CreateRun(context.Background(), taskWith1Concurrency.ID, time.Now().Add(-2*time.Second), time.Now())
     if err != nil {
         t.Fatal(err)
     }

View File

@@ -97,20 +97,20 @@ func (e *TaskExecutor) SetLimitFunc(l LimitFunc) {
 }
 // Execute is a executor to satisfy the needs of tasks
-func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) error {
-    _, err := e.PromisedExecute(ctx, id, scheduledAt)
+func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) error {
+    _, err := e.PromisedExecute(ctx, id, scheduledFor, runAt)
     return err
 }
-// PromisedExecute begins execution for the tasks id with a specific scheduledAt time.
-// When we execute we will first build a run for the scheduledAt time,
+// PromisedExecute begins execution for the tasks id with a specific scheduledFor time.
+// When we execute we will first build a run for the scheduledFor time,
 // We then want to add to the queue anything that was manually queued to run.
 // If the queue is full the call to execute should hang and apply back pressure to the caller
 // We then start a worker to work the newly queued jobs.
-func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) (Promise, error) {
+func (e *TaskExecutor) PromisedExecute(ctx context.Context, id scheduler.ID, scheduledFor time.Time, runAt time.Time) (Promise, error) {
     iid := influxdb.ID(id)
     // create a run
-    p, err := e.createRun(ctx, iid, scheduledAt)
+    p, err := e.createRun(ctx, iid, scheduledFor, runAt)
     if err != nil {
         return nil, err
     }
@@ -154,8 +154,8 @@ func (e *TaskExecutor) ResumeCurrentRun(ctx context.Context, id influxdb.ID, run
     return nil, influxdb.ErrRunNotFound
 }
-func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*promise, error) {
-    r, err := e.tcs.CreateRun(ctx, id, scheduledAt.UTC())
+func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledFor time.Time, runAt time.Time) (*promise, error) {
+    r, err := e.tcs.CreateRun(ctx, id, scheduledFor.UTC(), runAt.UTC())
     if err != nil {
         return nil, err
     }
@@ -310,7 +310,7 @@ func (w *worker) start(p *promise) {
     w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), backend.RunStarted)
     // add to metrics
-    w.te.metrics.StartRun(p.task, time.Since(p.createdAt))
+    w.te.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt))
     p.startedAt = time.Now()
 }

View File

@@ -71,7 +71,7 @@ func testQuerySuccess(t *testing.T) {
     t.Fatal(err)
 }
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -86,6 +86,10 @@ func testQuerySuccess(t *testing.T) {
     t.Fatal("promise and run dont match")
 }
+if run.RunAt != time.Unix(126, 0).UTC() {
+    t.Fatalf("did not correctly set RunAt value, got: %v", run.RunAt)
+}
 tes.svc.WaitForQueryLive(t, script)
 tes.svc.SucceedQuery(script)
@@ -113,7 +117,7 @@ func testQueryFailure(t *testing.T) {
     t.Fatal(err)
 }
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -196,7 +200,7 @@ func testResumingRun(t *testing.T) {
     t.Fatal(err)
 }
-stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0))
+stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -239,7 +243,7 @@ func testWorkerLimit(t *testing.T) {
     t.Fatal(err)
 }
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -281,7 +285,7 @@ func testLimitFunc(t *testing.T) {
     return nil
 })
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -317,7 +321,7 @@ func testMetrics(t *testing.T) {
     t.Fatal(err)
 }
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -383,6 +387,15 @@ func testMetrics(t *testing.T) {
     t.Fatalf("expected 1 manual run, got %v", got)
 }
+m = promtest.MustFindMetric(t, mg, "task_executor_run_latency_seconds", map[string]string{"task_type": ""})
+if got := *m.Histogram.SampleCount; got != 1 {
+    t.Fatalf("expected to count 1 run latency metric, got %v", got)
+}
+if got := *m.Histogram.SampleSum; got <= 100 {
+    t.Fatalf("expected run latency metric to be very large, got %v", got)
+}
 }
 func testIteratorFailure(t *testing.T) {
@@ -403,7 +416,7 @@ func testIteratorFailure(t *testing.T) {
     t.Fatal(err)
 }
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }
@@ -447,7 +460,7 @@ func testErrorHandling(t *testing.T) {
 forcedErr := errors.New("could not find bucket")
 tes.svc.FailNextQuery(forcedErr)
-promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
+promise, err := tes.ex.PromisedExecute(ctx, scheduler.ID(task.ID), time.Unix(123, 0), time.Unix(126, 0))
 if err != nil {
     t.Fatal(err)
 }

View File

@@ -19,7 +19,7 @@ type Executor interface {
     // Errors returned from the execute request imply that this attempt has failed and
    // should be put back in scheduler and re executed at a alter time. We will add scheduler specific errors
     // so the time can be configurable.
-    Execute(ctx context.Context, id ID, scheduledAt time.Time) error
+    Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error
 }
 // Schedulable is the interface that encapsulates work that
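Anything satisfying the scheduler's Executor interface now has to accept both timestamps. A minimal sketch of an implementation with the updated signature (the loggingExecutor type and the local ID stand-in are illustrative, not from this commit):

package main

import (
    "context"
    "fmt"
    "time"
)

// ID stands in for the scheduler package's ID type.
type ID uint64

// loggingExecutor is a toy implementation of the updated Executor interface:
// scheduledFor is the logical schedule time, runAt is when the run was due to
// start (scheduledFor plus the task's offset).
type loggingExecutor struct{}

func (loggingExecutor) Execute(ctx context.Context, id ID, scheduledFor, runAt time.Time) error {
    fmt.Printf("task %d: scheduledFor=%s runAt=%s\n", id, scheduledFor, runAt)
    return nil
}

func main() {
    var e loggingExecutor
    _ = e.Execute(context.Background(), 1, time.Unix(123, 0).UTC(), time.Unix(126, 0).UTC())
}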

View File

@@ -11,7 +11,7 @@ import (
 type mockExecutor struct {
     sync.Mutex
-    fn  func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time)
+    fn  func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time)
     Err error
 }
@@ -36,12 +36,12 @@ func (s mockSchedulable) LastScheduled() time.Time {
     return s.lastScheduled
 }
-func (e *mockExecutor) Execute(ctx context.Context, id ID, scheduledAt time.Time) error {
+func (e *mockExecutor) Execute(ctx context.Context, id ID, scheduledFor time.Time, runAt time.Time) error {
     done := make(chan struct{}, 1)
     select {
     case <-ctx.Done():
     default:
-        e.fn(&sync.Mutex{}, ctx, id, scheduledAt)
+        e.fn(&sync.Mutex{}, ctx, id, scheduledFor)
         done <- struct{}{}
     }
     return nil
@@ -60,11 +60,11 @@ func TestSchedule_Next(t *testing.T) {
 t.Run("fires properly with non-mocked time", func(t *testing.T) {
     now := time.Now()
     c := make(chan time.Time, 100)
-    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
         select {
         case <-ctx.Done():
             t.Log("ctx done")
-        case c <- scheduledAt:
+        case c <- scheduledFor:
         default:
             t.Errorf("called the executor too many times")
         }
@@ -99,11 +99,11 @@ func TestSchedule_Next(t *testing.T) {
     mockTime := clock.NewMock()
     mockTime.Set(time.Now())
     c := make(chan time.Time, 100)
-    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
        select {
        case <-ctx.Done():
            t.Log("ctx done")
-       case c <- scheduledAt:
+       case c <- scheduledFor:
        default:
            t.Errorf("called the executor too many times")
        }
@@ -144,11 +144,11 @@ func TestSchedule_Next(t *testing.T) {
 t.Run("fires the correct number of times for the interval with a single schedulable", func(t *testing.T) {
     c := make(chan time.Time, 100)
-    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
         select {
         case <-ctx.Done():
             t.Log("ctx done")
-        case c <- scheduledAt:
+        case c <- scheduledFor:
         }
     }}
     mockTime := clock.NewMock()
@@ -216,7 +216,7 @@ func TestSchedule_Next(t *testing.T) {
         ts time.Time
         id ID
     }, 100)
-    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
         select {
         case <-ctx.Done():
             t.Log("ctx done")
@@ -224,7 +224,7 @@ func TestSchedule_Next(t *testing.T) {
             ts time.Time
             id ID
         }{
-            ts: scheduledAt,
+            ts: scheduledFor,
             id: id,
         }:
         }
@@ -303,7 +303,7 @@ func TestTreeScheduler_Stop(t *testing.T) {
 now := time.Now().Add(-20 * time.Second)
 mockTime := clock.NewMock()
 mockTime.Set(now)
-exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {}}
+exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {}}
 sch, _, err := NewScheduler(exe, &mockSchedulableService{fn: func(ctx context.Context, id ID, t time.Time) error {
     return nil
 }},
@@ -322,7 +322,7 @@ func TestSchedule_panic(t *testing.T) {
     err error
 }, 1)
-exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
     panic("yikes oh no!")
 }}
@@ -370,7 +370,7 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) {
 mockTime := clock.NewMock()
 mockTime.Set(now)
-exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
     select {
     case <-ctx.Done():
         t.Log("ctx done")
@@ -423,11 +423,11 @@ func TestTreeScheduler_LongPanicTest(t *testing.T) {
 func TestTreeScheduler_Release(t *testing.T) {
     c := make(chan time.Time, 100)
-    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledAt time.Time) {
+    exe := &mockExecutor{fn: func(l *sync.Mutex, ctx context.Context, id ID, scheduledFor time.Time) {
         select {
         case <-ctx.Done():
             t.Log("ctx done")
-        case c <- scheduledAt:
+        case c <- scheduledFor:
         }
     }}
     mockTime := clock.NewMock()

View File

@@ -78,7 +78,7 @@ type TreeScheduler struct {
 }
 // ErrorFunc is a function for error handling. It is a good way to inject logging into a TreeScheduler.
-type ErrorFunc func(ctx context.Context, taskID ID, scheduledAt time.Time, err error)
+type ErrorFunc func(ctx context.Context, taskID ID, scheduledFor time.Time, err error)
 type treeSchedulerOptFunc func(t *TreeScheduler) error
@@ -318,7 +318,7 @@ func (s *TreeScheduler) work(ctx context.Context, ch chan Item) {
     s.sm.reportScheduleDelay(time.Since(it.Next()))
     preExec := time.Now()
     // execute
-    err = s.executor.Execute(ctx, it.id, t)
+    err = s.executor.Execute(ctx, it.id, t, it.when())
     // report how long execution took
     s.sm.reportExecution(err, time.Since(preExec))
     return err

View File

@@ -22,10 +22,10 @@ type TaskControlService interface {
     NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error)
     // CreateRun creates a run with a schedule for time.
-    // This differes from CreateNextRun in that it should not to use some scheduling system to determin when the run
+    // This differs from CreateNextRun in that it should not to use some scheduling system to determine when the run
     // should happen.
     // TODO(lh): remove comment once we no longer need create next run.
-    CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error)
+    CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error)
     CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
     ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)

View File

@@ -155,7 +155,7 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back
     }, nil
 }
-func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
+func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time, runAt time.Time) (*influxdb.Run, error) {
     t.mu.Lock()
     defer t.mu.Unlock()