Merge pull request #529 from influxdata/task/refactor-logwriter

feat(task): update the scheduler and logwriter interface
pull/10616/head
Lyon Hill 2018-08-01 16:15:09 -06:00 committed by GitHub
commit 6280b2a5d1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 181 additions and 168 deletions

View File

@ -49,7 +49,7 @@ func NewInMemRunReaderWriter() *runReaderWriter {
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}}
}
func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, status RunStatus) error {
func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, status RunStatus) error {
r.mu.Lock()
defer r.mu.Unlock()
timeSetter := func(r *platform.Run) {
@ -69,7 +69,7 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID plat
run := &platform.Run{ID: runID, Status: status.String()}
timeSetter(run)
r.byRunID[runID.String()] = run
r.byTaskID[taskID.String()] = append(r.byTaskID[taskID.String()], run)
r.byTaskID[task.ID.String()] = append(r.byTaskID[task.ID.String()], run)
return nil
}
@ -78,7 +78,7 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID plat
return nil
}
func (r *runReaderWriter) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error {
func (r *runReaderWriter) AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error {
r.mu.Lock()
defer r.mu.Unlock()

View File

@ -90,9 +90,7 @@ type Scheduler interface {
// you can set startExecutionFrom in the past to backfill a task.
// concurrencyLimit is how many runs may be concurrently queued or executing.
// concurrencyLimit must be positive.
// TODO(mr): concurrencyLimit should become a script option rather than explicit.
ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) error
ClaimTask(task *StoreTask, startExecutionFrom int64, opt *options.Options) error
// ReleaseTask immediately cancels any in-progress runs for the given task ID,
// and releases any resources related to management of that task.
ReleaseTask(taskID platform.ID) error
@ -191,14 +189,9 @@ func (s *outerScheduler) Tick(now int64) {
}
}
func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) (err error) {
func (s *outerScheduler) ClaimTask(task *StoreTask, startExecutionFrom int64, opts *options.Options) (err error) {
defer s.metrics.ClaimTask(err == nil)
opts, err := options.FromScript(script)
if err != nil {
return err
}
timer := opts.Cron
if timer == "" {
@ -219,10 +212,10 @@ func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecu
ts := newTaskScheduler(
s,
taskID,
task,
sch,
startExecutionFrom,
concurrencyLimit,
uint8(opts.Concurrency),
)
if s.cronTimer != nil {
@ -236,13 +229,13 @@ func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecu
}
s.mu.Lock()
_, ok := s.tasks[taskID.String()]
_, ok := s.tasks[task.ID.String()]
if ok {
s.mu.Unlock()
return errors.New("task has already been claimed")
}
s.tasks[taskID.String()] = ts
s.tasks[task.ID.String()] = ts
s.mu.Unlock()
@ -278,8 +271,8 @@ func (s *outerScheduler) PrometheusCollectors() []prometheus.Collector {
// taskScheduler is a lightweight wrapper around a collection of runners.
type taskScheduler struct {
// ID of task.
id platform.ID
// Task we are scheduling for.
task *StoreTask
// Seconds since UTC epoch.
now int64
@ -301,7 +294,7 @@ type taskScheduler struct {
func newTaskScheduler(
s *outerScheduler,
taskID platform.ID,
task *StoreTask,
cron cron.Schedule,
startExecutionFrom int64,
concurrencyLimit uint8,
@ -309,11 +302,11 @@ func newTaskScheduler(
firstScheduled := cron.Next(time.Unix(startExecutionFrom, 0).UTC()).Unix()
ctx, cancel := context.WithCancel(context.Background())
ts := &taskScheduler{
id: taskID,
task: task,
now: startExecutionFrom,
cancel: cancel,
runners: make([]*runner, concurrencyLimit),
logger: s.logger.With(zap.String("task_id", taskID.String())),
logger: s.logger.With(zap.String("task_id", task.ID.String())),
}
tt := &taskTimer{
@ -327,7 +320,7 @@ func newTaskScheduler(
}
for i := range ts.runners {
ts.runners[i] = newRunner(ctx, ts.logger, taskID, s.desiredState, s.executor, s.logWriter, tt)
ts.runners[i] = newRunner(ctx, ts.logger, task, s.desiredState, s.executor, s.logWriter, tt)
}
return ts
@ -409,7 +402,7 @@ type runner struct {
// Cancelable context from parent taskScheduler.
ctx context.Context
taskID platform.ID
task *StoreTask
desiredState DesiredState
executor Executor
@ -423,7 +416,7 @@ type runner struct {
func newRunner(
ctx context.Context,
logger *zap.Logger,
taskID platform.ID,
task *StoreTask,
desiredState DesiredState,
executor Executor,
logWriter LogWriter,
@ -432,7 +425,7 @@ func newRunner(
return &runner{
ctx: ctx,
state: new(uint32),
taskID: taskID,
task: task,
desiredState: desiredState,
executor: executor,
logWriter: logWriter,
@ -475,7 +468,7 @@ func (r *runner) startFromWorking() {
if next, ready := r.tt.NextScheduledRun(); ready {
// It's possible that two runners may attempt to create the same run for this "next" timestamp,
// but the contract of DesiredState requires that only one succeeds.
qr, err := r.desiredState.CreateRun(r.ctx, r.taskID, next)
qr, err := r.desiredState.CreateRun(r.ctx, r.task.ID, next)
if err != nil {
r.logger.Info("Failed to create run", zap.Error(err))
atomic.StoreUint32(r.state, runnerIdle)
@ -552,11 +545,11 @@ func (r *runner) executeAndWait(qr QueuedRun) {
func (r *runner) updateRunState(qr QueuedRun, s RunStatus) {
switch s {
case RunStarted:
r.tt.metrics.StartRun(r.taskID.String())
r.tt.metrics.StartRun(r.task.ID.String())
case RunSuccess:
r.tt.metrics.FinishRun(r.taskID.String(), true)
r.tt.metrics.FinishRun(r.task.ID.String(), true)
case RunFail, RunCanceled:
r.tt.metrics.FinishRun(r.taskID.String(), false)
r.tt.metrics.FinishRun(r.task.ID.String(), false)
default:
// We are deliberately not handling RunQueued yet.
// There is not really a notion of being queued in this runner architecture.
@ -567,7 +560,7 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus) {
// If we start seeing errors from this, we know the time limit is too short or the system is overloaded.
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond)
defer cancel()
if err := r.logWriter.UpdateRunState(ctx, r.taskID, qr.RunID, time.Now(), s); err != nil {
if err := r.logWriter.UpdateRunState(ctx, r.task, qr.RunID, time.Now(), s); err != nil {
r.logger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err))
}
}

View File

@ -4,6 +4,7 @@ import (
"context"
"errors"
"testing"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/prom"
@ -11,55 +12,35 @@ import (
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/mock"
)
const (
scriptEveryMinute = `option task = {
name: "name",
cron: "* * * * *",
}
// (every minute on the minute)
from(bucket:"b") |> toHTTP(url: "http://example.com")`
scriptEverySecond = `option task = {
name: "name",
every: 1s,
}
from(bucket:"b") |> toHTTP(url: "http://example.com")`
"github.com/influxdata/platform/task/options"
)
func TestScheduler_EveryValidation(t *testing.T) {
d := mock.NewDesiredState()
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
tid := platform.ID{1}
badScripts := []string{
`option task = {
name: "name",
every: 1ms,
}
from(bucket:"b") |> toHTTP(url: "http://example.com")`,
`option task = {
name: "name",
every: -1h,
}
from(bucket:"b") |> toHTTP(url: "http://example.com")`,
`option task = {
name: "name",
every: 1500ms,
}
from(bucket:"b") |> toHTTP(url: "http://example.com")`,
`option task = {
name: "name",
every: 12.32s,
}
from(bucket:"b") |> toHTTP(url: "http://example.com")`,
task := &backend.StoreTask{
ID: platform.ID{1},
}
for _, badScript := range badScripts {
if err := o.ClaimTask(tid, badScript, 3, 99); err == nil {
t.Fatal("no error returned for :", badScript)
badOptions := []options.Options{
{
Every: time.Millisecond,
},
{
Every: time.Hour * -1,
},
{
Every: 1500 * time.Millisecond,
},
{
Every: 1232 * time.Millisecond,
},
}
for _, badOption := range badOptions {
if err := o.ClaimTask(task, 3, &badOption); err == nil {
t.Fatal("no error returned for :", badOption)
}
}
}
@ -69,23 +50,29 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) {
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
tid := platform.ID{1}
if err := o.ClaimTask(tid, scriptEveryMinute, 3, 99); err != nil {
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Minute}
if err := o.ClaimTask(task, 3, opts); err != nil {
t.Fatal(err)
}
// No valid timestamps between 3 and 5 for every minute.
if n := len(d.CreatedFor(tid)); n > 0 {
if n := len(d.CreatedFor(task.ID)); n > 0 {
t.Fatalf("expected no runs queued, but got %d", n)
}
// For every second, can queue for timestamps 4 and 5.
tid = platform.ID{2}
if err := o.ClaimTask(tid, scriptEverySecond, 3, 5); err != nil {
task = &backend.StoreTask{
ID: platform.ID{2},
}
opts = &options.Options{Every: time.Second, Concurrency: 99}
if err := o.ClaimTask(task, 3, opts); err != nil {
t.Fatal(err)
}
if n := len(d.CreatedFor(tid)); n != 2 {
if n := len(d.CreatedFor(task.ID)); n != 2 {
t.Fatalf("expected 2 runs queued for 'every 1s' script, but got %d", n)
}
}
@ -95,36 +82,40 @@ func TestScheduler_CreateRunOnTick(t *testing.T) {
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
tid := platform.ID{1}
if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 2}
if err := o.ClaimTask(task, 5, opts); err != nil {
t.Fatal(err)
}
if x, err := d.PollForNumberCreated(tid, 0); err != nil {
if x, err := d.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatalf("expected no runs queued, but got %d", len(x))
}
o.Tick(6)
if x, err := d.PollForNumberCreated(tid, 1); err != nil {
if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
t.Fatalf("expected 1 run queued, but got %d", len(x))
}
running, err := e.PollForNumberRunning(tid, 1)
running, err := e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
}
run6 := running[0]
o.Tick(7)
if x, err := d.PollForNumberCreated(tid, 2); err != nil {
if x, err := d.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x))
}
o.Tick(8) // Can't exceed concurrency of 2.
if x, err := d.PollForNumberCreated(tid, 2); err != nil {
if x, err := d.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x))
}
run6.Cancel()
if x, err := d.PollForNumberCreated(tid, 1); err != nil {
if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
t.Fatal(err, x)
}
}
@ -134,22 +125,26 @@ func TestScheduler_Release(t *testing.T) {
e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
tid := platform.ID{1}
if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 99}
if err := o.ClaimTask(task, 5, opts); err != nil {
t.Fatal(err)
}
o.Tick(6)
o.Tick(7)
if n := len(d.CreatedFor(tid)); n != 2 {
if n := len(d.CreatedFor(task.ID)); n != 2 {
t.Fatalf("expected 2 runs queued, but got %d", n)
}
if err := o.ReleaseTask(tid); err != nil {
if err := o.ReleaseTask(task.ID); err != nil {
t.Fatal(err)
}
if _, err := d.PollForNumberCreated(tid, 0); err != nil {
if _, err := d.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatal(err)
}
}
@ -161,22 +156,26 @@ func TestScheduler_RunLog(t *testing.T) {
s := backend.NewScheduler(d, e, rl, 5)
// Claim a task that starts later.
tid := platform.ID{1}
if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 99}
if err := s.ClaimTask(task, 5, opts); err != nil {
t.Fatal(err)
}
if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}); err != backend.ErrRunNotFound {
if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err != backend.ErrRunNotFound {
t.Fatal(err)
}
s.Tick(6)
promises, err := e.PollForNumberRunning(tid, 1)
promises, err := e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
}
runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid})
runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -190,11 +189,11 @@ func TestScheduler_RunLog(t *testing.T) {
// Finish with success.
promises[0].Finish(mock.NewRunResult(nil, false), nil)
if _, err := e.PollForNumberRunning(tid, 0); err != nil {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid})
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -208,12 +207,12 @@ func TestScheduler_RunLog(t *testing.T) {
// Create a new run, but fail this time.
s.Tick(7)
promises, err = e.PollForNumberRunning(tid, 1)
promises, err = e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
}
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid})
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -227,11 +226,11 @@ func TestScheduler_RunLog(t *testing.T) {
// Finish with failure.
promises[0].Finish(nil, errors.New("forced failure"))
if _, err := e.PollForNumberRunning(tid, 0); err != nil {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid})
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -245,12 +244,12 @@ func TestScheduler_RunLog(t *testing.T) {
// One more run, but cancel this time.
s.Tick(8)
promises, err = e.PollForNumberRunning(tid, 1)
promises, err = e.PollForNumberRunning(task.ID, 1)
if err != nil {
t.Fatal(err)
}
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid})
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -264,11 +263,11 @@ func TestScheduler_RunLog(t *testing.T) {
// Finish by canceling the run.
promises[0].Cancel()
if _, err := e.PollForNumberRunning(tid, 0); err != nil {
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid})
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}
@ -292,8 +291,12 @@ func TestScheduler_Metrics(t *testing.T) {
reg.MustRegister(s.(prom.PrometheusCollector).PrometheusCollectors()...)
// Claim a task that starts later.
tid := platform.ID{1}
if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 99}
if err := s.ClaimTask(task, 5, opts); err != nil {
t.Fatal(err)
}
@ -309,7 +312,7 @@ func TestScheduler_Metrics(t *testing.T) {
}
s.Tick(6)
if _, err := e.PollForNumberRunning(tid, 1); err != nil {
if _, err := e.PollForNumberRunning(task.ID, 1); err != nil {
t.Fatal(err)
}
@ -319,13 +322,13 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total run active, got %v", got)
}
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()})
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 run active for task ID %s, got %v", tid.String(), got)
t.Fatalf("expected 1 run active for task ID %s, got %v", task.ID.String(), got)
}
s.Tick(7)
if _, err := e.PollForNumberRunning(tid, 2); err != nil {
if _, err := e.PollForNumberRunning(task.ID, 2); err != nil {
t.Fatal(err)
}
@ -334,14 +337,14 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 2 {
t.Fatalf("expected 2 total runs active, got %v", got)
}
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()})
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 2 {
t.Fatalf("expected 2 runs active for task ID %s, got %v", tid.String(), got)
t.Fatalf("expected 2 runs active for task ID %s, got %v", task.ID.String(), got)
}
// Runs active decreases as run finishes.
e.RunningFor(tid)[0].Finish(mock.NewRunResult(nil, false), nil)
if _, err := e.PollForNumberRunning(tid, 1); err != nil {
e.RunningFor(task.ID)[0].Finish(mock.NewRunResult(nil, false), nil)
if _, err := e.PollForNumberRunning(task.ID, 1); err != nil {
t.Fatal(err)
}
mfs = promtest.MustGather(t, reg)
@ -349,17 +352,17 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total run active, got %v", got)
}
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()})
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 run active for task ID %s, got %v", tid.String(), got)
t.Fatalf("expected 1 run active for task ID %s, got %v", task.ID.String(), got)
}
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "success"})
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "success"})
if got := *m.Counter.Value; got != 1 {
t.Fatalf("expected 1 run succeeded for task ID %s, got %v", tid.String(), got)
t.Fatalf("expected 1 run succeeded for task ID %s, got %v", task.ID.String(), got)
}
e.RunningFor(tid)[0].Finish(mock.NewRunResult(nil, false), errors.New("failed to execute"))
if _, err := e.PollForNumberRunning(tid, 0); err != nil {
e.RunningFor(task.ID)[0].Finish(mock.NewRunResult(nil, false), errors.New("failed to execute"))
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err)
}
mfs = promtest.MustGather(t, reg)
@ -367,27 +370,27 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 0 {
t.Fatalf("expected 0 total runs active, got %v", got)
}
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()})
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 0 {
t.Fatalf("expected 0 runs active for task ID %s, got %v", tid.String(), got)
t.Fatalf("expected 0 runs active for task ID %s, got %v", task.ID.String(), got)
}
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "failure"})
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "failure"})
if got := *m.Counter.Value; got != 1 {
t.Fatalf("expected 1 run failed for task ID %s, got %v", tid.String(), got)
t.Fatalf("expected 1 run failed for task ID %s, got %v", task.ID.String(), got)
}
// Runs label removed after task released.
if err := s.ReleaseTask(tid); err != nil {
if err := s.ReleaseTask(task.ID); err != nil {
t.Fatal(err)
}
mfs = promtest.MustGather(t, reg)
if m := promtest.FindMetric(mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}); m != nil {
if m := promtest.FindMetric(mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}); m != nil {
t.Fatalf("expected metric to be removed after releasing a task, got %v", m)
}
if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "success"}); m != nil {
if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "success"}); m != nil {
t.Fatalf("expected metric to be removed after releasing a task, got %v", m)
}
if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "failure"}); m != nil {
if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "failure"}); m != nil {
t.Fatalf("expected metric to be removed after releasing a task, got %v", m)
}

View File

@ -62,20 +62,20 @@ type Store interface {
// LogWriter writes task logs and task state changes to a store.
type LogWriter interface {
// UpdateRunState sets the run state and the respective time.
UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state RunStatus) error
UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, state RunStatus) error
// AddRunLog adds a log line to the run.
AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error
AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error
}
// NopLogWriter is a LogWriter that doesn't do anything when its methods are called.
// This is useful for tests, but not much else.
type NopLogWriter struct{}
func (NopLogWriter) UpdateRunState(context.Context, platform.ID, platform.ID, time.Time, RunStatus) error {
func (NopLogWriter) UpdateRunState(context.Context, *StoreTask, platform.ID, time.Time, RunStatus) error {
return nil
}
func (NopLogWriter) AddRunLog(context.Context, platform.ID, platform.ID, time.Time, string) error {
func (NopLogWriter) AddRunLog(context.Context, *StoreTask, platform.ID, time.Time, string) error {
return nil
}

View File

@ -40,7 +40,10 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
writer, reader := crf(t)
defer drf(t, writer, reader)
taskID := platform.ID([]byte("task"))
task := &backend.StoreTask{
ID: platform.ID([]byte("ab01ab01ab01ab01")),
Org: platform.ID([]byte("ab01ab01ab01ab05")),
}
queuedAt := time.Unix(1, 0)
run := platform.Run{
ID: platform.ID([]byte("run")),
@ -48,12 +51,12 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
QueuedAt: queuedAt.Format(time.RFC3339),
}
err := writer.UpdateRunState(context.Background(), taskID, run.ID, queuedAt, backend.RunQueued)
err := writer.UpdateRunState(context.Background(), task, run.ID, queuedAt, backend.RunQueued)
if err != nil {
t.Fatal(err)
}
returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID)
returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
@ -63,13 +66,13 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
}
startAt := time.Unix(2, 0)
if err := writer.UpdateRunState(context.Background(), taskID, run.ID, startAt, backend.RunStarted); err != nil {
if err := writer.UpdateRunState(context.Background(), task, run.ID, startAt, backend.RunStarted); err != nil {
t.Fatal(err)
}
run.StartTime = startAt.Format(time.RFC3339)
run.Status = "started"
returnedRun, err = reader.FindRunByID(context.Background(), taskID, run.ID)
returnedRun, err = reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
@ -79,13 +82,13 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
}
endAt := time.Unix(3, 0)
if err := writer.UpdateRunState(context.Background(), taskID, run.ID, endAt, backend.RunSuccess); err != nil {
if err := writer.UpdateRunState(context.Background(), task, run.ID, endAt, backend.RunSuccess); err != nil {
t.Fatal(err)
}
run.EndTime = endAt.Format(time.RFC3339)
run.Status = "success"
returnedRun, err = reader.FindRunByID(context.Background(), taskID, run.ID)
returnedRun, err = reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
@ -99,7 +102,11 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t)
defer drf(t, writer, reader)
taskID := platform.ID([]byte("task"))
task := &backend.StoreTask{
ID: platform.ID([]byte("ab01ab01ab01ab01")),
Org: platform.ID([]byte("ab01ab01ab01ab05")),
}
run := platform.Run{
ID: platform.ID([]byte("run")),
Status: "queued",
@ -108,29 +115,29 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
logTime := time.Now()
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "bad"); err == nil {
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "bad"); err == nil {
t.Fatal("shouldn't be able to log against non existing run")
}
err := writer.UpdateRunState(context.Background(), taskID, run.ID, time.Now(), backend.RunQueued)
err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunQueued)
if err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "first"); err != nil {
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "first"); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "second"); err != nil {
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "second"); err != nil {
t.Fatal(err)
}
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "third"); err != nil {
if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "third"); err != nil {
t.Fatal(err)
}
fmtLogTime := logTime.Format(time.RFC3339)
run.Log = platform.Log(fmt.Sprintf("%s: first\n%s: second\n%s: third", fmtLogTime, fmtLogTime, fmtLogTime))
returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID)
returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
@ -144,9 +151,12 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
writer, reader := crf(t)
defer drf(t, writer, reader)
taskID := platform.ID([]byte("task"))
task := &backend.StoreTask{
ID: platform.ID([]byte("ab01ab01ab01ab01")),
Org: platform.ID([]byte("ab01ab01ab01ab05")),
}
if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &taskID}); err == nil {
if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err == nil {
t.Fatal("failed to error on bad id")
}
@ -159,7 +169,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
QueuedAt: queuedAt.Format(time.RFC3339),
}
err := writer.UpdateRunState(context.Background(), taskID, runs[i].ID, queuedAt, backend.RunQueued)
err := writer.UpdateRunState(context.Background(), task, runs[i].ID, queuedAt, backend.RunQueued)
if err != nil {
t.Fatal(err)
}
@ -171,7 +181,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
}
listRuns, err := reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID,
Task: &task.ID,
})
if err != nil {
t.Fatal(err)
@ -182,7 +192,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
}
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID,
Task: &task.ID,
After: &runs[20].ID,
})
if err != nil {
@ -194,7 +204,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
}
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID,
Task: &task.ID,
Limit: 30,
})
if err != nil {
@ -207,7 +217,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
queuedAt, _ := time.Parse(time.RFC3339, runs[34].QueuedAt)
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID,
Task: &task.ID,
AfterTime: queuedAt.Add(-1 * time.Nanosecond).Format(time.RFC3339),
})
if err != nil {
@ -220,7 +230,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
queuedAt, _ = time.Parse(time.RFC3339, runs[34].QueuedAt)
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID,
Task: &task.ID,
BeforeTime: queuedAt.Add(time.Nanosecond).Format(time.RFC3339),
})
if err != nil {
@ -240,18 +250,21 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu
t.Fatal("failed to error with bad id")
}
taskID := platform.ID([]byte("task"))
task := &backend.StoreTask{
ID: platform.ID([]byte("ab01ab01ab01ab01")),
Org: platform.ID([]byte("ab01ab01ab01ab05")),
}
run := platform.Run{
ID: platform.ID([]byte("run")),
Status: "queued",
QueuedAt: time.Now().Format(time.RFC3339),
}
if err := writer.UpdateRunState(context.Background(), taskID, run.ID, time.Now(), backend.RunQueued); err != nil {
if err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunQueued); err != nil {
t.Fatal(err)
}
returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID)
returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
@ -262,7 +275,7 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu
returnedRun.Log = "cows"
rr2, err := reader.FindRunByID(context.Background(), taskID, run.ID)
rr2, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil {
t.Fatal(err)
}
@ -276,12 +289,15 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
writer, reader := crf(t)
defer drf(t, writer, reader)
taskID := platform.ID([]byte("task"))
task := &backend.StoreTask{
ID: platform.ID([]byte("ab01ab01ab01ab01")),
Org: platform.ID([]byte("ab01ab01ab01ab05")),
}
if _, err := reader.ListLogs(context.Background(), platform.LogFilter{}); err == nil {
t.Fatal("failed to error with no filter")
}
if _, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &taskID}); err == nil {
if _, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &task.ID}); err == nil {
t.Fatal("failed to error with no filter")
}
@ -293,12 +309,12 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
QueuedAt: time.Unix(int64(i), 0).Format(time.RFC3339),
}
err := writer.UpdateRunState(context.Background(), taskID, runs[i].ID, time.Now(), backend.RunQueued)
err := writer.UpdateRunState(context.Background(), task, runs[i].ID, time.Now(), backend.RunQueued)
if err != nil {
t.Fatal(err)
}
writer.AddRunLog(context.Background(), taskID, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i))
writer.AddRunLog(context.Background(), task, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i))
}
logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID})
@ -311,7 +327,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
t.Fatalf("expected: %+v, got: %+v", fmtTimelog+": log4", string(logs[0]))
}
logs, err = reader.ListLogs(context.Background(), platform.LogFilter{Task: &taskID})
logs, err = reader.ListLogs(context.Background(), platform.LogFilter{Task: &task.ID})
if err != nil {
t.Fatal(err)
}

View File

@ -13,6 +13,7 @@ import (
"github.com/influxdata/platform"
"github.com/influxdata/platform/task/backend"
scheduler "github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/options"
"go.uber.org/zap"
)
@ -53,7 +54,7 @@ func (s *Scheduler) Tick(now int64) {
func (s *Scheduler) WithLogger(l *zap.Logger) {}
func (s *Scheduler) ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) error {
func (s *Scheduler) ClaimTask(task *backend.StoreTask, startExecutionFrom int64, opts *options.Options) error {
if s.claimError != nil {
return s.claimError
}
@ -61,14 +62,14 @@ func (s *Scheduler) ClaimTask(taskID platform.ID, script string, startExecutionF
s.Lock()
defer s.Unlock()
_, ok := s.claims[taskID.String()]
_, ok := s.claims[task.ID.String()]
if ok {
return errors.New("task already in list")
}
t := &Task{script, startExecutionFrom, concurrencyLimit}
t := &Task{task.Script, startExecutionFrom, uint8(opts.Concurrency)}
s.claims[taskID.String()] = t
s.claims[task.ID.String()] = t
if s.createChan != nil {
s.createChan <- t