feat(task): Create a new task executor for the new modular scheduler (#14252)

* feat(task): create new executor for modular scheduler
Lyon Hill 2019-07-08 08:13:29 -06:00 committed by GitHub
parent 31bc5a2ded
commit c8becfd4a2
7 changed files with 1068 additions and 17 deletions


@@ -1263,6 +1263,51 @@ func (s *Service) createNextRun(ctx context.Context, tx Tx, taskID influxdb.ID,
}, nil
}
// CreateRun creates a run with the given scheduledFor time.
func (s *Service) CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
var r *influxdb.Run
err := s.kv.Update(ctx, func(tx Tx) error {
run, err := s.createRun(ctx, tx, taskID, scheduledFor)
if err != nil {
return err
}
r = run
return nil
})
return r, err
}
func (s *Service) createRun(ctx context.Context, tx Tx, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
id := s.IDGenerator.ID()
run := influxdb.Run{
ID: id,
TaskID: taskID,
ScheduledFor: scheduledFor.Format(time.RFC3339),
Status: backend.RunScheduled.String(),
Log: []influxdb.Log{},
}
b, err := tx.Bucket(taskRunBucket)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
runBytes, err := json.Marshal(run)
if err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
runKey, err := taskRunKey(taskID, run.ID)
if err != nil {
return nil, err
}
if err := b.Put(runKey, runBytes); err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
return &run, nil
}
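For reference, a stored run can be read back with the same key scheme. A rough sketch follows; the service's existing FindRunByID is the authoritative lookup, and the not-found handling here is simplified:

func (s *Service) getRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
	b, err := tx.Bucket(taskRunBucket)
	if err != nil {
		return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
	}
	key, err := taskRunKey(taskID, runID)
	if err != nil {
		return nil, err
	}
	val, err := b.Get(key)
	if err != nil {
		// simplified: any Get error is treated as not found
		return nil, &influxdb.ErrRunNotFound
	}
	run := &influxdb.Run{}
	if err := json.Unmarshal(val, run); err != nil {
		return nil, influxdb.ErrInternalTaskServiceError(err)
	}
	return run, nil
}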
func (s *Service) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
var runs []*influxdb.Run
err := s.kv.View(ctx, func(tx Tx) error {
@@ -1359,10 +1404,82 @@ func (s *Service) manualRuns(ctx context.Context, tx Tx, taskID influxdb.ID) ([]
if err := json.Unmarshal(val, &runs); err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
return runs, nil
}
func (s *Service) StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
var r *influxdb.Run
err := s.kv.Update(ctx, func(tx Tx) error {
run, err := s.startManualRun(ctx, tx, taskID, runID)
if err != nil {
return err
}
r = run
return nil
})
return r, err
}
func (s *Service) startManualRun(ctx context.Context, tx Tx, taskID, runID influxdb.ID) (*influxdb.Run, error) {
mRuns, err := s.manualRuns(ctx, tx, taskID)
if err != nil {
return nil, &influxdb.ErrRunNotFound
}
if len(mRuns) < 1 {
return nil, &influxdb.ErrRunNotFound
}
var run *influxdb.Run
for i, r := range mRuns {
if r.ID == runID {
run = r
mRuns = append(mRuns[:i], mRuns[i+1:]...)
}
}
if run == nil {
return nil, &influxdb.ErrRunNotFound
}
// save manual runs
mRunsBytes, err := json.Marshal(mRuns)
if err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
runsKey, err := taskManualRunKey(taskID)
if err != nil {
return nil, err
}
b, err := tx.Bucket(taskRunBucket)
if err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
if err := b.Put(runsKey, mRunsBytes); err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
// add mRun to the list of currently running
mRunBytes, err := json.Marshal(run)
if err != nil {
return nil, influxdb.ErrInternalTaskServiceError(err)
}
runKey, err := taskRunKey(taskID, run.ID)
if err != nil {
return nil, err
}
if err := b.Put(runKey, mRunBytes); err != nil {
return nil, influxdb.ErrUnexpectedTaskBucketErr(err)
}
return run, nil
}
// FinishRun removes runID from the list of running tasks and, if its `now` is later than the last completed run, updates it.
func (s *Service) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
var run *influxdb.Run


@@ -0,0 +1,66 @@
package executor
import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
"github.com/prometheus/client_golang/prometheus"
)
type ExecutorMetrics struct {
totalRunsComplete *prometheus.CounterVec
totalRunsActive prometheus.Gauge
queueDelta prometheus.Summary
}
func NewExecutorMetrics() *ExecutorMetrics {
const namespace = "task"
const subsystem = "executor"
return &ExecutorMetrics{
totalRunsComplete: prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_runs_complete",
Help: "Total number of runs completed across all tasks, split out by success or failure.",
}, []string{"status"}),
totalRunsActive: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "total_runs_active",
Help: "Total number of runs across all tasks that have started but not yet completed.",
}),
queueDelta: prometheus.NewSummary(prometheus.SummaryOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "run_queue_delta",
Help: "The duration in seconds between a run being due to start and actually starting.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
}
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (em *ExecutorMetrics) PrometheusCollectors() []prometheus.Collector {
return []prometheus.Collector{
em.totalRunsComplete,
em.totalRunsActive,
em.queueDelta,
}
}
// StartRun adjusts the metrics to indicate a run is in progress for the given task ID.
// We are also storing the delta time between when a run is due to start and actually starting.
func (em *ExecutorMetrics) StartRun(taskID influxdb.ID, queueDelta time.Duration) {
em.totalRunsActive.Inc()
em.queueDelta.Observe(queueDelta.Seconds())
}
// FinishRun adjusts the metrics to indicate a run is no longer in progress for the given task ID.
func (em *ExecutorMetrics) FinishRun(taskID influxdb.ID, status backend.RunStatus) {
em.totalRunsActive.Dec()
em.totalRunsComplete.WithLabelValues(status.String()).Inc()
}
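A minimal sketch of registering these collectors and recording one run; the plain prometheus registry, task ID, and two-second delta are illustrative (the tests below use the kit/prom helpers instead):

func exampleMetrics() {
	metrics := NewExecutorMetrics()
	reg := prometheus.NewRegistry()
	reg.MustRegister(metrics.PrometheusCollectors()...)

	// A run that started two seconds after it was due, then succeeded:
	var taskID influxdb.ID = 1
	metrics.StartRun(taskID, 2*time.Second)       // total_runs_active +1, records the queue delta
	metrics.FinishRun(taskID, backend.RunSuccess) // total_runs_active -1, total_runs_complete{status="success"} +1
}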


@@ -1,4 +1,4 @@
package executor_test
package executor
import (
"context"
@@ -22,7 +22,6 @@ import (
"github.com/influxdata/influxdb/query"
_ "github.com/influxdata/influxdb/query/builtin"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/executor"
"go.uber.org/zap"
)
@@ -96,8 +95,15 @@ func (s *fakeQueryService) SucceedQuery(script string) {
defer s.mu.Unlock()
// Unblock the flux.
spec := makeASTString(makeAST(script))
close(s.queries[spec].wait)
ast := makeAST(script)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
ast.Now = ast.Now.UTC()
spec = makeASTString(ast)
fq = s.queries[spec]
}
close(fq.wait)
delete(s.queries, spec)
}
@@ -107,9 +113,16 @@ func (s *fakeQueryService) FailQuery(script string, forced error) {
defer s.mu.Unlock()
// Unblock the flux.
spec := makeASTString(makeAST(script))
s.queries[spec].forcedError = forced
close(s.queries[spec].wait)
ast := makeAST(script)
spec := makeASTString(ast)
fq, ok := s.queries[spec]
if !ok {
ast.Now = ast.Now.UTC()
spec = makeASTString(ast)
fq = s.queries[spec]
}
fq.forcedError = forced
close(fq.wait)
delete(s.queries, spec)
}
@@ -125,7 +138,11 @@ func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
t.Helper()
const attempts = 10
spec := makeASTString(makeAST(script))
ast := makeAST(script)
astUTC := makeAST(script)
astUTC.Now = ast.Now.UTC()
spec := makeASTString(ast)
specUTC := makeASTString(astUTC)
for i := 0; i < attempts; i++ {
if i != 0 {
time.Sleep(5 * time.Millisecond)
@@ -137,6 +154,13 @@ func (s *fakeQueryService) WaitForQueryLive(t *testing.T, script string) {
if ok {
return
}
s.mu.Lock()
_, ok = s.queries[specUTC]
s.mu.Unlock()
if ok {
return
}
}
t.Fatalf("Did not see live query %q in time", script)
@@ -253,7 +277,7 @@ func createAsyncSystem() *system {
name: "AsyncExecutor",
svc: svc,
ts: i,
ex: executor.NewAsyncQueryServiceExecutor(zap.NewNop(), svc, i, i),
ex: NewAsyncQueryServiceExecutor(zap.NewNop(), svc, i, i),
i: i,
}
}
@@ -269,7 +293,7 @@ func createSyncSystem() *system {
name: "SynchronousExecutor",
svc: svc,
ts: i,
ex: executor.NewQueryServiceExecutor(
ex: NewQueryServiceExecutor(
zap.NewNop(),
query.QueryServiceBridge{
AsyncQueryService: svc,


@@ -0,0 +1,441 @@
package executor
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/scheduler"
"go.uber.org/zap"
)
// MultiLimit allows us to create a single limit func that applies more than one limit.
func MultiLimit(limits ...LimitFunc) LimitFunc {
return func(run *influxdb.Run) error {
for _, lf := range limits {
if err := lf(run); err != nil {
return err
}
}
return nil
}
}
// LimitFunc is a function the executor will use to decide whether a run may start; a non-nil error delays the run and is recorded in the run log.
type LimitFunc func(*influxdb.Run) error
type Metrics interface {
StartRun(influxdb.ID, time.Duration)
FinishRun(influxdb.ID, backend.RunStatus)
}
// NewExecutor creates a new task executor
func NewExecutor(logger *zap.Logger, qs query.QueryService, as influxdb.AuthorizationService, ts influxdb.TaskService, tcs backend.TaskControlService, metrics Metrics) *TaskExecutor {
te := &TaskExecutor{
logger: logger,
ts: ts,
tcs: tcs,
qs: qs,
as: as,
metrics: metrics,
currentPromises: sync.Map{},
promiseQueue: make(chan *Promise, 1000), //TODO(lh): make this configurable
workerLimit: make(chan struct{}, 100), //TODO(lh): make this configurable
limitFunc: func(*influxdb.Run) error { return nil }, // noop
}
wm := &workerMaker{
te: te,
}
te.workerPool = sync.Pool{New: wm.new}
return te
}
// TaskExecutor is a task-specific executor that works with the new scheduler system.
type TaskExecutor struct {
logger *zap.Logger
ts influxdb.TaskService
tcs backend.TaskControlService
qs query.QueryService
as influxdb.AuthorizationService
metrics Metrics
// currentPromises are all the promises we have made that have not yet been fulfilled
currentPromises sync.Map
// keep a queue of promises waiting to be worked
promiseQueue chan *Promise
limitFunc LimitFunc
// keep a pool of execution workers.
workerPool sync.Pool
workerLimit chan struct{}
}
// SetLimitFunc sets the limit func for this task executor
func (e *TaskExecutor) SetLimitFunc(l LimitFunc) {
e.limitFunc = l
}
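A sketch of composing limits via MultiLimit; runningCount is a hypothetical helper passed in by the caller, and only MultiLimit and SetLimitFunc come from this file. While the limit func returns an error, the worker loop below logs "Task limit reached" to the run log and retries once per second.

func configureLimits(e *TaskExecutor, runningCount func(influxdb.ID) int) {
	concurrency := func(r *influxdb.Run) error {
		if runningCount(r.TaskID) >= 5 { // hypothetical concurrency check
			return fmt.Errorf("task %s already has 5 runs in flight", r.TaskID)
		}
		return nil
	}
	orgQuota := func(r *influxdb.Run) error {
		return nil // e.g. consult an org-level quota here
	}
	e.SetLimitFunc(MultiLimit(concurrency, orgQuota))
}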
// Execute begins execution for the task's ID with a specific scheduledAt time.
// When we execute we first build a run for the scheduledAt time,
// then add to the queue anything that was manually queued to run.
// If the queue is full, the call to Execute hangs and applies back pressure to the caller.
// We then start a worker to work the newly queued jobs.
func (e *TaskExecutor) Execute(ctx context.Context, id scheduler.ID, scheduledAt time.Time) (*Promise, error) {
iid := influxdb.ID(id)
var p *Promise
var err error
// look for manual run by scheduledAt
p, err = e.startManualRun(ctx, iid, scheduledAt)
if err == nil && p != nil {
goto PROMISEMADE
}
// look in currently running
p, err = e.resumeRun(ctx, iid, scheduledAt)
if err == nil && p != nil {
goto PROMISEMADE
}
// create a run
p, err = e.createRun(ctx, iid, scheduledAt)
if err != nil {
return nil, err
}
PROMISEMADE:
// see if we have available workers
select {
case e.workerLimit <- struct{}{}:
default:
// we have reached our worker limit and we cannot start any more.
return p, nil
}
// fire up some workers
worker := e.workerPool.Get().(*worker)
if worker != nil {
// a nil worker would mean all the workers are busy and one of them will pick up the work we enqueued.
go func() {
// don't forget to put the worker back when we are done
defer e.workerPool.Put(worker)
worker.work()
// free a slot in the worker limit so another worker can start
<-e.workerLimit
}()
}
return p, nil
}
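A sketch of driving Execute directly; e is a *TaskExecutor from NewExecutor, and the task ID and time are illustrative:

func runOnce(ctx context.Context, e *TaskExecutor, taskID influxdb.ID) error {
	p, err := e.Execute(ctx, scheduler.ID(taskID), time.Now())
	if err != nil {
		return err
	}
	// Error waits on Done() internally, so this blocks until the run finishes.
	return p.Error()
}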
func (e *TaskExecutor) startManualRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*Promise, error) {
// create promises for any manual runs
mr, err := e.tcs.ManualRuns(ctx, id)
if err != nil {
return nil, err
}
for _, run := range mr {
sa, err := run.ScheduledForTime()
if err == nil && sa.UTC() == scheduledAt.UTC() {
r, err := e.tcs.StartManualRun(ctx, id, run.ID)
if err != nil {
return nil, err
}
return e.createPromise(ctx, r)
}
}
return nil, &influxdb.ErrRunNotFound
}
func (e *TaskExecutor) resumeRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*Promise, error) {
cr, err := e.tcs.CurrentlyRunning(ctx, id)
if err != nil {
return nil, err
}
for _, run := range cr {
sa, err := run.ScheduledForTime()
if err == nil && sa.UTC() == scheduledAt.UTC() {
if currentPromise, ok := e.currentPromises.Load(run.ID); ok {
// if we already have a promise we should just return that
return currentPromise.(*Promise), nil
}
return e.createPromise(ctx, run)
}
}
return nil, &influxdb.ErrRunNotFound
}
func (e *TaskExecutor) createRun(ctx context.Context, id influxdb.ID, scheduledAt time.Time) (*Promise, error) {
r, err := e.tcs.CreateRun(ctx, id, scheduledAt)
if err != nil {
return nil, err
}
return e.createPromise(ctx, r)
}
// Cancel cancels a run of a specific task; promiseID is the ID of the run object.
func (e *TaskExecutor) Cancel(ctx context.Context, promiseID scheduler.ID) error {
// find the promise
val, ok := e.currentPromises.Load(influxdb.ID(promiseID))
if !ok {
return nil
}
promise := val.(*Promise)
// call cancel on it.
promise.Cancel(ctx)
return nil
}
func (e *TaskExecutor) createPromise(ctx context.Context, run *influxdb.Run) (*Promise, error) {
span, ctx := tracing.StartSpanFromContext(ctx)
defer span.Finish()
t, err := e.ts.FindTaskByID(ctx, run.TaskID)
if err != nil {
return nil, err
}
auth, err := e.as.FindAuthorizationByID(ctx, t.AuthorizationID)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(ctx)
// create promise
p := &Promise{
run: run,
task: t,
auth: auth,
done: make(chan struct{}),
ctx: ctx,
cancelFunc: cancel,
}
// insert promise into queue to be worked
// when the queue gets full we will hang and apply back pressure to the scheduler
e.promiseQueue <- p
// insert the promise into the registry
e.currentPromises.Store(run.ID, p)
return p, nil
}
type workerMaker struct {
te *TaskExecutor
}
func (wm *workerMaker) new() interface{} {
return &worker{wm.te}
}
type worker struct {
te *TaskExecutor
}
func (w *worker) work() {
// loop until we have no more work to do in the promise queue
for {
var prom *Promise
// check to see if we can execute
select {
case p, ok := <-w.te.promiseQueue:
if !ok {
// the promiseQueue has been closed
return
}
prom = p
default:
// if nothing is left in the queue we are done
return
}
// check to make sure we are below the limits.
for {
err := w.te.limitFunc(prom.run)
if err == nil {
break
}
// add to the run log
w.te.tcs.AddRunLog(prom.ctx, prom.task.ID, prom.run.ID, time.Now(), fmt.Sprintf("Task limit reached: %s", err.Error()))
// sleep
select {
// If done the promise was canceled
case <-prom.ctx.Done():
w.te.tcs.AddRunLog(prom.ctx, prom.task.ID, prom.run.ID, time.Now(), "Run canceled")
w.te.tcs.UpdateRunState(prom.ctx, prom.task.ID, prom.run.ID, time.Now(), backend.RunCanceled)
prom.err = &influxdb.ErrRunCanceled
close(prom.done)
return
case <-time.After(time.Second):
}
}
// execute the promise
w.executeQuery(prom)
// close the promise's done channel; any error was already set by executeQuery
close(prom.done)
// remove promise from registry
w.te.currentPromises.Delete(prom.run.ID)
}
}
func (w *worker) start(p *Promise) {
// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()
// add to run log
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), fmt.Sprintf("Started task from script: %q", p.task.Flux))
// update run status
w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now(), backend.RunStarted)
// add to metrics
s, _ := p.run.ScheduledForTime()
w.te.metrics.StartRun(p.task.ID, time.Since(s))
}
func (w *worker) finish(p *Promise, rs backend.RunStatus, err error) {
// trace
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()
// add to run log
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), fmt.Sprintf("Completed(%s)", rs.String()))
// update run status
w.te.tcs.UpdateRunState(ctx, p.task.ID, p.run.ID, time.Now(), rs)
// add to metrics
w.te.metrics.FinishRun(p.task.ID, rs)
// log error
if err != nil {
w.te.logger.Debug("execution failed", zap.Error(err), zap.String("taskID", p.task.ID.String()))
p.err = err
} else {
w.te.logger.Debug("Completed successfully", zap.String("taskID", p.task.ID.String()))
}
}
func (w *worker) executeQuery(p *Promise) {
span, ctx := tracing.StartSpanFromContext(p.ctx)
defer span.Finish()
// start
w.start(p)
pkg, err := flux.Parse(p.task.Flux)
if err != nil {
w.finish(p, backend.RunFail, err)
return
}
sf, err := p.run.ScheduledForTime()
if err != nil {
w.finish(p, backend.RunFail, err)
return
}
req := &query.Request{
Authorization: p.auth,
OrganizationID: p.task.OrganizationID,
Compiler: lang.ASTCompiler{
AST: pkg,
Now: sf,
},
}
it, err := w.te.qs.Query(ctx, req)
if err != nil {
// Assume the error should not be part of the runResult.
w.finish(p, backend.RunFail, err)
return
}
// Drain the result iterator.
for it.More() {
// Consume the full iterator so that we don't leak outstanding iterators.
res := it.Next()
if err := exhaustResultIterators(res); err != nil {
w.te.logger.Info("Error exhausting result iterator", zap.Error(err), zap.String("name", res.Name()))
}
}
it.Release()
// log the statistics on the run
stats := it.Statistics()
b, err := json.Marshal(stats)
if err == nil {
w.te.tcs.AddRunLog(p.ctx, p.task.ID, p.run.ID, time.Now(), string(b))
}
w.finish(p, backend.RunSuccess, it.Err())
}
// Promise represents a promise the executor makes to finish a run's execution asynchronously.
type Promise struct {
run *influxdb.Run
task *influxdb.Task
auth *influxdb.Authorization
done chan struct{}
err error
ctx context.Context
cancelFunc context.CancelFunc
}
// ID is the id of the run that was created
func (p *Promise) ID() scheduler.ID {
return scheduler.ID(p.run.ID)
}
// Cancel is used to cancel an executing query
func (p *Promise) Cancel(ctx context.Context) {
// call cancelfunc
p.cancelFunc()
// wait for ctx.Done or p.Done
select {
case <-p.Done():
case <-ctx.Done():
}
}
// Done provides a channel that closes on completion of a promise
func (p *Promise) Done() <-chan struct{} {
return p.done
}
// Error returns the error resulting from a run execution.
// If the execution is not complete error waits on Done().
func (p *Promise) Error() error {
<-p.done
return p.err
}
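And a sketch of cancellation with a bounded wait; p is a promise returned from Execute, and the five-second timeout is illustrative:

func cancelWithTimeout(p *Promise) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	p.Cancel(ctx) // signals the run's context, then waits for Done() or ctx expiry
}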


@@ -0,0 +1,360 @@
package executor
import (
"context"
"errors"
"fmt"
"testing"
"time"
"github.com/influxdata/influxdb"
platform "github.com/influxdata/influxdb"
icontext "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/inmem"
"github.com/influxdata/influxdb/kit/prom"
"github.com/influxdata/influxdb/kit/prom/promtest"
"github.com/influxdata/influxdb/kv"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/scheduler"
"go.uber.org/zap/zaptest"
)
type tes struct {
svc *fakeQueryService
ex *TaskExecutor
i *kv.Service
tc testCreds
}
type noopMetrics struct{}
func (*noopMetrics) StartRun(influxdb.ID, time.Duration) {}
func (*noopMetrics) FinishRun(influxdb.ID, backend.RunStatus) {}
func taskExecutorSystem(t *testing.T) tes {
aqs := newFakeQueryService()
qs := query.QueryServiceBridge{
AsyncQueryService: aqs,
}
i := kv.NewService(inmem.NewKVStore())
ex := NewExecutor(zaptest.NewLogger(t), qs, i, i, i, &noopMetrics{})
return tes{
svc: aqs,
ex: ex,
i: i,
tc: createCreds(t, i),
}
}
func TestTaskExecutor(t *testing.T) {
t.Run("QuerySuccess", testQuerySuccess)
t.Run("QueryFailure", testQueryFailure)
t.Run("ManualRun", testManualRun)
t.Run("ResumeRun", testResumingRun)
t.Run("WorkerLimit", testWorkerLimit)
t.Run("LimitFunc", testLimitFunc)
t.Run("Metrics", testMetrics)
}
func testQuerySuccess(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}
if run.ID != promiseID {
t.Fatal("promise and run dont match")
}
tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
<-promise.Done()
if got := promise.Error(); got != nil {
t.Fatal(got)
}
}
func testQueryFailure(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}
if run.ID != promiseID {
t.Fatal("promise and run dont match")
}
tes.svc.WaitForQueryLive(t, script)
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
<-promise.Done()
if got := promise.Error(); got == nil {
t.Fatal("got no error when I should have")
}
}
func testManualRun(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
manualRun, err := tes.i.ForceRun(ctx, task.ID, 123)
if err != nil {
t.Fatal(err)
}
mr, err := tes.i.ManualRuns(ctx, task.ID)
if err != nil {
t.Fatal(err)
}
if len(mr) != 1 {
t.Fatal("manual run not created by force run")
}
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}
if run.ID != promiseID || manualRun.ID != promiseID {
t.Fatal("promise and run and manual run dont match")
}
tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
if got := promise.Error(); got != nil {
t.Fatal(got)
}
}
func testResumingRun(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
stalledRun, err := tes.i.CreateRun(ctx, task.ID, time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())
// ensure that it doesn't recreate a promise
promise2, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
if promise2 != promise {
t.Fatal("executing a current promise for a task that is already running created a new promise")
}
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}
if run.ID != promiseID || stalledRun.ID != promiseID {
t.Fatal("promise and run and manual run dont match")
}
tes.svc.WaitForQueryLive(t, script)
tes.svc.SucceedQuery(script)
if got := promise.Error(); got != nil {
t.Fatal(got)
}
}
func testWorkerLimit(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
if len(tes.ex.workerLimit) != 1 {
t.Fatal("expected a worker to be started")
}
tes.svc.WaitForQueryLive(t, script)
tes.svc.FailQuery(script, errors.New("blargyblargblarg"))
<-promise.Done()
if got := promise.Error(); got == nil {
t.Fatal("got no error when I should have")
}
}
func testLimitFunc(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
forcedErr := errors.New("forced")
tes.svc.FailNextQuery(forcedErr)
count := 0
tes.ex.SetLimitFunc(func(*influxdb.Run) error {
count++
if count < 2 {
return errors.New("not there yet")
}
return nil
})
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
<-promise.Done()
if got := promise.Error(); got != forcedErr {
t.Fatal("failed to get failure from forced error")
}
if count != 2 {
t.Fatalf("failed to call limitFunc enough times: %d", count)
}
}
func testMetrics(t *testing.T) {
t.Parallel()
tes := taskExecutorSystem(t)
metrics := NewExecutorMetrics()
tes.ex.metrics = metrics
reg := prom.NewRegistry()
reg.MustRegister(metrics.PrometheusCollectors()...)
mg := promtest.MustGather(t, reg)
m := promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
if got := *m.Gauge.Value; got != 0 {
t.Fatalf("expected 0 total runs active, got %v", got)
}
script := fmt.Sprintf(fmtTestScript, t.Name())
ctx := icontext.SetAuthorizer(context.Background(), tes.tc.Auth)
task, err := tes.i.CreateTask(ctx, platform.TaskCreate{OrganizationID: tes.tc.OrgID, Token: tes.tc.Auth.Token, Flux: script})
if err != nil {
t.Fatal(err)
}
promise, err := tes.ex.Execute(ctx, scheduler.ID(task.ID), time.Unix(123, 0))
if err != nil {
t.Fatal(err)
}
promiseID := influxdb.ID(promise.ID())
run, err := tes.i.FindRunByID(context.Background(), task.ID, promiseID)
if err != nil {
t.Fatal(err)
}
if run.ID != promiseID {
t.Fatal("promise and run dont match")
}
tes.svc.WaitForQueryLive(t, script)
mg = promtest.MustGather(t, reg)
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total runs active, got %v", got)
}
tes.svc.SucceedQuery(script)
<-promise.Done()
mg = promtest.MustGather(t, reg)
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_complete", map[string]string{"status": "success"})
if got := *m.Counter.Value; got != 1 {
t.Fatalf("expected 1 active runs, got %v", got)
}
m = promtest.MustFindMetric(t, mg, "task_executor_total_runs_active", nil)
if got := *m.Gauge.Value; got != 0 {
t.Fatalf("expected 0 total runs active, got %v", got)
}
if got := promise.Error(); got != nil {
t.Fatal(got)
}
}


@@ -17,16 +17,25 @@ type TaskControlService interface {
// If the run's ScheduledFor would be later than the passed-in now, CreateNextRun returns an ErrRunNotDueYet.
CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error)
CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
// FinishRun removes runID from the list of running tasks and, if its `ScheduledFor` is later than the last completed run, updates it.
FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
// NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready.
// The returned timestamp reflects the task's offset, so it does not necessarily exactly match the schedule time.
NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error)
// CreateRun creates a run with a scheduledFor time.
// This differs from CreateNextRun in that it does not use the scheduling system to determine when the run
// should happen.
// TODO(lh): remove comment once we no longer need CreateNextRun.
CreateRun(ctx context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error)
CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
// StartManualRun pulls a manual run from the list and moves it to currently running.
StartManualRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
// FinishRun removes runID from the list of running tasks and, if its `ScheduledFor` is later than the last completed run, updates it.
FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
// UpdateRunState sets the run state at the respective time.
UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state RunStatus) error
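Taken together, the interface implies a run life cycle along these lines. A caller-side sketch; tcs is any TaskControlService implementation, and error handling is elided:

func lifecycle(ctx context.Context, tcs TaskControlService, taskID influxdb.ID, scheduledFor time.Time) {
	run, _ := tcs.CreateRun(ctx, taskID, scheduledFor) // or StartManualRun for a queued manual run
	tcs.UpdateRunState(ctx, taskID, run.ID, time.Now(), RunStarted)
	// ... execute the task's query ...
	tcs.UpdateRunState(ctx, taskID, run.ID, time.Now(), RunSuccess)
	tcs.FinishRun(ctx, taskID, run.ID) // removes the run from CurrentlyRunning
}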


@@ -163,6 +163,40 @@ func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (back
}, nil
}
func (t *TaskControlService) CreateRun(_ context.Context, taskID influxdb.ID, scheduledFor time.Time) (*influxdb.Run, error) {
t.mu.Lock()
defer t.mu.Unlock()
runID := idgen.ID()
runs, ok := t.runs[taskID]
if !ok {
runs = make(map[influxdb.ID]*influxdb.Run)
}
runs[runID] = &influxdb.Run{
ID: runID,
ScheduledFor: scheduledFor.Format(time.RFC3339),
}
t.runs[taskID] = runs
return runs[runID], nil
}
func (t *TaskControlService) StartManualRun(_ context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
t.mu.Lock()
defer t.mu.Unlock()
var run *influxdb.Run
for i, r := range t.manualRuns {
if r.ID == runID {
run = r
t.manualRuns = append(t.manualRuns[:i], t.manualRuns[i+1:]...)
}
}
if run == nil {
return nil, &influxdb.ErrRunNotFound
}
return run, nil
}
func (d *TaskControlService) FinishRun(_ context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
d.mu.Lock()
defer d.mu.Unlock()