update task scheduler to use the new task control service (#12949)

Lyon Hill 2019-03-28 16:52:25 -06:00 committed by GitHub
parent 0da5abb660
commit c78344c441
13 changed files with 1065 additions and 630 deletions

View File

@ -547,12 +547,13 @@ func (m *Launcher) run(ctx context.Context) (err error) {
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.logger.With(zap.String("service", "task-executor")), m.queryController, authSvc, store)
lw := taskbackend.NewPointLogWriter(pointsWriter)
m.scheduler = taskbackend.NewScheduler(store, executor, lw, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskControlService := taskbackend.TaskControlAdaptor(store, lw, lr)
m.scheduler = taskbackend.NewScheduler(taskControlService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond), taskbackend.WithLogger(m.logger))
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)
queryService := query.QueryServiceBridge{AsyncQueryService: m.queryController}
lr := taskbackend.NewQueryLogReader(queryService)
taskSvc = task.PlatformAdapter(coordinator.New(m.logger.With(zap.String("service", "task-coordinator")), m.scheduler, store), lr, m.scheduler, authSvc, userResourceSvc, orgSvc)
taskSvc = task.NewValidator(m.logger.With(zap.String("service", "task-authz-validator")), taskSvc, bucketSvc)
m.taskStore = store

View File

@ -96,7 +96,7 @@ stuff f=-123.456,b=true,s="hello"
}
from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, bOut.Name, be.Org.Name),
}
created, err := be.TaskService().CreateTask(pctx.SetAuthorizer(ctx, be.Auth), create)
created, err := be.TaskService().CreateTask(ctx, create)
if err != nil {
t.Fatal(err)
}

View File

@ -71,7 +71,7 @@ func httpTaskServiceFactory(t *testing.T) (*servicetest.System, context.CancelFu
}
return &servicetest.System{
TaskControlService: servicetest.TaskControlAdaptor(store, rrw, rrw),
TaskControlService: backend.TaskControlAdaptor(store, rrw, rrw),
TaskService: taskService,
Ctx: ctx,
I: i,

task.go
View File

@ -40,6 +40,21 @@ type Task struct {
UpdatedAt string `json:"updatedAt,omitempty"`
}
// EffectiveCron returns the effective cron string of the options.
// If the cron option was specified, it is returned.
// If the every option was specified, it is converted into a cron string using "@every".
// Otherwise, the empty string is returned.
// The value of the offset option is not considered.
func (t *Task) EffectiveCron() string {
if t.Cron != "" {
return t.Cron
}
if t.Every != "" {
return "@every " + t.Every
}
return ""
}
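For illustration, a minimal sketch of the three cases the comment above describes (the literal Task values are hypothetical and assume this code sits in the same package as Task):

func effectiveCronExamples() []string {
	return []string{
		(&Task{Cron: "0 * * * *"}).EffectiveCron(), // "0 * * * *": an explicit cron option wins
		(&Task{Every: "1h"}).EffectiveCron(),       // "@every 1h": every is wrapped in an @every cron string
		(&Task{}).EffectiveCron(),                  // "": neither option set; offset is never considered
	}
}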
// Run is a record created when a run of a task is scheduled.
type Run struct {
ID ID `json:"id,omitempty"`

View File

@ -58,8 +58,13 @@ func (c *Coordinator) claimExistingTasks() {
continue
}
t := task // Copy to avoid mistaken closure around task value.
if err := c.sch.ClaimTask(&t.Task, &t.Meta); err != nil {
t, err := backend.ToInfluxTask(&task.Task, &task.Meta)
if err != nil {
continue
}
// I may need a context with an auth here
if err := c.sch.ClaimTask(context.Background(), t); err != nil {
c.logger.Error("failed claim task", zap.Error(err))
continue
}
@ -84,8 +89,11 @@ func (c *Coordinator) CreateTask(ctx context.Context, req backend.CreateTaskRequ
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(task, meta); err != nil {
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return id, err
}
if err := c.sch.ClaimTask(ctx, t); err != nil {
_, delErr := c.Store.DeleteTask(ctx, id)
if delErr != nil {
return id, fmt.Errorf("schedule task failed: %s\n\tcleanup also failed: %s", err, delErr)
@ -114,13 +122,18 @@ func (c *Coordinator) UpdateTask(ctx context.Context, req backend.UpdateTaskRequ
}
}
if err := c.sch.UpdateTask(task, meta); err != nil && err != backend.ErrTaskNotClaimed {
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return res, err
}
if err := c.sch.UpdateTask(ctx, t); err != nil && err != backend.ErrTaskNotClaimed {
return res, err
}
// If enabling the task, claim it after modifying the script.
if req.Status == backend.TaskActive {
if err := c.sch.ClaimTask(task, meta); err != nil && err != backend.ErrTaskAlreadyClaimed {
if err := c.sch.ClaimTask(ctx, t); err != nil && err != backend.ErrTaskAlreadyClaimed {
return res, err
}
}
@ -162,9 +175,15 @@ func (c *Coordinator) ManuallyRunTimeRange(ctx context.Context, taskID platform.
if err != nil {
return r, err
}
t, m, err := c.Store.FindTaskByIDWithMeta(ctx, taskID)
task, meta, err := c.Store.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}
return r, c.sch.UpdateTask(t, m)
t, err := backend.ToInfluxTask(task, meta)
if err != nil {
return nil, err
}
return r, c.sch.UpdateTask(ctx, t)
}

View File

@ -15,11 +15,11 @@ import (
"go.uber.org/zap/zaptest"
)
func timeoutSelector(ch <-chan *mock.Task) (*mock.Task, error) {
func timeoutSelector(ch <-chan *platform.Task) (*platform.Task, error) {
select {
case task := <-ch:
return task, nil
case <-time.After(time.Second):
case <-time.After(10 * time.Second):
return nil, errors.New("timeout on select")
}
}
@ -47,7 +47,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesn't match task created")
}
@ -65,7 +65,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesn't match task created")
}
@ -102,7 +102,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesn't match task created")
}
@ -115,7 +115,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
if task.Script != script {
if task.Flux != script {
t.Fatal("task sent to scheduler doesn't match task created")
}
@ -129,7 +129,7 @@ func TestCoordinator(t *testing.T) {
t.Fatal(err)
}
if task.Script != newScript {
if task.Flux != newScript {
t.Fatal("task sent to scheduler doesn't match task created")
}
}

View File

@ -11,12 +11,12 @@ import (
"time"
"github.com/influxdata/flux"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/task/options"
opentracing "github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
)
var (
@ -30,21 +30,6 @@ var (
ErrTaskAlreadyClaimed = errors.New("task already claimed")
)
// DesiredState persists the desired state of a run.
type DesiredState interface {
// CreateNextRun requests the next run from the desired state, delegating to (*StoreTaskMeta).CreateNextRun.
// This allows the scheduler to be "dumb" and just tell DesiredState what time the scheduler thinks it is,
// and the DesiredState will create the appropriate run according to the task's cron schedule,
// and according to what's in progress and what's been finished.
//
// If a Run is requested and the cron schedule says the schedule isn't ready, a RunNotYetDueError is returned.
CreateNextRun(ctx context.Context, taskID platform.ID, now int64) (RunCreation, error)
// FinishRun indicates that the given run is no longer intended to be executed.
// This may be called after a successful or failed execution, or upon cancellation.
FinishRun(ctx context.Context, taskID, runID platform.ID) error
}
// Executor handles execution of a run.
type Executor interface {
// Execute attempts to begin execution of a run.
@ -114,10 +99,10 @@ type Scheduler interface {
Stop()
// ClaimTask begins control of task execution in this scheduler.
ClaimTask(task *StoreTask, meta *StoreTaskMeta) error
ClaimTask(authCtx context.Context, task *platform.Task) error
// UpdateTask will update the concurrency and the runners for a task
UpdateTask(task *StoreTask, meta *StoreTaskMeta) error
UpdateTask(authCtx context.Context, task *platform.Task) error
// ReleaseTask immediately cancels any in-progress runs for the given task ID,
// and releases any resources related to management of that task.
@ -166,16 +151,15 @@ func WithLogger(logger *zap.Logger) TickSchedulerOption {
}
// NewScheduler returns a new scheduler with the given desired state and the given now UTC timestamp.
func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, now int64, opts ...TickSchedulerOption) *TickScheduler {
func NewScheduler(taskControlService TaskControlService, executor Executor, now int64, opts ...TickSchedulerOption) *TickScheduler {
o := &TickScheduler{
desiredState: desiredState,
executor: executor,
logWriter: lw,
now: now,
taskSchedulers: make(map[platform.ID]*taskScheduler),
logger: zap.NewNop(),
wg: &sync.WaitGroup{},
metrics: newSchedulerMetrics(),
taskControlService: taskControlService,
executor: executor,
now: now,
taskSchedulers: make(map[platform.ID]*taskScheduler),
logger: zap.NewNop(),
wg: &sync.WaitGroup{},
metrics: newSchedulerMetrics(),
}
for _, opt := range opts {
@ -186,9 +170,8 @@ func NewScheduler(desiredState DesiredState, executor Executor, lw LogWriter, no
}
type TickScheduler struct {
desiredState DesiredState
executor Executor
logWriter LogWriter
taskControlService TaskControlService
executor Executor
now int64
logger *zap.Logger
@ -286,7 +269,7 @@ func (s *TickScheduler) Stop() {
s.executor.Wait()
}
func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err error) {
func (s *TickScheduler) ClaimTask(authCtx context.Context, task *platform.Task) (err error) {
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
if s.ctx == nil {
@ -302,7 +285,7 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err
defer s.metrics.ClaimTask(err == nil)
ts, err := newTaskScheduler(s.ctx, s.wg, s, task, meta, s.metrics)
ts, err := newTaskScheduler(s.ctx, authCtx, s.wg, s, task, s.metrics)
if err != nil {
return err
}
@ -314,8 +297,13 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err
s.taskSchedulers[task.ID] = ts
if len(meta.CurrentlyRunning) > 0 {
if err := ts.WorkCurrentlyRunning(meta); err != nil {
// pick up any runs that are still "running" from a previous failure
runs, err := s.taskControlService.CurrentlyRunning(authCtx, task.ID)
if err != nil {
return err
}
if len(runs) > 0 {
if err := ts.WorkCurrentlyRunning(runs); err != nil {
return err
}
}
@ -327,7 +315,12 @@ func (s *TickScheduler) ClaimTask(task *StoreTask, meta *StoreTaskMeta) (err err
return nil
}
func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error {
func (s *TickScheduler) UpdateTask(authCtx context.Context, task *platform.Task) error {
opt, err := options.FromScript(task.Flux)
if err != nil {
return err
}
s.schedulerMu.Lock()
defer s.schedulerMu.Unlock()
@ -337,20 +330,29 @@ func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error {
}
ts.task = task
next, err := meta.NextDueRun()
next, err := s.taskControlService.NextDueRun(authCtx, task.ID)
if err != nil {
return err
}
hasQueue := len(meta.ManualRuns) > 0
runs, err := s.taskControlService.ManualRuns(authCtx, task.ID)
if err != nil {
return err
}
hasQueue := len(runs) > 0
// update the queued information
ts.nextDueMu.Lock()
ts.hasQueue = hasQueue
ts.nextDue = next
ts.authCtx = authCtx
ts.nextDueMu.Unlock()
// check the concurrency
// todo(lh): In the near future we may not be using the scheduler to manage concurrency.
maxC := int(meta.MaxConcurrency)
maxC := len(ts.runners)
if opt.Concurrency != nil {
maxC = int(*opt.Concurrency)
}
if maxC != len(ts.runners) {
ts.runningMu.Lock()
if maxC < len(ts.runners) {
@ -360,7 +362,7 @@ func (s *TickScheduler) UpdateTask(task *StoreTask, meta *StoreTaskMeta) error {
if maxC > len(ts.runners) {
delta := maxC - len(ts.runners)
for i := 0; i < delta; i++ {
ts.runners = append(ts.runners, newRunner(s.ctx, ts.wg, s.logger, task, s.desiredState, s.executor, s.logWriter, ts))
ts.runners = append(ts.runners, newRunner(s.ctx, ts.wg, s.logger, task, s.taskControlService, s.executor, ts))
}
}
ts.runningMu.Unlock()
@ -404,7 +406,10 @@ type taskScheduler struct {
now *int64
// Task we are scheduling for.
task *StoreTask
task *platform.Task
// Authorization context for using the TaskControlService
authCtx context.Context
// CancelFunc for context passed to runners, to enable Cancel method.
cancel context.CancelFunc
@ -427,13 +432,26 @@ type taskScheduler struct {
func newTaskScheduler(
ctx context.Context,
authCtx context.Context,
wg *sync.WaitGroup,
s *TickScheduler,
task *StoreTask,
meta *StoreTaskMeta,
task *platform.Task,
metrics *schedulerMetrics,
) (*taskScheduler, error) {
firstDue, err := meta.NextDueRun()
firstDue, err := s.taskControlService.NextDueRun(authCtx, task.ID)
if err != nil {
return nil, err
}
opt, err := options.FromScript(task.Flux)
if err != nil {
return nil, err
}
maxC := 1
if opt.Concurrency != nil {
maxC = int(*opt.Concurrency)
}
runs, err := s.taskControlService.ManualRuns(authCtx, task.ID)
if err != nil {
return nil, err
}
@ -442,20 +460,21 @@ func newTaskScheduler(
ts := &taskScheduler{
now: &s.now,
task: task,
authCtx: authCtx,
cancel: cancel,
wg: wg,
runners: make([]*runner, meta.MaxConcurrency),
running: make(map[platform.ID]runCtx, meta.MaxConcurrency),
runners: make([]*runner, maxC),
running: make(map[platform.ID]runCtx, maxC),
logger: s.logger.With(zap.String("task_id", task.ID.String())),
metrics: s.metrics,
nextDue: firstDue,
nextDueSource: math.MinInt64,
hasQueue: len(meta.ManualRuns) > 0,
hasQueue: len(runs) > 0,
}
for i := range ts.runners {
logger := ts.logger.With(zap.Int("run_slot", i))
ts.runners[i] = newRunner(ctx, wg, logger, task, s.desiredState, s.executor, s.logWriter, ts)
ts.runners[i] = newRunner(ctx, wg, logger, task, s.taskControlService, s.executor, ts)
}
return ts, nil
@ -473,11 +492,15 @@ func (ts *taskScheduler) Work() {
}
}
func (ts *taskScheduler) WorkCurrentlyRunning(meta *StoreTaskMeta) error {
for _, cr := range meta.CurrentlyRunning {
func (ts *taskScheduler) WorkCurrentlyRunning(runs []*platform.Run) error {
for _, cr := range runs {
foundWorker := false
for _, r := range ts.runners {
qr := QueuedRun{TaskID: ts.task.ID, RunID: platform.ID(cr.RunID), Now: cr.Now}
sf, err := time.Parse(time.RFC3339, cr.ScheduledFor)
if err != nil {
return err
}
qr := QueuedRun{TaskID: ts.task.ID, RunID: platform.ID(cr.ID), Now: sf.Unix()}
if r.RestartRun(qr) {
foundWorker = true
break
@ -523,11 +546,10 @@ type runner struct {
ctx context.Context
wg *sync.WaitGroup
task *StoreTask
task *platform.Task
desiredState DesiredState
executor Executor
logWriter LogWriter
taskControlService TaskControlService
executor Executor
// Parent taskScheduler.
ts *taskScheduler
@ -539,22 +561,20 @@ func newRunner(
ctx context.Context,
wg *sync.WaitGroup,
logger *zap.Logger,
task *StoreTask,
desiredState DesiredState,
task *platform.Task,
taskControlService TaskControlService,
executor Executor,
logWriter LogWriter,
ts *taskScheduler,
) *runner {
return &runner{
ctx: ctx,
wg: wg,
state: new(uint32),
task: task,
desiredState: desiredState,
executor: executor,
logWriter: logWriter,
ts: ts,
logger: logger,
ctx: ctx,
wg: wg,
state: new(uint32),
task: task,
taskControlService: taskControlService,
executor: executor,
ts: ts,
logger: logger,
}
}
@ -624,7 +644,7 @@ func (r *runner) startFromWorking(now int64) {
defer span.Finish()
ctx, cancel := context.WithCancel(ctx)
rc, err := r.desiredState.CreateNextRun(ctx, r.task.ID, now)
rc, err := r.taskControlService.CreateNextRun(ctx, r.task.ID, now)
if err != nil {
r.logger.Info("Failed to create run", zap.Error(err))
atomic.StoreUint32(r.state, runnerIdle)
@ -658,13 +678,7 @@ func (r *runner) clearRunning(id platform.ID) {
// fail sets r's state to failed, and marks this runner as idle.
func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason error) {
rlb := RunLogBase{
Task: r.task,
RunID: qr.RunID,
RunScheduledFor: qr.Now,
RequestedAt: qr.RequestedAt,
}
if err := r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), stage+": "+reason.Error()); err != nil {
if err := r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), stage+": "+reason.Error()); err != nil {
runLogger.Info("Failed to update run log", zap.Error(err))
}
@ -674,6 +688,16 @@ func (r *runner) fail(qr QueuedRun, runLogger *zap.Logger, stage string, reason
func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *zap.Logger) {
defer r.wg.Done()
errMsg := "Failed to finish run"
defer func() {
if _, err := r.taskControlService.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error(errMsg, zap.Error(err))
atomic.StoreUint32(r.state, runnerIdle)
}
}()
sp, spCtx := tracing.StartSpanFromContext(ctx)
defer sp.Finish()
@ -681,11 +705,7 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
rp, err := r.executor.Execute(spCtx, qr)
if err != nil {
runLogger.Info("Failed to begin run execution", zap.Error(err))
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error("Beginning run execution failed, and desired state update failed", zap.Error(err))
}
errMsg = "Beginning run execution failed, " + errMsg
// TODO(mr): retry?
r.fail(qr, runLogger, "Run failed to begin execution", err)
return
@ -713,19 +733,14 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
close(ready)
if err != nil {
if err == ErrRunCanceled {
_ = r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID)
r.updateRunState(qr, RunCanceled, runLogger)
errMsg = "Waiting for execution result failed, " + errMsg
// Move on to the next execution, for a canceled run.
r.startFromWorking(atomic.LoadInt64(r.ts.now))
return
}
runLogger.Info("Failed to wait for execution result", zap.Error(err))
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error("Waiting for execution result failed, and desired state update failed", zap.Error(err))
}
// TODO(mr): retry?
r.fail(qr, runLogger, "Waiting for execution result", err)
@ -733,34 +748,22 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
}
if err := rr.Err(); err != nil {
runLogger.Info("Run failed to execute", zap.Error(err))
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
// TODO(mr): Need to figure out how to reconcile this error, on the next run, if it happens.
runLogger.Error("Run failed to execute, and desired state update failed", zap.Error(err))
}
errMsg = "Run failed to execute, " + errMsg
// TODO(mr): retry?
r.fail(qr, runLogger, "Run failed to execute", err)
return
}
if err := r.desiredState.FinishRun(r.ctx, qr.TaskID, qr.RunID); err != nil {
runLogger.Info("Failed to finish run", zap.Error(err))
// TODO(mr): retry?
// Need to think about what it means if there was an error finishing a run.
atomic.StoreUint32(r.state, runnerIdle)
r.updateRunState(qr, RunFail, runLogger)
return
}
rlb := RunLogBase{
Task: r.task,
RunID: qr.RunID,
RunScheduledFor: qr.Now,
RequestedAt: qr.RequestedAt,
}
stats := rr.Statistics()
b, err := json.Marshal(stats)
if err == nil {
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), string(b))
// authCtx can be updated mid-process
r.ts.nextDueMu.RLock()
authCtx := r.ts.authCtx
r.ts.nextDueMu.RUnlock()
r.taskControlService.AddRunLog(authCtx, r.task.ID, qr.RunID, time.Now(), string(b))
}
r.updateRunState(qr, RunSuccess, runLogger)
runLogger.Info("Execution succeeded")
@ -770,26 +773,19 @@ func (r *runner) executeAndWait(ctx context.Context, qr QueuedRun, runLogger *za
}
func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger) {
rlb := RunLogBase{
Task: r.task,
RunID: qr.RunID,
RunScheduledFor: qr.Now,
RequestedAt: qr.RequestedAt,
}
switch s {
case RunStarted:
r.ts.metrics.StartRun(r.task.ID.String())
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), fmt.Sprintf("Started task from script: %q", r.task.Script))
r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), fmt.Sprintf("Started task from script: %q", r.task.Flux))
case RunSuccess:
r.ts.metrics.FinishRun(r.task.ID.String(), true)
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), "Completed successfully")
r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), "Completed successfully")
case RunFail:
r.ts.metrics.FinishRun(r.task.ID.String(), false)
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), "Failed")
r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), "Failed")
case RunCanceled:
r.ts.metrics.FinishRun(r.task.ID.String(), false)
r.logWriter.AddRunLog(r.ctx, rlb, time.Now(), "Canceled")
r.taskControlService.AddRunLog(r.ts.authCtx, r.task.ID, qr.RunID, time.Now(), "Canceled")
default: // We are deliberately not handling RunQueued yet.
// There is not really a notion of being queued in this runner architecture.
runLogger.Warn("Unhandled run state", zap.Stringer("state", s))
@ -799,7 +795,7 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger
// If we start seeing errors from this, we know the time limit is too short or the system is overloaded.
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond)
defer cancel()
if err := r.logWriter.UpdateRunState(ctx, rlb, time.Now(), s); err != nil {
if err := r.taskControlService.UpdateRunState(ctx, r.task.ID, qr.RunID, time.Now(), s); err != nil {
runLogger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err))
}
}
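With DesiredState and LogWriter folded into TaskControlService, downstream wiring of the scheduler changes shape as well. A minimal sketch of the new construction, mirroring the launcher hunk at the top of this commit (the helper name and parameters are illustrative, and the usual context/time and task/backend imports are assumed):

func wireScheduler(ctx context.Context, store backend.Store, lw backend.LogWriter, lr backend.LogReader, ex backend.Executor) *backend.TickScheduler {
	// Bridge the legacy Store and log reader/writer to the new TaskControlService.
	tcs := backend.TaskControlAdaptor(store, lw, lr)
	s := backend.NewScheduler(tcs, ex, time.Now().UTC().Unix(),
		backend.WithTicker(ctx, 100*time.Millisecond))
	s.Start(ctx)
	return s
}

Claiming and updating tasks then take an authorization-carrying context and an *influxdb.Task instead of a StoreTask/StoreTaskMeta pair, e.g. s.ClaimTask(ctx, task).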

View File

@ -7,6 +7,7 @@ import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
@ -23,55 +24,46 @@ import (
func TestScheduler_Cancelation(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
e.WithHanging(100 * time.Millisecond)
rl := backend.NewInMemRunReaderWriter()
o := backend.NewScheduler(d, e, rl, 5, backend.WithLogger(zaptest.NewLogger(t)))
o := backend.NewScheduler(tcs, e, 5, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
defer o.Stop()
const orgID = 2
task := &backend.StoreTask{
ID: platform.ID(1),
Org: orgID,
task := &platform.Task{
ID: platform.ID(1),
OrganizationID: orgID,
Every: "1s",
LatestCompleted: "1970-01-01T00:00:04Z",
Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "@every 1s",
LatestCompleted: 4,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
runs, err := rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID})
runs, err := tcs.CurrentlyRunning(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
if err = o.CancelRun(context.Background(), task.ID, runs[0].ID); err != nil {
run := runs[0]
if err = o.CancelRun(context.Background(), task.ID, run.ID); err != nil {
t.Fatal(err)
}
time.Sleep(10 * time.Millisecond) // we have to do this because the storage system we are using for the logs is eventually consistent.
runs, err = rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID})
time.Sleep(20 * time.Millisecond) // we have to do this because the storage system we are using for the logs is eventually consistent.
runs, err = tcs.CurrentlyRunning(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
if runs[0].Status != "canceled" {
t.Fatalf("Run not logged as canceled, but is %s", runs[0].Status)
}
// check to make sure it is really canceling, and that the status doesn't get changed to something else after it would have finished
time.Sleep(500 * time.Millisecond)
runs, err = rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID})
if err != nil {
t.Fatal(err)
}
if runs[0].Status != "canceled" {
t.Fatalf("Run not actually canceled, but is %s", runs[0].Status)
if len(runs) != 0 {
t.Fatal("canceled run still running")
}
// check for when we cancel something already canceled
if err = o.CancelRun(context.Background(), task.ID, runs[0].ID); err != backend.ErrRunNotFound {
time.Sleep(500 * time.Millisecond)
if err = o.CancelRun(context.Background(), task.ID, run.ID); err != backend.ErrRunNotFound {
t.Fatalf("expected ErrRunNotFound but got %s", err)
}
}
@ -79,55 +71,47 @@ func TestScheduler_Cancelation(t *testing.T) {
func TestScheduler_StartScriptOnClaim(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t)))
o := backend.NewScheduler(tcs, e, 5, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
task := &platform.Task{
ID: platform.ID(1),
Cron: "* * * * *",
LatestCompleted: "1970-01-01T00:00:03Z",
Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "* * * * *",
LatestCompleted: 3,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
// No valid timestamps between 3 and 5 for every minute.
if n := len(d.CreatedFor(task.ID)); n > 0 {
if n := len(tcs.CreatedFor(task.ID)); n > 0 {
t.Fatalf("expected no runs queued, but got %d", n)
}
// For every second, can queue for timestamps 4 and 5.
task = &backend.StoreTask{
ID: platform.ID(2),
task = &platform.Task{
ID: platform.ID(2),
Every: "1s",
LatestCompleted: "1970-01-01T00:00:03Z",
Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
meta = &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LatestCompleted: 3,
CurrentlyRunning: []*backend.StoreTaskMetaRun{
&backend.StoreTaskMetaRun{
Now: 4,
RunID: uint64(10),
},
},
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
if n := len(d.CreatedFor(task.ID)); n != 1 {
if n := len(tcs.CreatedFor(task.ID)); n != 2 {
t.Fatalf("expected 2 runs queued for 'every 1s' script, but got %d", n)
}
if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x))
}
@ -143,7 +127,7 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) {
rp.Finish(mock.NewRunResult(nil, false), nil)
}
if x, err := d.PollForNumberCreated(task.ID, 0); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatalf("expected 0 runs queued, but got %d", len(x))
}
@ -155,32 +139,30 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) {
func TestScheduler_CreateNextRunOnTick(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
o := backend.NewScheduler(tcs, e, 5)
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 2,
EffectiveCron: "@every 1s",
LatestCompleted: 5,
task := &platform.Task{
ID: platform.ID(1),
Every: "1s",
LatestCompleted: "1970-01-01T00:00:05Z",
Flux: `option task = {concurrency: 2, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
if x, err := d.PollForNumberCreated(task.ID, 0); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatalf("expected no runs queued, but got %d", len(x))
}
o.Tick(6)
if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 1); err != nil {
t.Fatalf("expected 1 run queued, but got %d", len(x))
}
running, err := e.PollForNumberRunning(task.ID, 1)
@ -193,7 +175,7 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) {
}
o.Tick(7)
if x, err := d.PollForNumberCreated(task.ID, 2); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x))
}
running, err = e.PollForNumberRunning(task.ID, 2)
@ -212,7 +194,7 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) {
}
o.Tick(8) // Can't exceed concurrency of 2.
if x, err := d.PollForNumberCreated(task.ID, 2); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x))
}
run6.Cancel() // 7 and 8 should be running.
@ -227,28 +209,26 @@ func TestScheduler_CreateNextRunOnTick(t *testing.T) {
func TestScheduler_LogStatisticsOnSuccess(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
rl := backend.NewInMemRunReaderWriter()
o := backend.NewScheduler(d, e, rl, 5, backend.WithLogger(zaptest.NewLogger(t)))
o := backend.NewScheduler(tcs, e, 5, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
defer o.Stop()
const taskID = 0x12345
const orgID = 0x54321
task := &backend.StoreTask{
ID: taskID,
Org: orgID,
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "@every 1s",
LatestCompleted: 5,
task := &platform.Task{
ID: taskID,
OrganizationID: orgID,
Every: "1s",
LatestCompleted: "1970-01-01T00:00:05Z",
Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(taskID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
@ -269,14 +249,11 @@ func TestScheduler_LogStatisticsOnSuccess(t *testing.T) {
t.Fatal(err)
}
logs, err := rl.ListLogs(context.Background(), orgID, platform.LogFilter{Task: taskID, Run: &runID})
if err != nil {
t.Fatal(err)
}
run := tcs.FinishedRun(runID)
// For now, assume the stats line is the only line beginning with "{".
var statJSON string
for _, log := range logs {
for _, log := range run.Log {
if len(log.Message) > 0 && log.Message[0] == '{' {
statJSON = log.Message
break
@ -299,29 +276,27 @@ func TestScheduler_LogStatisticsOnSuccess(t *testing.T) {
func TestScheduler_Release(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
o := backend.NewScheduler(tcs, e, 5)
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LatestCompleted: 5,
task := &platform.Task{
ID: platform.ID(1),
Every: "1s",
LatestCompleted: "1970-01-01T00:00:05Z",
Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
o.Tick(6)
o.Tick(7)
if n := len(d.CreatedFor(task.ID)); n != 2 {
if n := len(tcs.CreatedFor(task.ID)); n != 2 {
t.Fatalf("expected 2 runs queued, but got %d", n)
}
@ -329,7 +304,7 @@ func TestScheduler_Release(t *testing.T) {
t.Fatal(err)
}
if _, err := d.PollForNumberCreated(task.ID, 0); err != nil {
if _, err := tcs.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatal(err)
}
}
@ -337,23 +312,21 @@ func TestScheduler_Release(t *testing.T) {
func TestScheduler_UpdateTask(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 3059, backend.WithLogger(zaptest.NewLogger(t)))
s := backend.NewScheduler(tcs, e, 3059, backend.WithLogger(zaptest.NewLogger(t)))
s.Start(context.Background())
defer s.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "* * * * *", // Every minute.
LatestCompleted: 3000,
task := &platform.Task{
ID: platform.ID(1),
Cron: "* * * * *",
LatestCompleted: "1970-01-01T00:50:00Z",
Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := s.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
@ -365,11 +338,11 @@ func TestScheduler_UpdateTask(t *testing.T) {
p[0].Finish(mock.NewRunResult(nil, false), nil)
meta.EffectiveCron = "0 * * * *"
meta.MaxConcurrency = 30
d.SetTaskMeta(task.ID, *meta)
task.Cron = "0 * * * *"
task.Flux = `option task = {concurrency: 50, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`
tcs.SetTask(task)
if err := s.UpdateTask(task, meta); err != nil {
if err := s.UpdateTask(context.Background(), task); err != nil {
t.Fatal(err)
}
@ -390,30 +363,41 @@ func TestScheduler_UpdateTask(t *testing.T) {
func TestScheduler_Queue(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 3059, backend.WithLogger(zaptest.NewLogger(t)))
o := backend.NewScheduler(tcs, e, 3059, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "* * * * *", // Every minute.
LatestCompleted: 3000,
ManualRuns: []*backend.StoreTaskMetaManualRun{
{Start: 120, End: 240, LatestCompleted: 119, RequestedAt: 3001},
},
task := &platform.Task{
ID: platform.ID(1),
Cron: "* * * * *",
LatestCompleted: "1970-01-01T00:50:00Z",
Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
tcs.SetManualRuns([]*platform.Run{
&platform.Run{
ID: platform.ID(10),
TaskID: task.ID,
ScheduledFor: "1970-01-01T00:02:00Z",
},
&platform.Run{
ID: platform.ID(11),
TaskID: task.ID,
ScheduledFor: "1970-01-01T00:03:00Z",
}, &platform.Run{
ID: platform.ID(12),
TaskID: task.ID,
ScheduledFor: "1970-01-01T00:04:00Z",
},
})
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
cs, err := d.PollForNumberCreated(task.ID, 1)
cs, err := tcs.PollForNumberCreated(task.ID, 1)
if err != nil {
t.Fatal(err)
}
@ -456,40 +440,64 @@ func TestScheduler_Queue(t *testing.T) {
// Tick the scheduler so the next natural run will happen once 180 finishes.
o.Tick(3062)
// Cancel 180. Next run should be 3060, the next natural schedule.
e.RunningFor(task.ID)[0].Cancel()
pollForRun(3060)
// Cancel the 3060 run; 240 should pick up.
// Cancel 180. Next run should be 240, manual runs get priority.
e.RunningFor(task.ID)[0].Cancel()
pollForRun(240)
// Cancel 240; jobs should be idle.
// Cancel the 240 run; 3060 should pick up.
e.RunningFor(task.ID)[0].Cancel()
pollForRun(3060)
// Cancel 3060; jobs should be idle.
e.RunningFor(task.ID)[0].Cancel()
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
}
func pollForRunLog(t *testing.T, r backend.LogReader, taskID, runID, orgID platform.ID, exp string) {
// logListener allows us to act as middleware and see whether specific logs have been written
type logListener struct {
mu sync.Mutex
backend.TaskControlService
logs map[string][]string
}
func newLogListener(tcs backend.TaskControlService) *logListener {
return &logListener{
TaskControlService: tcs,
logs: make(map[string][]string),
}
}
func (l *logListener) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error {
l.mu.Lock()
defer l.mu.Unlock()
logs := l.logs[taskID.String()+runID.String()]
logs = append(logs, log)
l.logs[taskID.String()+runID.String()] = logs
return l.TaskControlService.AddRunLog(ctx, taskID, runID, when, log)
}
func pollForRunLog(t *testing.T, ll *logListener, taskID, runID platform.ID, exp string) {
t.Helper()
var logs []platform.Log
var err error
var logs []string
const maxAttempts = 50
for i := 0; i < maxAttempts; i++ {
if i != 0 {
time.Sleep(10 * time.Millisecond)
}
logs, err = r.ListLogs(context.Background(), orgID, platform.LogFilter{Task: taskID, Run: &runID})
if err != nil {
t.Fatal(err)
}
ll.mu.Lock()
logs = ll.logs[taskID.String()+runID.String()]
ll.mu.Unlock()
for _, log := range logs {
if log.Message == exp {
if log == exp {
return
}
}
@ -497,33 +505,75 @@ func pollForRunLog(t *testing.T, r backend.LogReader, taskID, runID, orgID platf
t.Logf("Didn't find message %q in logs:", exp)
for _, log := range logs {
t.Logf("\t%s", log.Message)
t.Logf("\t%s", log)
}
t.FailNow()
}
// runListener allows us to act as middleware and see whether specific run states are updated
type runListener struct {
mu sync.Mutex
backend.TaskControlService
rs map[platform.ID][]*platform.Run
}
func newRunListener(tcs backend.TaskControlService) *runListener {
return &runListener{
TaskControlService: tcs,
rs: make(map[platform.ID][]*platform.Run),
}
}
func (l *runListener) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state backend.RunStatus) error {
l.mu.Lock()
defer l.mu.Unlock()
runs, ok := l.rs[taskID]
if !ok {
runs = []*platform.Run{}
}
found := false
for _, run := range runs {
if run.ID == runID {
found = true
run.Status = state.String()
}
}
if !found {
runs = append(runs, &platform.Run{ID: runID, Status: state.String()})
}
l.rs[taskID] = runs
return l.TaskControlService.UpdateRunState(ctx, taskID, runID, when, state)
}
// pollForRunStatus tries a few times to find runs matching supplied conditions, before failing.
func pollForRunStatus(t *testing.T, r backend.LogReader, taskID, orgID platform.ID, expCount, expIndex int, expStatus string) {
func pollForRunStatus(t *testing.T, r *runListener, taskID platform.ID, expCount, expIndex int, expStatus string) {
t.Helper()
var runs []*platform.Run
var err error
const maxAttempts = 50
for i := 0; i < maxAttempts; i++ {
if i != 0 {
time.Sleep(10 * time.Millisecond)
}
runs, err = r.ListRuns(context.Background(), orgID, platform.RunFilter{Task: taskID})
if err != nil {
t.Fatal(err)
}
r.mu.Lock()
runs = r.rs[taskID]
r.mu.Unlock()
if len(runs) != expCount {
continue
}
// make sure we don't panic
if len(runs) <= expIndex {
continue
}
if runs[expIndex].Status != expStatus {
continue
}
@ -542,26 +592,24 @@ func pollForRunStatus(t *testing.T, r backend.LogReader, taskID, orgID platform.
func TestScheduler_RunStatus(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
rl := backend.NewInMemRunReaderWriter()
s := backend.NewScheduler(d, e, rl, 5, backend.WithLogger(zaptest.NewLogger(t)))
rl := newRunListener(tcs)
s := backend.NewScheduler(rl, e, 5, backend.WithLogger(zaptest.NewLogger(t)))
s.Start(context.Background())
defer s.Stop()
// Claim a task that starts later.
task := &backend.StoreTask{
ID: platform.ID(1),
Org: 2,
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LatestCompleted: 5,
task := &platform.Task{
ID: platform.ID(1),
OrganizationID: platform.ID(2),
Every: "1s",
LatestCompleted: "1970-01-01T00:00:05Z",
Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := s.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
@ -571,7 +619,7 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
runs, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID})
runs, err := tcs.CurrentlyRunning(context.Background(), task.ID)
if err != nil {
t.Fatal(err)
}
@ -589,7 +637,7 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 1, 0, backend.RunSuccess.String())
pollForRunStatus(t, rl, task.ID, 1, 0, backend.RunSuccess.String())
// Create a new run, but fail this time.
s.Tick(7)
@ -598,7 +646,7 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 2, 1, backend.RunStarted.String())
pollForRunStatus(t, rl, task.ID, 2, 1, backend.RunStarted.String())
// Finish with failure to create the run.
promises[0].Finish(nil, errors.New("forced failure"))
@ -606,7 +654,7 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 2, 1, backend.RunFail.String())
pollForRunStatus(t, rl, task.ID, 2, 1, backend.RunFail.String())
// Create a new run that starts but fails.
s.Tick(8)
@ -615,12 +663,12 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 3, 2, backend.RunStarted.String())
pollForRunStatus(t, rl, task.ID, 3, 2, backend.RunStarted.String())
promises[0].Finish(mock.NewRunResult(errors.New("started but failed to finish properly"), false), nil)
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 3, 2, backend.RunFail.String())
pollForRunStatus(t, rl, task.ID, 3, 2, backend.RunFail.String())
// One more run, but cancel this time.
s.Tick(9)
@ -629,7 +677,7 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 4, 3, backend.RunStarted.String())
pollForRunStatus(t, rl, task.ID, 4, 3, backend.RunStarted.String())
// Finish with failure.
promises[0].Cancel()
@ -637,31 +685,29 @@ func TestScheduler_RunStatus(t *testing.T) {
t.Fatal(err)
}
pollForRunStatus(t, rl, task.ID, task.Org, 4, 3, backend.RunCanceled.String())
pollForRunStatus(t, rl, task.ID, 4, 3, backend.RunCanceled.String())
}
func TestScheduler_RunFailureCleanup(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
rl := backend.NewInMemRunReaderWriter()
s := backend.NewScheduler(d, e, rl, 5, backend.WithLogger(zaptest.NewLogger(t)))
ll := newLogListener(tcs)
s := backend.NewScheduler(ll, e, 5, backend.WithLogger(zaptest.NewLogger(t)))
s.Start(context.Background())
defer s.Stop()
// Task with concurrency 1 should continue after one run fails.
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 1,
EffectiveCron: "@every 1s",
LatestCompleted: 5,
task := &platform.Task{
ID: platform.ID(1),
Every: "1s",
LatestCompleted: "1970-01-01T00:00:05Z",
Flux: `option task = {name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := s.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
@ -676,7 +722,7 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
pollForRunLog(t, rl, task.ID, promises[0].Run().RunID, task.Org, "Waiting for execution result: forced failure")
pollForRunLog(t, ll, task.ID, promises[0].Run().RunID, "Waiting for execution result: forced failure")
// Should continue even if max concurrency == 1.
// This run will start and then fail.
@ -690,10 +736,10 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
pollForRunLog(t, rl, task.ID, promises[0].Run().RunID, task.Org, "Run failed to execute: started but failed to finish properly")
pollForRunLog(t, ll, task.ID, promises[0].Run().RunID, "Run failed to execute: started but failed to finish properly")
// Fail to execute next run.
if n := d.TotalRunsCreatedForTask(task.ID); n != 2 {
if n := tcs.TotalRunsCreatedForTask(task.ID); n != 2 {
t.Fatalf("should have created 2 runs so far, got %d", n)
}
e.FailNextCallToExecute(errors.New("forced failure on Execute"))
@ -702,7 +748,7 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
const attempts = 50
for i := 0; i < attempts; i++ {
time.Sleep(2 * time.Millisecond)
n := d.TotalRunsCreatedForTask(task.ID)
n := tcs.TotalRunsCreatedForTask(task.ID)
if n == 3 {
break
}
@ -712,11 +758,11 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
}
}
// We don't have a good hook to get the run ID right now, so list the runs and assume the final one is ours.
runs, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID})
runs := tcs.FinishedRuns()
if err != nil {
t.Fatal(err)
}
pollForRunLog(t, rl, task.ID, runs[len(runs)-1].ID, task.Org, "Run failed to begin execution: forced failure on Execute")
pollForRunLog(t, ll, task.ID, runs[len(runs)-1].ID, "Run failed to begin execution: forced failure on Execute")
// One more tick just to ensure that we can keep going after this type of failure too.
s.Tick(9)
@ -729,9 +775,9 @@ func TestScheduler_RunFailureCleanup(t *testing.T) {
func TestScheduler_Metrics(t *testing.T) {
t.Parallel()
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
s := backend.NewScheduler(tcs, e, 5)
s.Start(context.Background())
defer s.Stop()
@ -741,17 +787,15 @@ func TestScheduler_Metrics(t *testing.T) {
reg.MustRegister(s.PrometheusCollectors()...)
// Claim a task that starts later.
task := &backend.StoreTask{
ID: platform.ID(1),
}
meta := &backend.StoreTaskMeta{
MaxConcurrency: 99,
EffectiveCron: "@every 1s",
LatestCompleted: 5,
task := &platform.Task{
ID: platform.ID(1),
Every: "1s",
LatestCompleted: "1970-01-01T00:00:05Z",
Flux: `option task = {concurrency: 99, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
}
d.SetTaskMeta(task.ID, *meta)
if err := s.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := s.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
@ -871,7 +915,7 @@ func TestScheduler_Stop(t *testing.T) {
t.Parallel()
e := &fakeWaitExecutor{wait: make(chan struct{})}
o := backend.NewScheduler(mock.NewDesiredState(), e, backend.NopLogWriter{}, 4, backend.WithLogger(zaptest.NewLogger(t)))
o := backend.NewScheduler(mock.NewTaskControlService(), e, 4, backend.WithLogger(zaptest.NewLogger(t)))
o.Start(context.Background())
stopped := make(chan struct{})
@ -904,33 +948,30 @@ func TestScheduler_WithTicker(t *testing.T) {
defer cancel()
tickFreq := 100 * time.Millisecond
d := mock.NewDesiredState()
tcs := mock.NewTaskControlService()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5, backend.WithLogger(zaptest.NewLogger(t)), backend.WithTicker(ctx, tickFreq))
o := backend.NewScheduler(tcs, e, 5, backend.WithLogger(zaptest.NewLogger(t)), backend.WithTicker(ctx, tickFreq))
o.Start(ctx)
defer o.Stop()
task := &backend.StoreTask{
ID: platform.ID(1),
}
createdAt := time.Now().Unix()
meta := &backend.StoreTaskMeta{
MaxConcurrency: 5,
EffectiveCron: "@every 1s",
LatestCompleted: createdAt,
createdAt := time.Now()
task := &platform.Task{
ID: platform.ID(1),
Every: "1s",
Flux: `option task = {concurrency: 5, name:"x", every:1m} from(bucket:"a") |> to(bucket:"b", org: "o")`,
LatestCompleted: createdAt.Format(time.RFC3339Nano),
}
d.SetTaskMeta(task.ID, *meta)
if err := o.ClaimTask(task, meta); err != nil {
tcs.SetTask(task)
if err := o.ClaimTask(context.Background(), task); err != nil {
t.Fatal(err)
}
for time.Now().Unix() == createdAt {
for time.Now().Unix() == createdAt.Unix() {
time.Sleep(tickFreq + 10*time.Millisecond)
}
if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
if x, err := tcs.PollForNumberCreated(task.ID, 1); err != nil {
t.Fatalf("expected 1 run queued, but got %d", len(x))
}
}

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/options"
)
// TaskControlService is a low-level controller interface, intended to be passed to
@ -16,6 +17,9 @@ type TaskControlService interface {
// If the run's ScheduledFor would be later than the passed-in now, CreateNextRun returns a RunNotYetDueError.
CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error)
CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error)
// FinishRun removes runID from the list of running tasks and, if its `ScheduledFor` is later than the last completed run, updates the latest-completed timestamp.
FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error)
@ -29,3 +33,210 @@ type TaskControlService interface {
// AddRunLog adds a log line to the run.
AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error
}
// TaskControlAdaptor creates a TaskControlService for the older TaskStore system.
// TODO(lh): remove task control adaptor when we transition away from Store.
func TaskControlAdaptor(s Store, lw LogWriter, lr LogReader) TaskControlService {
return &taskControlAdaptor{s, lw, lr}
}
// taskControlAdaptor adapts a Store plus a LogWriter and LogReader to implement the task control service.
type taskControlAdaptor struct {
s Store
lw LogWriter
lr LogReader
}
func (tcs *taskControlAdaptor) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (RunCreation, error) {
return tcs.s.CreateNextRun(ctx, taskID, now)
}
func (tcs *taskControlAdaptor) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
// Once we completely switch over to the new system we can look at the returned run in the tests.
task, err := tcs.s.FindTaskByID(ctx, taskID)
if err != nil {
return nil, err
}
tcs.lr.FindRunByID(ctx, task.Org, runID)
return nil, tcs.s.FinishRun(ctx, taskID, runID)
}
func (tcs *taskControlAdaptor) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
t, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}
var rtn = make([]*influxdb.Run, len(m.CurrentlyRunning))
for i, cr := range m.CurrentlyRunning {
rtn[i] = &influxdb.Run{
ID: influxdb.ID(cr.RunID),
TaskID: t.ID,
ScheduledFor: time.Unix(cr.Now, 0).UTC().Format(time.RFC3339),
}
if cr.RequestedAt != 0 {
rtn[i].RequestedAt = time.Unix(cr.RequestedAt, 0).UTC().Format(time.RFC3339)
}
}
return rtn, nil
}
func (tcs *taskControlAdaptor) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
t, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return nil, err
}
var rtn = make([]*influxdb.Run, len(m.ManualRuns))
for i, cr := range m.ManualRuns {
rtn[i] = &influxdb.Run{
ID: influxdb.ID(cr.RunID),
TaskID: t.ID,
ScheduledFor: time.Unix(cr.Start, 0).UTC().Format(time.RFC3339),
}
if cr.RequestedAt != 0 {
rtn[i].RequestedAt = time.Unix(cr.RequestedAt, 0).Format(time.RFC3339)
}
}
return rtn, nil
}
func (tcs *taskControlAdaptor) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
m, err := tcs.s.FindTaskMetaByID(ctx, taskID)
if err != nil {
return 0, err
}
return m.NextDueRun()
}
func (tcs *taskControlAdaptor) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state RunStatus) error {
st, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return err
}
var (
schedFor, reqAt time.Time
)
// check the log store
r, err := tcs.lr.FindRunByID(ctx, st.Org, runID)
if err == nil && r != nil {
schedFor, err = time.Parse(time.RFC3339, r.ScheduledFor)
if err != nil {
return err
}
if r.RequestedAt != "" {
reqAt, err = time.Parse(time.RFC3339, r.RequestedAt)
if err != nil {
return err
}
}
}
// in the old system the log store may not have the run until after the first
// state update, so we will need to pull the currently running.
if schedFor.IsZero() {
for _, cr := range m.CurrentlyRunning {
if influxdb.ID(cr.RunID) == runID {
schedFor = time.Unix(cr.Now, 0)
if cr.RequestedAt != 0 {
reqAt = time.Unix(cr.RequestedAt, 0)
}
}
}
}
rlb := RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: schedFor.Unix(),
RequestedAt: reqAt.Unix(),
}
if err := tcs.lw.UpdateRunState(ctx, rlb, when, state); err != nil {
return err
}
return nil
}
func (tcs *taskControlAdaptor) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {
st, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return err
}
var (
schedFor, reqAt time.Time
)
r, err := tcs.lr.FindRunByID(ctx, st.Org, runID)
if err == nil && r != nil {
schedFor, err = time.Parse(time.RFC3339, r.ScheduledFor)
if err != nil {
return err
}
if r.RequestedAt != "" {
reqAt, err = time.Parse(time.RFC3339, r.RequestedAt)
if err != nil {
return err
}
}
}
// in the old system the log store may not have the run until after the first
// state update, so we will need to pull the currently running.
if schedFor.IsZero() {
for _, cr := range m.CurrentlyRunning {
if influxdb.ID(cr.RunID) == runID {
schedFor = time.Unix(cr.Now, 0)
if cr.RequestedAt != 0 {
reqAt = time.Unix(cr.RequestedAt, 0)
}
}
}
}
rlb := RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: schedFor.Unix(),
RequestedAt: reqAt.Unix(),
}
return tcs.lw.AddRunLog(ctx, rlb, when, log)
}
// ToInfluxTask converts a backend task and meta to an influxdb.Task
// TODO(lh): remove this when we no longer need the backend store.
func ToInfluxTask(t *StoreTask, m *StoreTaskMeta) (*influxdb.Task, error) {
opts, err := options.FromScript(t.Script)
if err != nil {
return nil, err
}
pt := &influxdb.Task{
ID: t.ID,
OrganizationID: t.Org,
Name: t.Name,
Flux: t.Script,
Cron: opts.Cron,
AuthorizationID: influxdb.ID(m.AuthorizationID),
}
if opts.Every != 0 {
pt.Every = opts.Every.String()
}
if opts.Offset != nil && *opts.Offset != 0 {
pt.Offset = opts.Offset.String()
}
if m != nil {
pt.Status = string(m.Status)
pt.LatestCompleted = time.Unix(m.LatestCompleted, 0).UTC().Format(time.RFC3339)
if m.CreatedAt != 0 {
pt.CreatedAt = time.Unix(m.CreatedAt, 0).UTC().Format(time.RFC3339)
}
if m.UpdatedAt != 0 {
pt.UpdatedAt = time.Unix(m.UpdatedAt, 0).UTC().Format(time.RFC3339)
}
pt.AuthorizationID = influxdb.ID(m.AuthorizationID)
}
return pt, nil
}
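To make the division of labor concrete, here is a hedged sketch of a single run's lifecycle driven through a TaskControlService, condensed from the runner code earlier in this commit (names such as runOnce, tcs, taskID, and now are placeholders; error handling is mostly elided and the usual imports are assumed):

func runOnce(ctx context.Context, tcs backend.TaskControlService, taskID influxdb.ID, now int64) {
	rc, err := tcs.CreateNextRun(ctx, taskID, now) // returns a RunNotYetDueError when nothing is due yet
	if err != nil {
		return
	}
	runID := rc.Created.RunID
	_ = tcs.UpdateRunState(ctx, taskID, runID, time.Now(), backend.RunStarted)
	_ = tcs.AddRunLog(ctx, taskID, runID, time.Now(), "Started task from script: ...")

	// ... hand rc.Created to the Executor and wait for the result ...

	_ = tcs.UpdateRunState(ctx, taskID, runID, time.Now(), backend.RunSuccess)
	_, _ = tcs.FinishRun(ctx, taskID, runID) // drop from currently-running and advance the latest completed time
}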

View File

@ -3,16 +3,13 @@ package mock
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/influxdata/flux"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/task/backend"
scheduler "github.com/influxdata/influxdb/task/backend"
"go.uber.org/zap"
)
@ -22,28 +19,19 @@ type Scheduler struct {
lastTick int64
claims map[string]*Task
meta map[string]backend.StoreTaskMeta
claims map[platform.ID]*platform.Task
createChan chan *Task
releaseChan chan *Task
updateChan chan *Task
createChan chan *platform.Task
releaseChan chan *platform.Task
updateChan chan *platform.Task
claimError error
releaseError error
}
// Task is a mock implementation of a task.
type Task struct {
Script string
StartExecution int64
ConcurrencyLimit uint8
}
func NewScheduler() *Scheduler {
return &Scheduler{
claims: map[string]*Task{},
meta: map[string]backend.StoreTaskMeta{},
claims: map[platform.ID]*platform.Task{},
}
}
@ -60,7 +48,7 @@ func (s *Scheduler) Start(context.Context) {}
func (s *Scheduler) Stop() {}
func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error {
func (s *Scheduler) ClaimTask(_ context.Context, task *platform.Task) error {
if s.claimError != nil {
return s.claimError
}
@ -68,40 +56,33 @@ func (s *Scheduler) ClaimTask(task *backend.StoreTask, meta *backend.StoreTaskMe
s.Lock()
defer s.Unlock()
_, ok := s.claims[task.ID.String()]
_, ok := s.claims[task.ID]
if ok {
return backend.ErrTaskAlreadyClaimed
}
s.meta[task.ID.String()] = *meta
t := &Task{Script: task.Script, StartExecution: meta.LatestCompleted, ConcurrencyLimit: uint8(meta.MaxConcurrency)}
s.claims[task.ID.String()] = t
s.claims[task.ID] = task
if s.createChan != nil {
s.createChan <- t
s.createChan <- task
}
return nil
}
func (s *Scheduler) UpdateTask(task *backend.StoreTask, meta *backend.StoreTaskMeta) error {
func (s *Scheduler) UpdateTask(_ context.Context, task *platform.Task) error {
s.Lock()
defer s.Unlock()
_, ok := s.claims[task.ID.String()]
_, ok := s.claims[task.ID]
if !ok {
return backend.ErrTaskNotClaimed
}
s.meta[task.ID.String()] = *meta
t := &Task{Script: task.Script, StartExecution: meta.LatestCompleted, ConcurrencyLimit: uint8(meta.MaxConcurrency)}
s.claims[task.ID.String()] = t
s.claims[task.ID] = task
if s.updateChan != nil {
s.updateChan <- t
s.updateChan <- task
}
return nil
@ -115,7 +96,7 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
s.Lock()
defer s.Unlock()
t, ok := s.claims[taskID.String()]
t, ok := s.claims[taskID]
if !ok {
return backend.ErrTaskNotClaimed
}
@ -123,28 +104,27 @@ func (s *Scheduler) ReleaseTask(taskID platform.ID) error {
s.releaseChan <- t
}
delete(s.claims, taskID.String())
delete(s.meta, taskID.String())
delete(s.claims, taskID)
return nil
}
func (s *Scheduler) TaskFor(id platform.ID) *Task {
func (s *Scheduler) TaskFor(id platform.ID) *platform.Task {
s.Lock()
defer s.Unlock()
return s.claims[id.String()]
return s.claims[id]
}
func (s *Scheduler) TaskCreateChan() <-chan *Task {
s.createChan = make(chan *Task, 10)
func (s *Scheduler) TaskCreateChan() <-chan *platform.Task {
s.createChan = make(chan *platform.Task, 10)
return s.createChan
}
func (s *Scheduler) TaskReleaseChan() <-chan *Task {
s.releaseChan = make(chan *Task, 10)
func (s *Scheduler) TaskReleaseChan() <-chan *platform.Task {
s.releaseChan = make(chan *platform.Task, 10)
return s.releaseChan
}
func (s *Scheduler) TaskUpdateChan() <-chan *Task {
s.updateChan = make(chan *Task, 10)
func (s *Scheduler) TaskUpdateChan() <-chan *platform.Task {
s.updateChan = make(chan *platform.Task, 10)
return s.updateChan
}
@ -162,134 +142,6 @@ func (s *Scheduler) CancelRun(_ context.Context, taskID, runID platform.ID) erro
return nil
}
// DesiredState is a mock implementation of DesiredState (used by NewScheduler).
type DesiredState struct {
mu sync.Mutex
// Map of stringified task ID to last ID used for run.
runIDs map[string]uint64
// Map of stringified, concatenated task and platform ID, to runs that have been created.
created map[string]backend.QueuedRun
// Map of stringified task ID to task meta.
meta map[string]backend.StoreTaskMeta
// Map of task ID to total number of runs created for that task.
totalRunsCreated map[platform.ID]int
}
var _ backend.DesiredState = (*DesiredState)(nil)
func NewDesiredState() *DesiredState {
return &DesiredState{
runIDs: make(map[string]uint64),
created: make(map[string]backend.QueuedRun),
meta: make(map[string]backend.StoreTaskMeta),
totalRunsCreated: make(map[platform.ID]int),
}
}
// SetTaskMeta sets the task meta for the given task ID.
// SetTaskMeta must be called before CreateNextRun, for a given task ID.
func (d *DesiredState) SetTaskMeta(taskID platform.ID, meta backend.StoreTaskMeta) {
d.mu.Lock()
defer d.mu.Unlock()
d.meta[taskID.String()] = meta
}
// CreateNextRun creates the next run for the given task.
// Refer to the documentation for SetTaskPeriod to understand how the times are determined.
func (d *DesiredState) CreateNextRun(_ context.Context, taskID platform.ID, now int64) (backend.RunCreation, error) {
d.mu.Lock()
defer d.mu.Unlock()
if !taskID.Valid() {
return backend.RunCreation{}, errors.New("invalid task id")
}
tid := taskID.String()
meta, ok := d.meta[tid]
if !ok {
panic(fmt.Sprintf("meta not set for task with ID %s", tid))
}
makeID := func() (platform.ID, error) {
d.runIDs[tid]++
runID := platform.ID(d.runIDs[tid])
return runID, nil
}
rc, err := meta.CreateNextRun(now, makeID)
if err != nil {
return backend.RunCreation{}, err
}
d.meta[tid] = meta
rc.Created.TaskID = taskID
d.created[tid+rc.Created.RunID.String()] = rc.Created
d.totalRunsCreated[taskID]++
return rc, nil
}
func (d *DesiredState) FinishRun(_ context.Context, taskID, runID platform.ID) error {
d.mu.Lock()
defer d.mu.Unlock()
tid := taskID.String()
rid := runID.String()
m := d.meta[tid]
if !m.FinishRun(runID) {
var knownIDs []string
for _, r := range m.CurrentlyRunning {
knownIDs = append(knownIDs, platform.ID(r.RunID).String())
}
return fmt.Errorf("unknown run ID %s; known run IDs: %s", rid, strings.Join(knownIDs, ", "))
}
d.meta[tid] = m
delete(d.created, tid+rid)
return nil
}
func (d *DesiredState) CreatedFor(taskID platform.ID) []backend.QueuedRun {
d.mu.Lock()
defer d.mu.Unlock()
var qrs []backend.QueuedRun
for _, qr := range d.created {
if qr.TaskID == taskID {
qrs = append(qrs, qr)
}
}
return qrs
}
// TotalRunsCreatedForTask returns the number of runs created for taskID.
func (d *DesiredState) TotalRunsCreatedForTask(taskID platform.ID) int {
d.mu.Lock()
defer d.mu.Unlock()
return d.totalRunsCreated[taskID]
}
// PollForNumberCreated blocks for a small amount of time waiting for exactly the given count of created and unfinished runs for the given task ID.
// If the expected number isn't found in time, it returns an error.
//
// Because the scheduler and executor do a lot of state changes asynchronously, this is useful in test.
func (d *DesiredState) PollForNumberCreated(taskID platform.ID, count int) ([]scheduler.QueuedRun, error) {
const numAttempts = 50
actualCount := 0
var created []scheduler.QueuedRun
for i := 0; i < numAttempts; i++ {
time.Sleep(2 * time.Millisecond) // we sleep even on first so it becomes more likely that we catch when too many are produced.
created = d.CreatedFor(taskID)
actualCount = len(created)
if actualCount == count {
return created, nil
}
}
return created, fmt.Errorf("did not see count of %d created run(s) for task with ID %s in time, instead saw %d", count, taskID.String(), actualCount) // we return created anyways, to make it easier to debug
}
type Executor struct {
mu sync.Mutex
hangingFor time.Duration
@ -385,7 +237,7 @@ func (e *Executor) PollForNumberRunning(taskID platform.ID, count int) ([]*RunPr
return running, nil
}
}
return nil, fmt.Errorf("did not see count of %d running task(s) for ID %s in time; last count was %d", count, taskID.String(), len(running))
return nil, fmt.Errorf("did not see count of %d running task(s) for ID %s in time; last count was %d", count, taskID, len(running))
}
// RunPromise is a mock RunPromise.
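A minimal sketch (assumed to live in a test file in this package, with the context, testing, and platform imports as above) of driving the reworked mock now that claims hold full *platform.Task values; the task literal is illustrative.
func TestSchedulerClaimSketch(t *testing.T) {
	s := NewScheduler()
	created := s.TaskCreateChan() // request the channel before claiming so the send happens

	tsk := &platform.Task{ID: 1} // illustrative task; the mock does not inspect the Flux script
	if err := s.ClaimTask(context.Background(), tsk); err != nil {
		t.Fatal(err)
	}
	if got := <-created; got.ID != tsk.ID {
		t.Fatalf("claimed task %v, want %v", got.ID, tsk.ID)
	}
	if s.TaskFor(tsk.ID) == nil {
		t.Fatal("expected the claimed task to be retained")
	}
}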

View File

@ -0,0 +1,346 @@
package mock
import (
"context"
"errors"
"fmt"
"sort"
"sync"
"time"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/snowflake"
"github.com/influxdata/influxdb/task/backend"
cron "gopkg.in/robfig/cron.v2"
)
var idgen = snowflake.NewDefaultIDGenerator()
// TaskControlService is a mock implementation of TaskControlService (used by NewScheduler).
type TaskControlService struct {
mu sync.Mutex
// Map of task ID to the runs created for that task, keyed by run ID.
runs map[influxdb.ID]map[influxdb.ID]*influxdb.Run
// Map of concatenated task and run ID to runs that have been created.
created map[string]backend.QueuedRun
// Map of task ID to the task itself.
tasks map[influxdb.ID]*influxdb.Task
manualRuns []*influxdb.Run
// Map of task ID to total number of runs created for that task.
totalRunsCreated map[influxdb.ID]int
finishedRuns map[influxdb.ID]*influxdb.Run
}
var _ backend.TaskControlService = (*TaskControlService)(nil)
func NewTaskControlService() *TaskControlService {
return &TaskControlService{
runs: make(map[influxdb.ID]map[influxdb.ID]*influxdb.Run),
finishedRuns: make(map[influxdb.ID]*influxdb.Run),
tasks: make(map[influxdb.ID]*influxdb.Task),
created: make(map[string]backend.QueuedRun),
totalRunsCreated: make(map[influxdb.ID]int),
}
}
// SetTask sets the task.
// SetTask must be called before CreateNextRun, for a given task ID.
func (d *TaskControlService) SetTask(task *influxdb.Task) {
d.mu.Lock()
defer d.mu.Unlock()
d.tasks[task.ID] = task
}
func (d *TaskControlService) SetManualRuns(runs []*influxdb.Run) {
d.manualRuns = runs
}
// CreateNextRun creates the next run for the given task.
// Refer to the documentation for SetTask to understand how the times are determined.
func (d *TaskControlService) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) {
d.mu.Lock()
defer d.mu.Unlock()
if !taskID.Valid() {
return backend.RunCreation{}, errors.New("invalid task id")
}
tid := taskID
task, ok := d.tasks[tid]
if !ok {
panic(fmt.Sprintf("meta not set for task with ID %s", tid))
}
if len(d.manualRuns) != 0 {
run := d.manualRuns[0]
d.manualRuns = d.manualRuns[1:]
runs, ok := d.runs[tid]
if !ok {
runs = make(map[influxdb.ID]*influxdb.Run)
}
runs[run.ID] = run
d.runs[task.ID] = runs
now, err := time.Parse(time.RFC3339, run.ScheduledFor)
next, _ := d.nextDueRun(ctx, taskID)
if err == nil {
rc := backend.RunCreation{
Created: backend.QueuedRun{
TaskID: task.ID,
RunID: run.ID,
Now: now.Unix(),
},
NextDue: next,
HasQueue: len(d.manualRuns) != 0,
}
d.created[tid.String()+rc.Created.RunID.String()] = rc.Created
d.totalRunsCreated[taskID]++
return rc, nil
}
}
rc, err := d.createNextRun(task, now)
if err != nil {
return backend.RunCreation{}, err
}
rc.Created.TaskID = taskID
d.created[tid.String()+rc.Created.RunID.String()] = rc.Created
d.totalRunsCreated[taskID]++
return rc, nil
}
func (t *TaskControlService) createNextRun(task *influxdb.Task, now int64) (backend.RunCreation, error) {
sch, err := cron.Parse(task.EffectiveCron())
if err != nil {
return backend.RunCreation{}, err
}
latest := int64(0)
lt, err := time.Parse(time.RFC3339, task.LatestCompleted)
if err == nil {
latest = lt.Unix()
}
for _, r := range t.runs[task.ID] {
rt, err := time.Parse(time.RFC3339, r.ScheduledFor)
if err == nil {
if rt.Unix() > latest {
latest = rt.Unix()
}
}
}
nextScheduled := sch.Next(time.Unix(latest, 0))
nextScheduledUnix := nextScheduled.Unix()
offset := int64(0)
if task.Offset != "" {
toff, err := time.ParseDuration(task.Offset)
if err == nil {
offset = int64(toff.Seconds()) // keep the offset in Unix seconds to match nextScheduledUnix
}
}
if dueAt := nextScheduledUnix + int64(offset); dueAt > now {
return backend.RunCreation{}, backend.RunNotYetDueError{DueAt: dueAt}
}
runID := idgen.ID()
runs, ok := t.runs[task.ID]
if !ok {
runs = make(map[influxdb.ID]*influxdb.Run)
}
runs[runID] = &influxdb.Run{
ID: runID,
ScheduledFor: nextScheduled.Format(time.RFC3339),
}
t.runs[task.ID] = runs
return backend.RunCreation{
Created: backend.QueuedRun{
RunID: runID,
Now: nextScheduledUnix,
},
NextDue: sch.Next(nextScheduled).Unix() + offset,
HasQueue: false,
}, nil
}
func (d *TaskControlService) FinishRun(_ context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
d.mu.Lock()
defer d.mu.Unlock()
tid := taskID
rid := runID
r := d.runs[tid][rid]
delete(d.runs[tid], rid)
t := d.tasks[tid]
schedFor, err := time.Parse(time.RFC3339, r.ScheduledFor)
if err != nil {
return nil, err
}
var latest time.Time
if t.LatestCompleted != "" {
latest, err = time.Parse(time.RFC3339, t.LatestCompleted)
if err != nil {
return nil, err
}
}
if schedFor.After(latest) {
t.LatestCompleted = r.ScheduledFor
}
d.finishedRuns[rid] = r
delete(d.created, tid.String()+rid.String())
return r, nil
}
func (t *TaskControlService) CurrentlyRunning(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
t.mu.Lock()
defer t.mu.Unlock()
rtn := []*influxdb.Run{}
for _, run := range t.runs[taskID] {
rtn = append(rtn, run)
}
return rtn, nil
}
func (t *TaskControlService) ManualRuns(ctx context.Context, taskID influxdb.ID) ([]*influxdb.Run, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.manualRuns != nil {
return t.manualRuns, nil
}
return []*influxdb.Run{}, nil
}
// NextDueRun returns the Unix timestamp of when the next call to CreateNextRun will be ready.
// The returned timestamp reflects the task's offset, so it does not necessarily exactly match the schedule time.
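// For example (illustrative values): a task with Cron "0 * * * *" and Offset "5m"
// whose latest completed run was scheduled for 10:00:00 UTC has a nextScheduled of
// 11:00:00, so NextDueRun reports the Unix time of 11:05:00; the offset delays when
// the run becomes due, not the time the run is scheduled for.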
func (d *TaskControlService) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
d.mu.Lock()
defer d.mu.Unlock()
return d.nextDueRun(ctx, taskID)
}
func (d *TaskControlService) nextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
task := d.tasks[taskID]
sch, err := cron.Parse(task.EffectiveCron())
if err != nil {
return 0, err
}
latest := int64(0)
lt, err := time.Parse(time.RFC3339, task.LatestCompleted)
if err == nil {
latest = lt.Unix()
}
for _, r := range d.runs[task.ID] {
rt, err := time.Parse(time.RFC3339, r.ScheduledFor)
if err == nil {
if rt.Unix() > latest {
latest = rt.Unix()
}
}
}
nextScheduled := sch.Next(time.Unix(latest, 0))
nextScheduledUnix := nextScheduled.Unix()
offset := int64(0)
if task.Offset != "" {
toff, err := time.ParseDuration(task.Offset)
if err == nil {
offset = int64(toff.Seconds()) // keep the offset in Unix seconds to match nextScheduledUnix
}
}
return nextScheduledUnix + int64(offset), nil
}
// UpdateRunState sets the run state at the respective time.
func (d *TaskControlService) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
d.mu.Lock()
defer d.mu.Unlock()
run := d.runs[taskID][runID]
switch state {
case backend.RunStarted:
run.StartedAt = when.Format(time.RFC3339Nano)
case backend.RunSuccess, backend.RunFail, backend.RunCanceled:
run.FinishedAt = when.Format(time.RFC3339Nano)
case backend.RunScheduled:
// nothing
default:
panic("invalid status")
}
run.Status = state.String()
return nil
}
// AddRunLog adds a log line to the run.
func (d *TaskControlService) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {
d.mu.Lock()
defer d.mu.Unlock()
run := d.runs[taskID][runID]
if run == nil {
panic("cannot add a log to a non existant run")
}
run.Log = append(run.Log, influxdb.Log{Time: when.Format(time.RFC3339Nano), Message: log})
return nil
}
func (d *TaskControlService) CreatedFor(taskID influxdb.ID) []backend.QueuedRun {
d.mu.Lock()
defer d.mu.Unlock()
var qrs []backend.QueuedRun
for _, qr := range d.created {
if qr.TaskID == taskID {
qrs = append(qrs, qr)
}
}
return qrs
}
// TotalRunsCreatedForTask returns the number of runs created for taskID.
func (d *TaskControlService) TotalRunsCreatedForTask(taskID influxdb.ID) int {
d.mu.Lock()
defer d.mu.Unlock()
return d.totalRunsCreated[taskID]
}
// PollForNumberCreated blocks for a small amount of time waiting for exactly the given count of created and unfinished runs for the given task ID.
// If the expected number isn't found in time, it returns an error.
//
// Because the scheduler and executor do a lot of state changes asynchronously, this is useful in test.
func (d *TaskControlService) PollForNumberCreated(taskID influxdb.ID, count int) ([]backend.QueuedRun, error) {
const numAttempts = 50
actualCount := 0
var created []backend.QueuedRun
for i := 0; i < numAttempts; i++ {
time.Sleep(2 * time.Millisecond) // we sleep even on first so it becomes more likely that we catch when too many are produced.
created = d.CreatedFor(taskID)
actualCount = len(created)
if actualCount == count {
return created, nil
}
}
return created, fmt.Errorf("did not see count of %d created run(s) for task with ID %s in time, instead saw %d", count, taskID, actualCount) // we return created anyways, to make it easier to debug
}
func (d *TaskControlService) FinishedRun(runID influxdb.ID) *influxdb.Run {
d.mu.Lock()
defer d.mu.Unlock()
return d.finishedRuns[runID]
}
func (d *TaskControlService) FinishedRuns() []*influxdb.Run {
rtn := []*influxdb.Run{}
for _, run := range d.finishedRuns {
rtn = append(rtn, run)
}
sort.Slice(rtn, func(i, j int) bool { return rtn[i].ScheduledFor < rtn[j].ScheduledFor })
return rtn
}
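A minimal sketch (assumed to live in a separate _test.go file in this package) of the SetTask-before-CreateNextRun flow the comments above require; the every interval and assertions are illustrative only.
package mock

import (
	"context"
	"testing"
	"time"

	"github.com/influxdata/influxdb"
)

func TestTaskControlServiceSketch(t *testing.T) {
	tcs := NewTaskControlService()

	// SetTask must be called before CreateNextRun for this task ID.
	task := &influxdb.Task{ID: idgen.ID(), Every: "1m"} // EffectiveCron yields "@every 1m"
	tcs.SetTask(task)

	ctx := context.Background()
	rc, err := tcs.CreateNextRun(ctx, task.ID, time.Now().Unix())
	if err != nil {
		t.Fatal(err)
	}

	// Finishing the run records it and advances the task's LatestCompleted.
	if _, err := tcs.FinishRun(ctx, task.ID, rc.Created.RunID); err != nil {
		t.Fatal(err)
	}
	if got := tcs.TotalRunsCreatedForTask(task.ID); got != 1 {
		t.Fatalf("expected 1 created run, got %d", got)
	}
}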

View File

@ -27,7 +27,7 @@ func inMemFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
i := inmem.NewService()
return &servicetest.System{
TaskControlService: servicetest.TaskControlAdaptor(st, lrw, lrw),
TaskControlService: backend.TaskControlAdaptor(st, lrw, lrw),
Ctx: ctx,
I: i,
TaskService: servicetest.UsePlatformAdaptor(st, lrw, mock.NewScheduler(), i),
@ -63,7 +63,7 @@ func boltFactory(t *testing.T) (*servicetest.System, context.CancelFunc) {
i := inmem.NewService()
return &servicetest.System{
TaskControlService: servicetest.TaskControlAdaptor(st, lrw, lrw),
TaskControlService: backend.TaskControlAdaptor(st, lrw, lrw),
TaskService: servicetest.UsePlatformAdaptor(st, lrw, mock.NewScheduler(), i),
Ctx: ctx,
I: i,
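A minimal sketch of how another storage backend would plug into these tests, assuming a hypothetical newMyStoreComponents helper that returns a backend store, a combined log reader/writer, and an inmem service, with imports mirroring the factories above.
func TestMyStoreTaskService(t *testing.T) {
	servicetest.TestTaskService(t, func(t *testing.T) (*servicetest.System, context.CancelFunc) {
		ctx, cancel := context.WithCancel(context.Background())
		st, lrw, i := newMyStoreComponents(t) // hypothetical setup helper
		return &servicetest.System{
			TaskControlService: backend.TaskControlAdaptor(st, lrw, lrw),
			TaskService:        servicetest.UsePlatformAdaptor(st, lrw, mock.NewScheduler(), i),
			Ctx:                ctx,
			I:                  i,
		}, cancel
	})
}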

View File

@ -42,99 +42,6 @@ func UsePlatformAdaptor(s backend.Store, lr backend.LogReader, rc task.RunContro
return task.PlatformAdapter(s, lr, rc, i, i, i)
}
// TaskControlAdaptor creates a TaskControlService for the older TaskStore system.
func TaskControlAdaptor(s backend.Store, lw backend.LogWriter, lr backend.LogReader) backend.TaskControlService {
return &taskControlAdaptor{s, lw, lr}
}
// taskControlAdaptor adapts a backend.Store and log readers and writers to implement the task control service.
type taskControlAdaptor struct {
s backend.Store
lw backend.LogWriter
lr backend.LogReader
}
func (tcs *taskControlAdaptor) CreateNextRun(ctx context.Context, taskID influxdb.ID, now int64) (backend.RunCreation, error) {
return tcs.s.CreateNextRun(ctx, taskID, now)
}
func (tcs *taskControlAdaptor) FinishRun(ctx context.Context, taskID, runID influxdb.ID) (*influxdb.Run, error) {
// the tests aren't looking for a returned Run because the old system didn't return one
// Once we completely switch over to the new system we can look at the returned run in the tests.
return nil, tcs.s.FinishRun(ctx, taskID, runID)
}
func (tcs *taskControlAdaptor) NextDueRun(ctx context.Context, taskID influxdb.ID) (int64, error) {
_, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return 0, err
}
return m.NextDueRun()
}
func (tcs *taskControlAdaptor) UpdateRunState(ctx context.Context, taskID, runID influxdb.ID, when time.Time, state backend.RunStatus) error {
st, m, err := tcs.s.FindTaskByIDWithMeta(ctx, taskID)
if err != nil {
return err
}
var (
schedFor, reqAt time.Time
)
// check the log store
r, err := tcs.lr.FindRunByID(ctx, st.Org, runID)
if err == nil {
schedFor, _ = time.Parse(time.RFC3339, r.ScheduledFor)
reqAt, _ = time.Parse(time.RFC3339, r.RequestedAt)
}
// in the old system the log store may not have the run until after the first
// state update, so we will need to pull the currently running.
if schedFor.IsZero() {
for _, cr := range m.CurrentlyRunning {
if influxdb.ID(cr.RunID) == runID {
schedFor = time.Unix(cr.Now, 0)
reqAt = time.Unix(cr.RequestedAt, 0)
}
}
}
rlb := backend.RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: schedFor.Unix(),
RequestedAt: reqAt.Unix(),
}
if err := tcs.lw.UpdateRunState(ctx, rlb, when, state); err != nil {
return err
}
return nil
}
func (tcs *taskControlAdaptor) AddRunLog(ctx context.Context, taskID, runID influxdb.ID, when time.Time, log string) error {
st, err := tcs.s.FindTaskByID(ctx, taskID)
if err != nil {
return err
}
r, err := tcs.lr.FindRunByID(ctx, st.Org, runID)
if err != nil {
return err
}
schFor, _ := time.Parse(time.RFC3339, r.ScheduledFor)
reqAt, _ := time.Parse(time.RFC3339, r.RequestedAt)
rlb := backend.RunLogBase{
Task: st,
RunID: runID,
RunScheduledFor: schFor.Unix(),
RequestedAt: reqAt.Unix(),
}
return tcs.lw.AddRunLog(ctx, rlb, when, log)
}
// TestTaskService should be called by consumers of the servicetest package.
// This will call fn once to create a single influxdb.TaskService
// used across all subtests in TestTaskService.
@ -176,6 +83,10 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory) {
t.Parallel()
testMetaUpdate(t, sys)
})
t.Run("Task Manual Run", func(t *testing.T) {
t.Parallel()
testManualRun(t, sys)
})
})
}
@ -1059,6 +970,49 @@ func testTaskConcurrency(t *testing.T, sys *System) {
extraWg.Wait()
}
func testManualRun(t *testing.T, s *System) {
cr := creds(t, s)
// Create a task.
tc := influxdb.TaskCreate{
OrganizationID: cr.OrgID,
Flux: fmt.Sprintf(scriptFmt, 0),
Token: cr.Token,
}
authorizedCtx := icontext.SetAuthorizer(s.Ctx, cr.Authorizer())
tsk, err := s.TaskService.CreateTask(authorizedCtx, tc)
if err != nil {
t.Fatal(err)
}
if !tsk.ID.Valid() {
t.Fatal("no task ID set")
}
scheduledFor := time.Now().UTC()
run, err := s.TaskService.ForceRun(authorizedCtx, tsk.ID, scheduledFor.Unix())
if err != nil {
t.Fatal(err)
}
if run.ScheduledFor != scheduledFor.Format(time.RFC3339) {
t.Fatalf("force run returned a different scheduled for time expected: %s, got %s", scheduledFor.Format(time.RFC3339), run.ScheduledFor)
}
runs, err := s.TaskControlService.ManualRuns(authorizedCtx, tsk.ID)
if err != nil {
t.Fatal(err)
}
if len(runs) != 1 {
t.Fatalf("expected 1 manual run: got %d", len(runs))
}
if runs[0].ID != run.ID {
diff := cmp.Diff(runs[0], run)
t.Fatalf("manual run missmatch: %s", diff)
}
}
func creds(t *testing.T, s *System) TestCreds {
t.Helper()