From a0f1184bb3eeb8a6f8fa1bbdb9898d41355ff67f Mon Sep 17 00:00:00 2001 From: Jeffrey Smith II Date: Thu, 13 Oct 2022 14:57:57 -0400 Subject: [PATCH] fix: manually scheduled task runs now run when expected (#23664) * fix: run manually scheduled tasks at their scheduled time * fix: actually use it * fix: get tests building * fix: fix tests * fix: lint --- cmd/influxd/launcher/launcher.go | 4 + kv/task.go | 9 ++ task/backend/coordinator/coordinator.go | 18 ++- task/backend/coordinator/coordinator_test.go | 24 +++- task/backend/coordinator/support_test.go | 12 +- task/backend/executor/executor.go | 134 ++++++++++++++++++- task/mock/executor.go | 4 + 7 files changed, 190 insertions(+), 15 deletions(-) diff --git a/cmd/influxd/launcher/launcher.go b/cmd/influxd/launcher/launcher.go index 881956aad4..e03e514144 100644 --- a/cmd/influxd/launcher/launcher.go +++ b/cmd/influxd/launcher/launcher.go @@ -461,6 +461,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) { combinedTaskService, executor.WithFlagger(m.flagger), ) + err = executor.LoadExistingScheduleRuns(ctx) + if err != nil { + m.log.Fatal("could not load existing scheduled runs", zap.Error(err)) + } m.executor = executor m.reg.MustRegister(executorMetrics.PrometheusCollectors()...) schLogger := m.log.With(zap.String("service", "task-scheduler")) diff --git a/kv/task.go b/kv/task.go index 90e0eb6298..924ed705ef 100644 --- a/kv/task.go +++ b/kv/task.go @@ -1012,6 +1012,15 @@ func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID platform runBytes, err := bucket.Get(key) if err != nil { if IsNotFound(err) { + runs, err := s.manualRuns(ctx, tx, taskID) + for _, run := range runs { + if run.ID == runID { + return run, nil + } + } + if err != nil { + return nil, taskmodel.ErrRunNotFound + } return nil, taskmodel.ErrRunNotFound } return nil, taskmodel.ErrUnexpectedTaskBucketErr(err) diff --git a/task/backend/coordinator/coordinator.go b/task/backend/coordinator/coordinator.go index ee5fe6a2b5..47476fc9eb 100644 --- a/task/backend/coordinator/coordinator.go +++ b/task/backend/coordinator/coordinator.go @@ -22,6 +22,7 @@ const DefaultLimit = 1000 // Executor is an abstraction of the task executor with only the functions needed by the coordinator type Executor interface { ManualRun(ctx context.Context, id platform.ID, runID platform.ID) (executor.Promise, error) + ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error Cancel(ctx context.Context, runID platform.ID) error } @@ -149,7 +150,7 @@ func (c *Coordinator) TaskUpdated(ctx context.Context, from, to *taskmodel.Task) return nil } -//TaskDeleted asks the Scheduler to release the deleted task +// TaskDeleted asks the Scheduler to release the deleted task func (c *Coordinator) TaskDeleted(ctx context.Context, id platform.ID) error { tid := scheduler.ID(id) if err := c.sch.Release(tid); err != nil && err != taskmodel.ErrTaskNotClaimed { @@ -166,14 +167,19 @@ func (c *Coordinator) RunCancelled(ctx context.Context, runID platform.ID) error return err } -// RunForced speaks directly to the Executor to run a task immediately +// RunForced speaks directly to the Executor to run a task immediately, or schedule the run if `scheduledFor` is set. func (c *Coordinator) RunForced(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error { - // the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the - // task rather than waiting for the task to finish - _, err := c.ex.ManualRun(ctx, task.ID, run.ID) + var err error + if !run.ScheduledFor.IsZero() { + err = c.ex.ScheduleManualRun(ctx, task.ID, run.ID) + } else { + // the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the + // task rather than waiting for the task to finish + _, err = c.ex.ManualRun(ctx, task.ID, run.ID) + } + if err != nil { return taskmodel.ErrRunExecutionError(err) } - return nil } diff --git a/task/backend/coordinator/coordinator_test.go b/task/backend/coordinator/coordinator_test.go index 712ebac675..ccc9f068a2 100644 --- a/task/backend/coordinator/coordinator_test.go +++ b/task/backend/coordinator/coordinator_test.go @@ -20,12 +20,13 @@ func Test_Coordinator_Executor_Methods(t *testing.T) { taskOne = &taskmodel.Task{ID: one} runOne = &taskmodel.Run{ - ID: one, - TaskID: one, - ScheduledFor: time.Now(), + ID: one, + TaskID: one, } allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{}, SchedulableTask{}) + + scheduledTime = time.Now() ) for _, test := range []struct { @@ -45,7 +46,22 @@ func Test_Coordinator_Executor_Methods(t *testing.T) { }, executor: &executorE{ calls: []interface{}{ - manualRunCall{taskOne.ID, runOne.ID}, + manualRunCall{taskOne.ID, runOne.ID, false}, + }, + }, + }, + { + name: "RunForcedScheduled", + call: func(t *testing.T, c *Coordinator) { + rr := runOne + rr.ScheduledFor = scheduledTime + if err := c.RunForced(context.Background(), taskOne, runOne); err != nil { + t.Errorf("expected nil error found %q", err) + } + }, + executor: &executorE{ + calls: []interface{}{ + manualRunCall{taskOne.ID, runOne.ID, true}, }, }, }, diff --git a/task/backend/coordinator/support_test.go b/task/backend/coordinator/support_test.go index 4a05832e06..6ee351a5ee 100644 --- a/task/backend/coordinator/support_test.go +++ b/task/backend/coordinator/support_test.go @@ -17,8 +17,9 @@ type ( } manualRunCall struct { - TaskID platform.ID - RunID platform.ID + TaskID platform.ID + RunID platform.ID + WasScheduled bool } cancelCallC struct { @@ -96,7 +97,7 @@ func (s *schedulerC) Release(taskID scheduler.ID) error { } func (e *executorE) ManualRun(ctx context.Context, id platform.ID, runID platform.ID) (executor.Promise, error) { - e.calls = append(e.calls, manualRunCall{id, runID}) + e.calls = append(e.calls, manualRunCall{id, runID, false}) ctx, cancel := context.WithCancel(ctx) p := promise{ done: make(chan struct{}), @@ -109,6 +110,11 @@ func (e *executorE) ManualRun(ctx context.Context, id platform.ID, runID platfor return &p, err } +func (e *executorE) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error { + e.calls = append(e.calls, manualRunCall{id, runID, true}) + return nil +} + func (e *executorE) Cancel(ctx context.Context, runID platform.ID) error { e.calls = append(e.calls, cancelCallC{runID}) return nil diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 3d4bef7473..1ebfaa8303 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -145,6 +145,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t ps: us, currentPromises: sync.Map{}, + futurePromises: sync.Map{}, promiseQueue: make(chan *promise, maxPromises), workerLimit: make(chan struct{}, cfg.maxWorkers), limitFunc: func(*taskmodel.Task, *taskmodel.Run) error { return nil }, // noop @@ -159,6 +160,8 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t e: e, } + go e.processScheduledTasks() + e.workerPool = sync.Pool{New: wm.new} return e, e.metrics } @@ -177,6 +180,9 @@ type Executor struct { // currentPromises are all the promises we are made that have not been fulfilled currentPromises sync.Map + // futurePromises are promises that are scheduled to be executed in the future + futurePromises sync.Map + // keep a pool of promise's we have in queue promiseQueue chan *promise @@ -191,6 +197,52 @@ type Executor struct { flagger feature.Flagger } +func (e *Executor) LoadExistingScheduleRuns(ctx context.Context) error { + tasks, _, err := e.ts.FindTasks(ctx, taskmodel.TaskFilter{}) + if err != nil { + e.log.Error("err finding tasks:", zap.Error(err)) + return err + } + for _, t := range tasks { + beforeTime := time.Now().Add(time.Hour * 24 * 365).Format(time.RFC3339) + runs, _, err := e.ts.FindRuns(ctx, taskmodel.RunFilter{Task: t.ID, BeforeTime: beforeTime}) + if err != nil { + e.log.Error("err finding runs:", zap.Error(err)) + return err + } + for _, run := range runs { + if run.ScheduledFor.After(time.Now()) { + perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID) + if err != nil { + e.log.Error("err finding perms:", zap.Error(err)) + return err + } + + ctx, cancel := context.WithCancel(ctx) + // create promise + p := &promise{ + run: run, + task: t, + auth: &influxdb.Authorization{ + Status: influxdb.Active, + UserID: t.OwnerID, + ID: platform.ID(1), + OrgID: t.OrganizationID, + Permissions: perm, + }, + createdAt: time.Now().UTC(), + done: make(chan struct{}), + ctx: ctx, + cancelFunc: cancel, + } + e.futurePromises.Store(run.ID, p) + } + } + } + + return nil +} + // SetLimitFunc sets the limit func for this task executor func (e *Executor) SetLimitFunc(l LimitFunc) { e.limitFunc = l @@ -241,6 +293,58 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform return p, err } +func (e *Executor) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error { + // create promises for any manual runs + r, err := e.tcs.StartManualRun(ctx, id, runID) + if err != nil { + return err + } + + auth, err := icontext.GetAuthorizer(ctx) + if err != nil { + return err + } + + // create a new context for running the task in the background so that returning the HTTP response does not cancel the + // context of the task to be run + ctx = icontext.SetAuthorizer(context.Background(), auth) + + span, ctx := tracing.StartSpanFromContext(ctx) + defer span.Finish() + + t, err := e.ts.FindTaskByID(ctx, r.TaskID) + if err != nil { + return err + } + + perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID) + if err != nil { + return err + } + + ctx, cancel := context.WithCancel(ctx) + // create promise + p := &promise{ + run: r, + task: t, + auth: &influxdb.Authorization{ + Status: influxdb.Active, + UserID: t.OwnerID, + ID: platform.ID(1), + OrgID: t.OrganizationID, + Permissions: perm, + }, + createdAt: time.Now().UTC(), + done: make(chan struct{}), + ctx: ctx, + cancelFunc: cancel, + } + e.metrics.manualRunsCounter.WithLabelValues(id.String()).Inc() + + e.futurePromises.Store(runID, p) + return nil +} + func (e *Executor) ResumeCurrentRun(ctx context.Context, id platform.ID, runID platform.ID) (Promise, error) { cr, err := e.tcs.CurrentlyRunning(ctx, id) if err != nil { @@ -363,6 +467,23 @@ func (e *Executor) createPromise(ctx context.Context, run *taskmodel.Run) (*prom return p, nil } +func (e *Executor) processScheduledTasks() { + t := time.Tick(1 * time.Second) + for range t { + e.futurePromises.Range(func(k any, v any) bool { + vv := v.(*promise) + if vv.run.ScheduledFor.Equal(time.Now()) || vv.run.ScheduledFor.Before(time.Now()) { + if vv.run.RunAt.IsZero() { + e.promiseQueue <- vv + e.futurePromises.Delete(k) + e.startWorker() + } + } + return true + }) + } +} + type workerMaker struct { e *Executor } @@ -445,9 +566,18 @@ func (w *worker) start(p *promise) { defer span.Finish() // add to run log - w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux)) + if err := w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux)); err != nil { + tid := zap.String("taskID", p.task.ID.String()) + rid := zap.String("runID", p.run.ID.String()) + w.e.log.With(zap.Error(err)).With(tid).With(rid).Warn("error adding run log: ") + } + // update run status - w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), taskmodel.RunStarted) + if err := w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), taskmodel.RunStarted); err != nil { + tid := zap.String("taskID", p.task.ID.String()) + rid := zap.String("runID", p.run.ID.String()) + w.e.log.With(zap.Error(err)).With(tid).With(rid).Warn("error updating run state: ") + } // add to metrics w.e.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt)) diff --git a/task/mock/executor.go b/task/mock/executor.go index fd02135c43..f1eb25a9c0 100644 --- a/task/mock/executor.go +++ b/task/mock/executor.go @@ -108,6 +108,10 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform return p, err } +func (e *Executor) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error { + return nil +} + func (e *Executor) Wait() { e.wg.Wait() }