fix: manually scheduled task runs now run when expected (#23664)
* fix: run manually scheduled tasks at their scheduled time * fix: actually use it * fix: get tests building * fix: fix tests * fix: lintpull/23812/head
parent
34254ee040
commit
a0f1184bb3
|
@ -461,6 +461,10 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
|
|||
combinedTaskService,
|
||||
executor.WithFlagger(m.flagger),
|
||||
)
|
||||
err = executor.LoadExistingScheduleRuns(ctx)
|
||||
if err != nil {
|
||||
m.log.Fatal("could not load existing scheduled runs", zap.Error(err))
|
||||
}
|
||||
m.executor = executor
|
||||
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
|
||||
schLogger := m.log.With(zap.String("service", "task-scheduler"))
|
||||
|
|
|
@ -1012,6 +1012,15 @@ func (s *Service) findRunByID(ctx context.Context, tx Tx, taskID, runID platform
|
|||
runBytes, err := bucket.Get(key)
|
||||
if err != nil {
|
||||
if IsNotFound(err) {
|
||||
runs, err := s.manualRuns(ctx, tx, taskID)
|
||||
for _, run := range runs {
|
||||
if run.ID == runID {
|
||||
return run, nil
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
return nil, taskmodel.ErrRunNotFound
|
||||
}
|
||||
return nil, taskmodel.ErrRunNotFound
|
||||
}
|
||||
return nil, taskmodel.ErrUnexpectedTaskBucketErr(err)
|
||||
|
|
|
@ -22,6 +22,7 @@ const DefaultLimit = 1000
|
|||
// Executor is an abstraction of the task executor with only the functions needed by the coordinator
|
||||
type Executor interface {
|
||||
ManualRun(ctx context.Context, id platform.ID, runID platform.ID) (executor.Promise, error)
|
||||
ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error
|
||||
Cancel(ctx context.Context, runID platform.ID) error
|
||||
}
|
||||
|
||||
|
@ -149,7 +150,7 @@ func (c *Coordinator) TaskUpdated(ctx context.Context, from, to *taskmodel.Task)
|
|||
return nil
|
||||
}
|
||||
|
||||
//TaskDeleted asks the Scheduler to release the deleted task
|
||||
// TaskDeleted asks the Scheduler to release the deleted task
|
||||
func (c *Coordinator) TaskDeleted(ctx context.Context, id platform.ID) error {
|
||||
tid := scheduler.ID(id)
|
||||
if err := c.sch.Release(tid); err != nil && err != taskmodel.ErrTaskNotClaimed {
|
||||
|
@ -166,14 +167,19 @@ func (c *Coordinator) RunCancelled(ctx context.Context, runID platform.ID) error
|
|||
return err
|
||||
}
|
||||
|
||||
// RunForced speaks directly to the Executor to run a task immediately
|
||||
// RunForced speaks directly to the Executor to run a task immediately, or schedule the run if `scheduledFor` is set.
|
||||
func (c *Coordinator) RunForced(ctx context.Context, task *taskmodel.Task, run *taskmodel.Run) error {
|
||||
// the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the
|
||||
// task rather than waiting for the task to finish
|
||||
_, err := c.ex.ManualRun(ctx, task.ID, run.ID)
|
||||
var err error
|
||||
if !run.ScheduledFor.IsZero() {
|
||||
err = c.ex.ScheduleManualRun(ctx, task.ID, run.ID)
|
||||
} else {
|
||||
// the returned promise is not used, since clients expect the HTTP server to return immediately after scheduling the
|
||||
// task rather than waiting for the task to finish
|
||||
_, err = c.ex.ManualRun(ctx, task.ID, run.ID)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return taskmodel.ErrRunExecutionError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -20,12 +20,13 @@ func Test_Coordinator_Executor_Methods(t *testing.T) {
|
|||
taskOne = &taskmodel.Task{ID: one}
|
||||
|
||||
runOne = &taskmodel.Run{
|
||||
ID: one,
|
||||
TaskID: one,
|
||||
ScheduledFor: time.Now(),
|
||||
ID: one,
|
||||
TaskID: one,
|
||||
}
|
||||
|
||||
allowUnexported = cmp.AllowUnexported(executorE{}, schedulerC{}, SchedulableTask{})
|
||||
|
||||
scheduledTime = time.Now()
|
||||
)
|
||||
|
||||
for _, test := range []struct {
|
||||
|
@ -45,7 +46,22 @@ func Test_Coordinator_Executor_Methods(t *testing.T) {
|
|||
},
|
||||
executor: &executorE{
|
||||
calls: []interface{}{
|
||||
manualRunCall{taskOne.ID, runOne.ID},
|
||||
manualRunCall{taskOne.ID, runOne.ID, false},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "RunForcedScheduled",
|
||||
call: func(t *testing.T, c *Coordinator) {
|
||||
rr := runOne
|
||||
rr.ScheduledFor = scheduledTime
|
||||
if err := c.RunForced(context.Background(), taskOne, runOne); err != nil {
|
||||
t.Errorf("expected nil error found %q", err)
|
||||
}
|
||||
},
|
||||
executor: &executorE{
|
||||
calls: []interface{}{
|
||||
manualRunCall{taskOne.ID, runOne.ID, true},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -17,8 +17,9 @@ type (
|
|||
}
|
||||
|
||||
manualRunCall struct {
|
||||
TaskID platform.ID
|
||||
RunID platform.ID
|
||||
TaskID platform.ID
|
||||
RunID platform.ID
|
||||
WasScheduled bool
|
||||
}
|
||||
|
||||
cancelCallC struct {
|
||||
|
@ -96,7 +97,7 @@ func (s *schedulerC) Release(taskID scheduler.ID) error {
|
|||
}
|
||||
|
||||
func (e *executorE) ManualRun(ctx context.Context, id platform.ID, runID platform.ID) (executor.Promise, error) {
|
||||
e.calls = append(e.calls, manualRunCall{id, runID})
|
||||
e.calls = append(e.calls, manualRunCall{id, runID, false})
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
p := promise{
|
||||
done: make(chan struct{}),
|
||||
|
@ -109,6 +110,11 @@ func (e *executorE) ManualRun(ctx context.Context, id platform.ID, runID platfor
|
|||
return &p, err
|
||||
}
|
||||
|
||||
func (e *executorE) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error {
|
||||
e.calls = append(e.calls, manualRunCall{id, runID, true})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *executorE) Cancel(ctx context.Context, runID platform.ID) error {
|
||||
e.calls = append(e.calls, cancelCallC{runID})
|
||||
return nil
|
||||
|
|
|
@ -145,6 +145,7 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t
|
|||
ps: us,
|
||||
|
||||
currentPromises: sync.Map{},
|
||||
futurePromises: sync.Map{},
|
||||
promiseQueue: make(chan *promise, maxPromises),
|
||||
workerLimit: make(chan struct{}, cfg.maxWorkers),
|
||||
limitFunc: func(*taskmodel.Task, *taskmodel.Run) error { return nil }, // noop
|
||||
|
@ -159,6 +160,8 @@ func NewExecutor(log *zap.Logger, qs query.QueryService, us PermissionService, t
|
|||
e: e,
|
||||
}
|
||||
|
||||
go e.processScheduledTasks()
|
||||
|
||||
e.workerPool = sync.Pool{New: wm.new}
|
||||
return e, e.metrics
|
||||
}
|
||||
|
@ -177,6 +180,9 @@ type Executor struct {
|
|||
// currentPromises are all the promises we are made that have not been fulfilled
|
||||
currentPromises sync.Map
|
||||
|
||||
// futurePromises are promises that are scheduled to be executed in the future
|
||||
futurePromises sync.Map
|
||||
|
||||
// keep a pool of promise's we have in queue
|
||||
promiseQueue chan *promise
|
||||
|
||||
|
@ -191,6 +197,52 @@ type Executor struct {
|
|||
flagger feature.Flagger
|
||||
}
|
||||
|
||||
func (e *Executor) LoadExistingScheduleRuns(ctx context.Context) error {
|
||||
tasks, _, err := e.ts.FindTasks(ctx, taskmodel.TaskFilter{})
|
||||
if err != nil {
|
||||
e.log.Error("err finding tasks:", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
for _, t := range tasks {
|
||||
beforeTime := time.Now().Add(time.Hour * 24 * 365).Format(time.RFC3339)
|
||||
runs, _, err := e.ts.FindRuns(ctx, taskmodel.RunFilter{Task: t.ID, BeforeTime: beforeTime})
|
||||
if err != nil {
|
||||
e.log.Error("err finding runs:", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
for _, run := range runs {
|
||||
if run.ScheduledFor.After(time.Now()) {
|
||||
perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID)
|
||||
if err != nil {
|
||||
e.log.Error("err finding perms:", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
// create promise
|
||||
p := &promise{
|
||||
run: run,
|
||||
task: t,
|
||||
auth: &influxdb.Authorization{
|
||||
Status: influxdb.Active,
|
||||
UserID: t.OwnerID,
|
||||
ID: platform.ID(1),
|
||||
OrgID: t.OrganizationID,
|
||||
Permissions: perm,
|
||||
},
|
||||
createdAt: time.Now().UTC(),
|
||||
done: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
cancelFunc: cancel,
|
||||
}
|
||||
e.futurePromises.Store(run.ID, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetLimitFunc sets the limit func for this task executor
|
||||
func (e *Executor) SetLimitFunc(l LimitFunc) {
|
||||
e.limitFunc = l
|
||||
|
@ -241,6 +293,58 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform
|
|||
return p, err
|
||||
}
|
||||
|
||||
func (e *Executor) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error {
|
||||
// create promises for any manual runs
|
||||
r, err := e.tcs.StartManualRun(ctx, id, runID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
auth, err := icontext.GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// create a new context for running the task in the background so that returning the HTTP response does not cancel the
|
||||
// context of the task to be run
|
||||
ctx = icontext.SetAuthorizer(context.Background(), auth)
|
||||
|
||||
span, ctx := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
t, err := e.ts.FindTaskByID(ctx, r.TaskID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
perm, err := e.ps.FindPermissionForUser(ctx, t.OwnerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
// create promise
|
||||
p := &promise{
|
||||
run: r,
|
||||
task: t,
|
||||
auth: &influxdb.Authorization{
|
||||
Status: influxdb.Active,
|
||||
UserID: t.OwnerID,
|
||||
ID: platform.ID(1),
|
||||
OrgID: t.OrganizationID,
|
||||
Permissions: perm,
|
||||
},
|
||||
createdAt: time.Now().UTC(),
|
||||
done: make(chan struct{}),
|
||||
ctx: ctx,
|
||||
cancelFunc: cancel,
|
||||
}
|
||||
e.metrics.manualRunsCounter.WithLabelValues(id.String()).Inc()
|
||||
|
||||
e.futurePromises.Store(runID, p)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Executor) ResumeCurrentRun(ctx context.Context, id platform.ID, runID platform.ID) (Promise, error) {
|
||||
cr, err := e.tcs.CurrentlyRunning(ctx, id)
|
||||
if err != nil {
|
||||
|
@ -363,6 +467,23 @@ func (e *Executor) createPromise(ctx context.Context, run *taskmodel.Run) (*prom
|
|||
return p, nil
|
||||
}
|
||||
|
||||
func (e *Executor) processScheduledTasks() {
|
||||
t := time.Tick(1 * time.Second)
|
||||
for range t {
|
||||
e.futurePromises.Range(func(k any, v any) bool {
|
||||
vv := v.(*promise)
|
||||
if vv.run.ScheduledFor.Equal(time.Now()) || vv.run.ScheduledFor.Before(time.Now()) {
|
||||
if vv.run.RunAt.IsZero() {
|
||||
e.promiseQueue <- vv
|
||||
e.futurePromises.Delete(k)
|
||||
e.startWorker()
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type workerMaker struct {
|
||||
e *Executor
|
||||
}
|
||||
|
@ -445,9 +566,18 @@ func (w *worker) start(p *promise) {
|
|||
defer span.Finish()
|
||||
|
||||
// add to run log
|
||||
w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux))
|
||||
if err := w.e.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now().UTC(), fmt.Sprintf("Started task from script: %q", p.task.Flux)); err != nil {
|
||||
tid := zap.String("taskID", p.task.ID.String())
|
||||
rid := zap.String("runID", p.run.ID.String())
|
||||
w.e.log.With(zap.Error(err)).With(tid).With(rid).Warn("error adding run log: ")
|
||||
}
|
||||
|
||||
// update run status
|
||||
w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), taskmodel.RunStarted)
|
||||
if err := w.e.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now().UTC(), taskmodel.RunStarted); err != nil {
|
||||
tid := zap.String("taskID", p.task.ID.String())
|
||||
rid := zap.String("runID", p.run.ID.String())
|
||||
w.e.log.With(zap.Error(err)).With(tid).With(rid).Warn("error updating run state: ")
|
||||
}
|
||||
|
||||
// add to metrics
|
||||
w.e.metrics.StartRun(p.task, time.Since(p.createdAt), time.Since(p.run.RunAt))
|
||||
|
|
|
@ -108,6 +108,10 @@ func (e *Executor) ManualRun(ctx context.Context, id platform.ID, runID platform
|
|||
return p, err
|
||||
}
|
||||
|
||||
func (e *Executor) ScheduleManualRun(ctx context.Context, id platform.ID, runID platform.ID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Executor) Wait() {
|
||||
e.wg.Wait()
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue