From 69761a98f70d40254a3c6f8c9003f28620e5c633 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 1 Aug 2018 09:51:10 -0600 Subject: [PATCH 1/4] feat(task): update the scheduler and logwriter interface Preparatory change that should enable us to build more complex log writers. --- task/backend/inmem_logreaderwriter.go | 6 +- task/backend/scheduler.go | 47 +++--- task/backend/scheduler_test.go | 214 ++++++++++++++++++------- task/backend/store.go | 8 +- task/backend/storetest/logstoretest.go | 78 +++++---- 5 files changed, 227 insertions(+), 126 deletions(-) diff --git a/task/backend/inmem_logreaderwriter.go b/task/backend/inmem_logreaderwriter.go index 2066801f50..7300e09786 100644 --- a/task/backend/inmem_logreaderwriter.go +++ b/task/backend/inmem_logreaderwriter.go @@ -49,7 +49,7 @@ func NewInMemRunReaderWriter() *runReaderWriter { return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}} } -func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, status RunStatus) error { +func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, status RunStatus) error { r.mu.Lock() defer r.mu.Unlock() timeSetter := func(r *platform.Run) { @@ -69,7 +69,7 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID plat run := &platform.Run{ID: runID, Status: status.String()} timeSetter(run) r.byRunID[runID.String()] = run - r.byTaskID[taskID.String()] = append(r.byTaskID[taskID.String()], run) + r.byTaskID[task.ID.String()] = append(r.byTaskID[task.ID.String()], run) return nil } @@ -78,7 +78,7 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID plat return nil } -func (r *runReaderWriter) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error { +func (r *runReaderWriter) AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error { r.mu.Lock() defer r.mu.Unlock() diff --git a/task/backend/scheduler.go b/task/backend/scheduler.go index 20fb2d447e..00535e27ae 100644 --- a/task/backend/scheduler.go +++ b/task/backend/scheduler.go @@ -90,9 +90,7 @@ type Scheduler interface { // you can set startExecutionFrom in the past to backfill a task. // concurrencyLimit is how many runs may be concurrently queued or executing. // concurrencyLimit must be positive. - // TODO(mr): concurrencyLimit should become a script option rather than explicit. - ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) error - + ClaimTask(task *StoreTask, startExecutionFrom int64, opt *options.Options) error // ReleaseTask immediately cancels any in-progress runs for the given task ID, // and releases any resources related to management of that task. 
ReleaseTask(taskID platform.ID) error @@ -191,14 +189,9 @@ func (s *outerScheduler) Tick(now int64) { } } -func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) (err error) { +func (s *outerScheduler) ClaimTask(task *StoreTask, startExecutionFrom int64, opts *options.Options) (err error) { defer s.metrics.ClaimTask(err == nil) - opts, err := options.FromScript(script) - if err != nil { - return err - } - timer := opts.Cron if timer == "" { @@ -219,10 +212,10 @@ func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecu ts := newTaskScheduler( s, - taskID, + task, sch, startExecutionFrom, - concurrencyLimit, + uint8(opts.Concurrency), ) if s.cronTimer != nil { @@ -236,13 +229,13 @@ func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecu } s.mu.Lock() - _, ok := s.tasks[taskID.String()] + _, ok := s.tasks[task.ID.String()] if ok { s.mu.Unlock() return errors.New("task has already been claimed") } - s.tasks[taskID.String()] = ts + s.tasks[task.ID.String()] = ts s.mu.Unlock() @@ -278,8 +271,8 @@ func (s *outerScheduler) PrometheusCollectors() []prometheus.Collector { // taskScheduler is a lightweight wrapper around a collection of runners. type taskScheduler struct { - // ID of task. - id platform.ID + // Task we are scheduling for. + task *StoreTask // Seconds since UTC epoch. now int64 @@ -301,7 +294,7 @@ type taskScheduler struct { func newTaskScheduler( s *outerScheduler, - taskID platform.ID, + task *StoreTask, cron cron.Schedule, startExecutionFrom int64, concurrencyLimit uint8, @@ -309,11 +302,11 @@ func newTaskScheduler( firstScheduled := cron.Next(time.Unix(startExecutionFrom, 0).UTC()).Unix() ctx, cancel := context.WithCancel(context.Background()) ts := &taskScheduler{ - id: taskID, + task: task, now: startExecutionFrom, cancel: cancel, runners: make([]*runner, concurrencyLimit), - logger: s.logger.With(zap.String("task_id", taskID.String())), + logger: s.logger.With(zap.String("task_id", task.ID.String())), } tt := &taskTimer{ @@ -327,7 +320,7 @@ func newTaskScheduler( } for i := range ts.runners { - ts.runners[i] = newRunner(ctx, ts.logger, taskID, s.desiredState, s.executor, s.logWriter, tt) + ts.runners[i] = newRunner(ctx, ts.logger, task, s.desiredState, s.executor, s.logWriter, tt) } return ts @@ -409,7 +402,7 @@ type runner struct { // Cancelable context from parent taskScheduler. ctx context.Context - taskID platform.ID + task *StoreTask desiredState DesiredState executor Executor @@ -423,7 +416,7 @@ type runner struct { func newRunner( ctx context.Context, logger *zap.Logger, - taskID platform.ID, + task *StoreTask, desiredState DesiredState, executor Executor, logWriter LogWriter, @@ -432,7 +425,7 @@ func newRunner( return &runner{ ctx: ctx, state: new(uint32), - taskID: taskID, + task: task, desiredState: desiredState, executor: executor, logWriter: logWriter, @@ -475,7 +468,7 @@ func (r *runner) startFromWorking() { if next, ready := r.tt.NextScheduledRun(); ready { // It's possible that two runners may attempt to create the same run for this "next" timestamp, // but the contract of DesiredState requires that only one succeeds. 
- qr, err := r.desiredState.CreateRun(r.ctx, r.taskID, next) + qr, err := r.desiredState.CreateRun(r.ctx, r.task.ID, next) if err != nil { r.logger.Info("Failed to create run", zap.Error(err)) atomic.StoreUint32(r.state, runnerIdle) @@ -552,11 +545,11 @@ func (r *runner) executeAndWait(qr QueuedRun) { func (r *runner) updateRunState(qr QueuedRun, s RunStatus) { switch s { case RunStarted: - r.tt.metrics.StartRun(r.taskID.String()) + r.tt.metrics.StartRun(r.task.ID.String()) case RunSuccess: - r.tt.metrics.FinishRun(r.taskID.String(), true) + r.tt.metrics.FinishRun(r.task.ID.String(), true) case RunFail, RunCanceled: - r.tt.metrics.FinishRun(r.taskID.String(), false) + r.tt.metrics.FinishRun(r.task.ID.String(), false) default: // We are deliberately not handling RunQueued yet. // There is not really a notion of being queued in this runner architecture. @@ -567,7 +560,7 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus) { // If we start seeing errors from this, we know the time limit is too short or the system is overloaded. ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond) defer cancel() - if err := r.logWriter.UpdateRunState(ctx, r.taskID, qr.RunID, time.Now(), s); err != nil { + if err := r.logWriter.UpdateRunState(ctx, r.task, qr.RunID, time.Now(), s); err != nil { r.logger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err)) } } diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index 1f7c8ff7e8..d3e73419d0 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/influxdata/platform" "github.com/influxdata/platform/kit/prom" @@ -11,28 +12,19 @@ import ( _ "github.com/influxdata/platform/query/builtin" "github.com/influxdata/platform/task/backend" "github.com/influxdata/platform/task/mock" -) - -const ( - scriptEveryMinute = `option task = { - name: "name", - cron: "* * * * *", - } -// (every minute on the minute) -from(bucket:"b") |> toHTTP(url: "http://example.com")` - - scriptEverySecond = `option task = { - name: "name", - every: 1s, - } - from(bucket:"b") |> toHTTP(url: "http://example.com")` + "github.com/influxdata/platform/task/options" ) func TestScheduler_EveryValidation(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) +<<<<<<< HEAD tid := platform.ID{1} +======= +<<<<<<< HEAD + tid := platform.ID(1) +>>>>>>> feat(task): update the scheduler and logwriter interface badScripts := []string{ `option task = { @@ -60,6 +52,30 @@ from(bucket:"b") |> toHTTP(url: "http://example.com")`, for _, badScript := range badScripts { if err := o.ClaimTask(tid, badScript, 3, 99); err == nil { t.Fatal("no error returned for :", badScript) +======= + task := &backend.StoreTask{ + ID: platform.ID{1}, + } + + badOptions := []options.Options{ + { + Every: time.Millisecond, + }, + { + Every: time.Hour * -1, + }, + { + Every: 1500 * time.Millisecond, + }, + { + Every: 1232 * time.Millisecond, + }, + } + + for _, badOption := range badOptions { + if err := o.ClaimTask(task, 3, &badOption); err == nil { + t.Fatal("no error returned for :", badOption) +>>>>>>> feat(task): update the scheduler and logwriter interface } } } @@ -69,23 +85,47 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) +<<<<<<< HEAD tid := platform.ID{1} +======= +<<<<<<< HEAD + tid := platform.ID(1) 
+>>>>>>> feat(task): update the scheduler and logwriter interface if err := o.ClaimTask(tid, scriptEveryMinute, 3, 99); err != nil { +======= + task := &backend.StoreTask{ + ID: platform.ID{1}, + } + opts := &options.Options{Every: time.Minute} + if err := o.ClaimTask(task, 3, opts); err != nil { +>>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } // No valid timestamps between 3 and 5 for every minute. - if n := len(d.CreatedFor(tid)); n > 0 { + if n := len(d.CreatedFor(task.ID)); n > 0 { t.Fatalf("expected no runs queued, but got %d", n) } // For every second, can queue for timestamps 4 and 5. +<<<<<<< HEAD tid = platform.ID{2} +======= +<<<<<<< HEAD + tid = platform.ID(2) +>>>>>>> feat(task): update the scheduler and logwriter interface if err := o.ClaimTask(tid, scriptEverySecond, 3, 5); err != nil { +======= + task = &backend.StoreTask{ + ID: platform.ID{2}, + } + opts = &options.Options{Every: time.Second, Concurrency: 99} + if err := o.ClaimTask(task, 3, opts); err != nil { +>>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } - if n := len(d.CreatedFor(tid)); n != 2 { + if n := len(d.CreatedFor(task.ID)); n != 2 { t.Fatalf("expected 2 runs queued for 'every 1s' script, but got %d", n) } } @@ -95,36 +135,49 @@ func TestScheduler_CreateRunOnTick(t *testing.T) { e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) +<<<<<<< HEAD tid := platform.ID{1} +======= +<<<<<<< HEAD + tid := platform.ID(1) +>>>>>>> feat(task): update the scheduler and logwriter interface if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { +======= + task := &backend.StoreTask{ + ID: platform.ID{1}, + } + + opts := &options.Options{Every: time.Second, Concurrency: 2} + if err := o.ClaimTask(task, 5, opts); err != nil { +>>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } - if x, err := d.PollForNumberCreated(tid, 0); err != nil { + if x, err := d.PollForNumberCreated(task.ID, 0); err != nil { t.Fatalf("expected no runs queued, but got %d", len(x)) } o.Tick(6) - if x, err := d.PollForNumberCreated(tid, 1); err != nil { + if x, err := d.PollForNumberCreated(task.ID, 1); err != nil { t.Fatalf("expected 1 run queued, but got %d", len(x)) } - running, err := e.PollForNumberRunning(tid, 1) + running, err := e.PollForNumberRunning(task.ID, 1) if err != nil { t.Fatal(err) } run6 := running[0] o.Tick(7) - if x, err := d.PollForNumberCreated(tid, 2); err != nil { + if x, err := d.PollForNumberCreated(task.ID, 2); err != nil { t.Fatalf("expected 2 runs queued, but got %d", len(x)) } o.Tick(8) // Can't exceed concurrency of 2. 
- if x, err := d.PollForNumberCreated(tid, 2); err != nil { + if x, err := d.PollForNumberCreated(task.ID, 2); err != nil { t.Fatalf("expected 2 runs queued, but got %d", len(x)) } run6.Cancel() - if x, err := d.PollForNumberCreated(tid, 1); err != nil { + if x, err := d.PollForNumberCreated(task.ID, 1); err != nil { t.Fatal(err, x) } } @@ -134,22 +187,35 @@ func TestScheduler_Release(t *testing.T) { e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) +<<<<<<< HEAD tid := platform.ID{1} +======= +<<<<<<< HEAD + tid := platform.ID(1) +>>>>>>> feat(task): update the scheduler and logwriter interface if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { +======= + task := &backend.StoreTask{ + ID: platform.ID{1}, + } + + opts := &options.Options{Every: time.Second, Concurrency: 99} + if err := o.ClaimTask(task, 5, opts); err != nil { +>>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } o.Tick(6) o.Tick(7) - if n := len(d.CreatedFor(tid)); n != 2 { + if n := len(d.CreatedFor(task.ID)); n != 2 { t.Fatalf("expected 2 runs queued, but got %d", n) } - if err := o.ReleaseTask(tid); err != nil { + if err := o.ReleaseTask(task.ID); err != nil { t.Fatal(err) } - if _, err := d.PollForNumberCreated(tid, 0); err != nil { + if _, err := d.PollForNumberCreated(task.ID, 0); err != nil { t.Fatal(err) } } @@ -161,22 +227,35 @@ func TestScheduler_RunLog(t *testing.T) { s := backend.NewScheduler(d, e, rl, 5) // Claim a task that starts later. +<<<<<<< HEAD tid := platform.ID{1} +======= +<<<<<<< HEAD + tid := platform.ID(1) +>>>>>>> feat(task): update the scheduler and logwriter interface if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { +======= + task := &backend.StoreTask{ + ID: platform.ID{1}, + } + + opts := &options.Options{Every: time.Second, Concurrency: 99} + if err := s.ClaimTask(task, 5, opts); err != nil { +>>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } - if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}); err != backend.ErrRunNotFound { + if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err != backend.ErrRunNotFound { t.Fatal(err) } s.Tick(6) - promises, err := e.PollForNumberRunning(tid, 1) + promises, err := e.PollForNumberRunning(task.ID, 1) if err != nil { t.Fatal(err) } - runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) + runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } @@ -190,11 +269,11 @@ func TestScheduler_RunLog(t *testing.T) { // Finish with success. promises[0].Finish(mock.NewRunResult(nil, false), nil) - if _, err := e.PollForNumberRunning(tid, 0); err != nil { + if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) + runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } @@ -208,12 +287,12 @@ func TestScheduler_RunLog(t *testing.T) { // Create a new run, but fail this time. 
s.Tick(7) - promises, err = e.PollForNumberRunning(tid, 1) + promises, err = e.PollForNumberRunning(task.ID, 1) if err != nil { t.Fatal(err) } - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) + runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } @@ -227,11 +306,11 @@ func TestScheduler_RunLog(t *testing.T) { // Finish with failure. promises[0].Finish(nil, errors.New("forced failure")) - if _, err := e.PollForNumberRunning(tid, 0); err != nil { + if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) + runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } @@ -245,12 +324,12 @@ func TestScheduler_RunLog(t *testing.T) { // One more run, but cancel this time. s.Tick(8) - promises, err = e.PollForNumberRunning(tid, 1) + promises, err = e.PollForNumberRunning(task.ID, 1) if err != nil { t.Fatal(err) } - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) + runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } @@ -264,11 +343,11 @@ func TestScheduler_RunLog(t *testing.T) { // Finish with failure. promises[0].Cancel() - if _, err := e.PollForNumberRunning(tid, 0); err != nil { + if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } - runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) + runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } @@ -292,8 +371,21 @@ func TestScheduler_Metrics(t *testing.T) { reg.MustRegister(s.(prom.PrometheusCollector).PrometheusCollectors()...) // Claim a task that starts later. 
+<<<<<<< HEAD tid := platform.ID{1} +======= +<<<<<<< HEAD + tid := platform.ID(1) +>>>>>>> feat(task): update the scheduler and logwriter interface if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { +======= + task := &backend.StoreTask{ + ID: platform.ID{1}, + } + + opts := &options.Options{Every: time.Second, Concurrency: 99} + if err := s.ClaimTask(task, 5, opts); err != nil { +>>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } @@ -309,7 +401,7 @@ func TestScheduler_Metrics(t *testing.T) { } s.Tick(6) - if _, err := e.PollForNumberRunning(tid, 1); err != nil { + if _, err := e.PollForNumberRunning(task.ID, 1); err != nil { t.Fatal(err) } @@ -319,13 +411,13 @@ func TestScheduler_Metrics(t *testing.T) { if got := *m.Gauge.Value; got != 1 { t.Fatalf("expected 1 total run active, got %v", got) } - m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) + m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}) if got := *m.Gauge.Value; got != 1 { - t.Fatalf("expected 1 run active for task ID %s, got %v", tid.String(), got) + t.Fatalf("expected 1 run active for task ID %s, got %v", task.ID.String(), got) } s.Tick(7) - if _, err := e.PollForNumberRunning(tid, 2); err != nil { + if _, err := e.PollForNumberRunning(task.ID, 2); err != nil { t.Fatal(err) } @@ -334,14 +426,14 @@ func TestScheduler_Metrics(t *testing.T) { if got := *m.Gauge.Value; got != 2 { t.Fatalf("expected 2 total runs active, got %v", got) } - m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) + m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}) if got := *m.Gauge.Value; got != 2 { - t.Fatalf("expected 2 runs active for task ID %s, got %v", tid.String(), got) + t.Fatalf("expected 2 runs active for task ID %s, got %v", task.ID.String(), got) } // Runs active decreases as run finishes. 
- e.RunningFor(tid)[0].Finish(mock.NewRunResult(nil, false), nil) - if _, err := e.PollForNumberRunning(tid, 1); err != nil { + e.RunningFor(task.ID)[0].Finish(mock.NewRunResult(nil, false), nil) + if _, err := e.PollForNumberRunning(task.ID, 1); err != nil { t.Fatal(err) } mfs = promtest.MustGather(t, reg) @@ -349,17 +441,17 @@ func TestScheduler_Metrics(t *testing.T) { if got := *m.Gauge.Value; got != 1 { t.Fatalf("expected 1 total run active, got %v", got) } - m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) + m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}) if got := *m.Gauge.Value; got != 1 { - t.Fatalf("expected 1 run active for task ID %s, got %v", tid.String(), got) + t.Fatalf("expected 1 run active for task ID %s, got %v", task.ID.String(), got) } - m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "success"}) + m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "success"}) if got := *m.Counter.Value; got != 1 { - t.Fatalf("expected 1 run succeeded for task ID %s, got %v", tid.String(), got) + t.Fatalf("expected 1 run succeeded for task ID %s, got %v", task.ID.String(), got) } - e.RunningFor(tid)[0].Finish(mock.NewRunResult(nil, false), errors.New("failed to execute")) - if _, err := e.PollForNumberRunning(tid, 0); err != nil { + e.RunningFor(task.ID)[0].Finish(mock.NewRunResult(nil, false), errors.New("failed to execute")) + if _, err := e.PollForNumberRunning(task.ID, 0); err != nil { t.Fatal(err) } mfs = promtest.MustGather(t, reg) @@ -367,27 +459,27 @@ func TestScheduler_Metrics(t *testing.T) { if got := *m.Gauge.Value; got != 0 { t.Fatalf("expected 0 total runs active, got %v", got) } - m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) + m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}) if got := *m.Gauge.Value; got != 0 { - t.Fatalf("expected 0 runs active for task ID %s, got %v", tid.String(), got) + t.Fatalf("expected 0 runs active for task ID %s, got %v", task.ID.String(), got) } - m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "failure"}) + m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "failure"}) if got := *m.Counter.Value; got != 1 { - t.Fatalf("expected 1 run failed for task ID %s, got %v", tid.String(), got) + t.Fatalf("expected 1 run failed for task ID %s, got %v", task.ID.String(), got) } // Runs label removed after task released. 
- if err := s.ReleaseTask(tid); err != nil { + if err := s.ReleaseTask(task.ID); err != nil { t.Fatal(err) } mfs = promtest.MustGather(t, reg) - if m := promtest.FindMetric(mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}); m != nil { + if m := promtest.FindMetric(mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}); m != nil { t.Fatalf("expected metric to be removed after releasing a task, got %v", m) } - if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "success"}); m != nil { + if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "success"}); m != nil { t.Fatalf("expected metric to be removed after releasing a task, got %v", m) } - if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "failure"}); m != nil { + if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "failure"}); m != nil { t.Fatalf("expected metric to be removed after releasing a task, got %v", m) } diff --git a/task/backend/store.go b/task/backend/store.go index 419bba44e3..0dedbf4773 100644 --- a/task/backend/store.go +++ b/task/backend/store.go @@ -62,20 +62,20 @@ type Store interface { // LogWriter writes task logs and task state changes to a store. type LogWriter interface { // UpdateRunState sets the run state and the respective time. - UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state RunStatus) error + UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, state RunStatus) error // AddRunLog adds a log line to the run. - AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error + AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error } // NopLogWriter is a LogWriter that doesn't do anything when its methods are called. // This is useful for test, but not much else. 
type NopLogWriter struct{} -func (NopLogWriter) UpdateRunState(context.Context, platform.ID, platform.ID, time.Time, RunStatus) error { +func (NopLogWriter) UpdateRunState(context.Context, *StoreTask, platform.ID, time.Time, RunStatus) error { return nil } -func (NopLogWriter) AddRunLog(context.Context, platform.ID, platform.ID, time.Time, string) error { +func (NopLogWriter) AddRunLog(context.Context, *StoreTask, platform.ID, time.Time, string) error { return nil } diff --git a/task/backend/storetest/logstoretest.go b/task/backend/storetest/logstoretest.go index b1de1af156..d70ed54b7e 100644 --- a/task/backend/storetest/logstoretest.go +++ b/task/backend/storetest/logstoretest.go @@ -40,7 +40,10 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun writer, reader := crf(t) defer drf(t, writer, reader) - taskID := platform.ID([]byte("task")) + task := &backend.StoreTask{ + ID: platform.ID([]byte("task")), + Org: platform.ID([]byte("org")), + } queuedAt := time.Unix(1, 0) run := platform.Run{ ID: platform.ID([]byte("run")), @@ -48,12 +51,12 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun QueuedAt: queuedAt.Format(time.RFC3339), } - err := writer.UpdateRunState(context.Background(), taskID, run.ID, queuedAt, backend.RunQueued) + err := writer.UpdateRunState(context.Background(), task, run.ID, queuedAt, backend.RunQueued) if err != nil { t.Fatal(err) } - returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID) + returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID) if err != nil { t.Fatal(err) } @@ -63,13 +66,13 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun } startAt := time.Unix(2, 0) - if err := writer.UpdateRunState(context.Background(), taskID, run.ID, startAt, backend.RunStarted); err != nil { + if err := writer.UpdateRunState(context.Background(), task, run.ID, startAt, backend.RunStarted); err != nil { t.Fatal(err) } run.StartTime = startAt.Format(time.RFC3339) run.Status = "started" - returnedRun, err = reader.FindRunByID(context.Background(), taskID, run.ID) + returnedRun, err = reader.FindRunByID(context.Background(), task.ID, run.ID) if err != nil { t.Fatal(err) } @@ -79,13 +82,13 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun } endAt := time.Unix(3, 0) - if err := writer.UpdateRunState(context.Background(), taskID, run.ID, endAt, backend.RunSuccess); err != nil { + if err := writer.UpdateRunState(context.Background(), task, run.ID, endAt, backend.RunSuccess); err != nil { t.Fatal(err) } run.EndTime = endAt.Format(time.RFC3339) run.Status = "success" - returnedRun, err = reader.FindRunByID(context.Background(), taskID, run.ID) + returnedRun, err = reader.FindRunByID(context.Background(), task.ID, run.ID) if err != nil { t.Fatal(err) } @@ -99,7 +102,11 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) { writer, reader := crf(t) defer drf(t, writer, reader) - taskID := platform.ID([]byte("task")) + task := &backend.StoreTask{ + ID: platform.ID([]byte("task")), + Org: platform.ID([]byte("org")), + } + run := platform.Run{ ID: platform.ID([]byte("run")), Status: "queued", @@ -108,29 +115,29 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) { logTime := time.Now() - if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "bad"); err == nil { + if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "bad"); 
err == nil { t.Fatal("shouldn't be able to log against non existing run") } - err := writer.UpdateRunState(context.Background(), taskID, run.ID, time.Now(), backend.RunQueued) + err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunQueued) if err != nil { t.Fatal(err) } - if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "first"); err != nil { + if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "first"); err != nil { t.Fatal(err) } - if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "second"); err != nil { + if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "second"); err != nil { t.Fatal(err) } - if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "third"); err != nil { + if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "third"); err != nil { t.Fatal(err) } fmtLogTime := logTime.Format(time.RFC3339) run.Log = platform.Log(fmt.Sprintf("%s: first\n%s: second\n%s: third", fmtLogTime, fmtLogTime, fmtLogTime)) - returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID) + returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID) if err != nil { t.Fatal(err) } @@ -144,9 +151,12 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) writer, reader := crf(t) defer drf(t, writer, reader) - taskID := platform.ID([]byte("task")) + task := &backend.StoreTask{ + ID: platform.ID([]byte("task")), + Org: platform.ID([]byte("org")), + } - if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &taskID}); err == nil { + if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err == nil { t.Fatal("failed to error on bad id") } @@ -159,7 +169,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) QueuedAt: queuedAt.Format(time.RFC3339), } - err := writer.UpdateRunState(context.Background(), taskID, runs[i].ID, queuedAt, backend.RunQueued) + err := writer.UpdateRunState(context.Background(), task, runs[i].ID, queuedAt, backend.RunQueued) if err != nil { t.Fatal(err) } @@ -171,7 +181,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) } listRuns, err := reader.ListRuns(context.Background(), platform.RunFilter{ - Task: &taskID, + Task: &task.ID, }) if err != nil { t.Fatal(err) @@ -182,7 +192,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) } listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ - Task: &taskID, + Task: &task.ID, After: &runs[20].ID, }) if err != nil { @@ -194,7 +204,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) } listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ - Task: &taskID, + Task: &task.ID, Limit: 30, }) if err != nil { @@ -207,7 +217,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) queuedAt, _ := time.Parse(time.RFC3339, runs[34].QueuedAt) listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ - Task: &taskID, + Task: &task.ID, AfterTime: queuedAt.Add(-1 * time.Nanosecond).Format(time.RFC3339), }) if err != nil { @@ -220,7 +230,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) queuedAt, _ = time.Parse(time.RFC3339, runs[34].QueuedAt) listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ - Task: &taskID, + Task: 
&task.ID, BeforeTime: queuedAt.Add(time.Nanosecond).Format(time.RFC3339), }) if err != nil { @@ -240,18 +250,21 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu t.Fatal("failed to error with bad id") } - taskID := platform.ID([]byte("task")) + task := &backend.StoreTask{ + ID: platform.ID([]byte("task")), + Org: platform.ID([]byte("org")), + } run := platform.Run{ ID: platform.ID([]byte("run")), Status: "queued", QueuedAt: time.Now().Format(time.RFC3339), } - if err := writer.UpdateRunState(context.Background(), taskID, run.ID, time.Now(), backend.RunQueued); err != nil { + if err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunQueued); err != nil { t.Fatal(err) } - returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID) + returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID) if err != nil { t.Fatal(err) } @@ -262,7 +275,7 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu returnedRun.Log = "cows" - rr2, err := reader.FindRunByID(context.Background(), taskID, run.ID) + rr2, err := reader.FindRunByID(context.Background(), task.ID, run.ID) if err != nil { t.Fatal(err) } @@ -276,12 +289,15 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) writer, reader := crf(t) defer drf(t, writer, reader) - taskID := platform.ID([]byte("task")) + task := &backend.StoreTask{ + ID: platform.ID([]byte("task")), + Org: platform.ID([]byte("org")), + } if _, err := reader.ListLogs(context.Background(), platform.LogFilter{}); err == nil { t.Fatal("failed to error with no filter") } - if _, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &taskID}); err == nil { + if _, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &task.ID}); err == nil { t.Fatal("failed to error with no filter") } @@ -293,12 +309,12 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) QueuedAt: time.Unix(int64(i), 0).Format(time.RFC3339), } - err := writer.UpdateRunState(context.Background(), taskID, runs[i].ID, time.Now(), backend.RunQueued) + err := writer.UpdateRunState(context.Background(), task, runs[i].ID, time.Now(), backend.RunQueued) if err != nil { t.Fatal(err) } - writer.AddRunLog(context.Background(), taskID, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i)) + writer.AddRunLog(context.Background(), task, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i)) } logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID}) @@ -311,7 +327,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) t.Fatalf("expected: %+v, got: %+v", fmtTimelog+": log4", string(logs[0])) } - logs, err = reader.ListLogs(context.Background(), platform.LogFilter{Task: &taskID}) + logs, err = reader.ListLogs(context.Background(), platform.LogFilter{Task: &task.ID}) if err != nil { t.Fatal(err) } From 6bba50a2a1328cadf966d5ade7af4ab9115333b3 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 1 Aug 2018 10:36:28 -0600 Subject: [PATCH 2/4] minor cleanup --- task/backend/storetest/logstoretest.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/task/backend/storetest/logstoretest.go b/task/backend/storetest/logstoretest.go index d70ed54b7e..d2a4d0fd20 100644 --- a/task/backend/storetest/logstoretest.go +++ b/task/backend/storetest/logstoretest.go @@ -41,7 +41,7 @@ func updateRunState(t *testing.T, crf 
CreateRunStoreFunc, drf DestroyRunStoreFun defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platform.ID([]byte("task")), + ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), Org: platform.ID([]byte("org")), } queuedAt := time.Unix(1, 0) @@ -103,7 +103,7 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) { defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platform.ID([]byte("task")), + ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), Org: platform.ID([]byte("org")), } @@ -152,7 +152,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platform.ID([]byte("task")), + ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), Org: platform.ID([]byte("org")), } @@ -251,7 +251,7 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu } task := &backend.StoreTask{ - ID: platform.ID([]byte("task")), + ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), Org: platform.ID([]byte("org")), } run := platform.Run{ @@ -290,7 +290,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platform.ID([]byte("task")), + ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), Org: platform.ID([]byte("org")), } From 3ba8784fed311c78c0fc79b5e404b3aea41a8a7c Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 1 Aug 2018 10:43:31 -0600 Subject: [PATCH 3/4] fix bad commit with merge issue --- task/backend/scheduler_test.go | 89 -------------------------- task/backend/storetest/logstoretest.go | 20 +++--- 2 files changed, 10 insertions(+), 99 deletions(-) diff --git a/task/backend/scheduler_test.go b/task/backend/scheduler_test.go index d3e73419d0..7dffbdabd5 100644 --- a/task/backend/scheduler_test.go +++ b/task/backend/scheduler_test.go @@ -19,40 +19,6 @@ func TestScheduler_EveryValidation(t *testing.T) { d := mock.NewDesiredState() e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) -<<<<<<< HEAD - tid := platform.ID{1} -======= -<<<<<<< HEAD - tid := platform.ID(1) ->>>>>>> feat(task): update the scheduler and logwriter interface - - badScripts := []string{ - `option task = { - name: "name", - every: 1ms, - } -from(bucket:"b") |> toHTTP(url: "http://example.com")`, - `option task = { - name: "name", - every: -1h, - } -from(bucket:"b") |> toHTTP(url: "http://example.com")`, - `option task = { - name: "name", - every: 1500ms, - } -from(bucket:"b") |> toHTTP(url: "http://example.com")`, - `option task = { - name: "name", - every: 12.32s, - } -from(bucket:"b") |> toHTTP(url: "http://example.com")`, - } - - for _, badScript := range badScripts { - if err := o.ClaimTask(tid, badScript, 3, 99); err == nil { - t.Fatal("no error returned for :", badScript) -======= task := &backend.StoreTask{ ID: platform.ID{1}, } @@ -75,7 +41,6 @@ from(bucket:"b") |> toHTTP(url: "http://example.com")`, for _, badOption := range badOptions { if err := o.ClaimTask(task, 3, &badOption); err == nil { t.Fatal("no error returned for :", badOption) ->>>>>>> feat(task): update the scheduler and logwriter interface } } } @@ -85,20 +50,11 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) -<<<<<<< HEAD - tid := platform.ID{1} -======= -<<<<<<< HEAD - tid := platform.ID(1) ->>>>>>> feat(task): update the scheduler and logwriter interface 
- if err := o.ClaimTask(tid, scriptEveryMinute, 3, 99); err != nil { -======= task := &backend.StoreTask{ ID: platform.ID{1}, } opts := &options.Options{Every: time.Minute} if err := o.ClaimTask(task, 3, opts); err != nil { ->>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } @@ -108,20 +64,11 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) { } // For every second, can queue for timestamps 4 and 5. -<<<<<<< HEAD - tid = platform.ID{2} -======= -<<<<<<< HEAD - tid = platform.ID(2) ->>>>>>> feat(task): update the scheduler and logwriter interface - if err := o.ClaimTask(tid, scriptEverySecond, 3, 5); err != nil { -======= task = &backend.StoreTask{ ID: platform.ID{2}, } opts = &options.Options{Every: time.Second, Concurrency: 99} if err := o.ClaimTask(task, 3, opts); err != nil { ->>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } @@ -135,21 +82,12 @@ func TestScheduler_CreateRunOnTick(t *testing.T) { e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) -<<<<<<< HEAD - tid := platform.ID{1} -======= -<<<<<<< HEAD - tid := platform.ID(1) ->>>>>>> feat(task): update the scheduler and logwriter interface - if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { -======= task := &backend.StoreTask{ ID: platform.ID{1}, } opts := &options.Options{Every: time.Second, Concurrency: 2} if err := o.ClaimTask(task, 5, opts); err != nil { ->>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } @@ -187,21 +125,12 @@ func TestScheduler_Release(t *testing.T) { e := mock.NewExecutor() o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) -<<<<<<< HEAD - tid := platform.ID{1} -======= -<<<<<<< HEAD - tid := platform.ID(1) ->>>>>>> feat(task): update the scheduler and logwriter interface - if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { -======= task := &backend.StoreTask{ ID: platform.ID{1}, } opts := &options.Options{Every: time.Second, Concurrency: 99} if err := o.ClaimTask(task, 5, opts); err != nil { ->>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } @@ -227,21 +156,12 @@ func TestScheduler_RunLog(t *testing.T) { s := backend.NewScheduler(d, e, rl, 5) // Claim a task that starts later. -<<<<<<< HEAD - tid := platform.ID{1} -======= -<<<<<<< HEAD - tid := platform.ID(1) ->>>>>>> feat(task): update the scheduler and logwriter interface - if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { -======= task := &backend.StoreTask{ ID: platform.ID{1}, } opts := &options.Options{Every: time.Second, Concurrency: 99} if err := s.ClaimTask(task, 5, opts); err != nil { ->>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } @@ -371,21 +291,12 @@ func TestScheduler_Metrics(t *testing.T) { reg.MustRegister(s.(prom.PrometheusCollector).PrometheusCollectors()...) // Claim a task that starts later. 
-<<<<<<< HEAD - tid := platform.ID{1} -======= -<<<<<<< HEAD - tid := platform.ID(1) ->>>>>>> feat(task): update the scheduler and logwriter interface - if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { -======= task := &backend.StoreTask{ ID: platform.ID{1}, } opts := &options.Options{Every: time.Second, Concurrency: 99} if err := s.ClaimTask(task, 5, opts); err != nil { ->>>>>>> feat(task): update the scheduler and logwriter interface t.Fatal(err) } diff --git a/task/backend/storetest/logstoretest.go b/task/backend/storetest/logstoretest.go index d2a4d0fd20..1fb82d17b8 100644 --- a/task/backend/storetest/logstoretest.go +++ b/task/backend/storetest/logstoretest.go @@ -41,8 +41,8 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), - Org: platform.ID([]byte("org")), + ID: platform.ID([]byte("ab01ab01ab01ab01")), + Org: platform.ID([]byte("ab01ab01ab01ab05")), } queuedAt := time.Unix(1, 0) run := platform.Run{ @@ -103,8 +103,8 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) { defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), - Org: platform.ID([]byte("org")), + ID: platform.ID([]byte("ab01ab01ab01ab01")), + Org: platform.ID([]byte("ab01ab01ab01ab05")), } run := platform.Run{ @@ -152,8 +152,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), - Org: platform.ID([]byte("org")), + ID: platform.ID([]byte("ab01ab01ab01ab01")), + Org: platform.ID([]byte("ab01ab01ab01ab05")), } if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err == nil { @@ -251,8 +251,8 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu } task := &backend.StoreTask{ - ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), - Org: platform.ID([]byte("org")), + ID: platform.ID([]byte("ab01ab01ab01ab01")), + Org: platform.ID([]byte("ab01ab01ab01ab05")), } run := platform.Run{ ID: platform.ID([]byte("run")), @@ -290,8 +290,8 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) defer drf(t, writer, reader) task := &backend.StoreTask{ - ID: platformtesting.MustIDFromString("ab01ab01ab01ab01"), - Org: platform.ID([]byte("org")), + ID: platform.ID([]byte("ab01ab01ab01ab01")), + Org: platform.ID([]byte("ab01ab01ab01ab05")), } if _, err := reader.ListLogs(context.Background(), platform.LogFilter{}); err == nil { From 0661d05b1f13d3b057ad3ba258aca08341713c15 Mon Sep 17 00:00:00 2001 From: Lyon Hill Date: Wed, 1 Aug 2018 11:36:10 -0600 Subject: [PATCH 4/4] update mock scheduler --- task/mock/scheduler.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/task/mock/scheduler.go b/task/mock/scheduler.go index 17535ee26d..3ef5a7266a 100644 --- a/task/mock/scheduler.go +++ b/task/mock/scheduler.go @@ -13,6 +13,7 @@ import ( "github.com/influxdata/platform" "github.com/influxdata/platform/task/backend" scheduler "github.com/influxdata/platform/task/backend" + "github.com/influxdata/platform/task/options" "go.uber.org/zap" ) @@ -53,7 +54,7 @@ func (s *Scheduler) Tick(now int64) { func (s *Scheduler) WithLogger(l *zap.Logger) {} -func (s *Scheduler) ClaimTask(taskID platform.ID, script string, startExecutionFrom 
int64, concurrencyLimit uint8) error { +func (s *Scheduler) ClaimTask(task *backend.StoreTask, startExecutionFrom int64, opts *options.Options) error { if s.claimError != nil { return s.claimError } @@ -61,14 +62,14 @@ func (s *Scheduler) ClaimTask(taskID platform.ID, script string, startExecutionF s.Lock() defer s.Unlock() - _, ok := s.claims[taskID.String()] + _, ok := s.claims[task.ID.String()] if ok { return errors.New("task already in list") } - t := &Task{script, startExecutionFrom, concurrencyLimit} + t := &Task{task.Script, startExecutionFrom, uint8(opts.Concurrency)} - s.claims[taskID.String()] = t + s.claims[task.ID.String()] = t if s.createChan != nil { s.createChan <- t
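
A minimal sketch of the caller-side change this series makes, wired the same way as the updated tests (mock DesiredState and Executor, a NopLogWriter). The main function and its literal values are illustrative assumptions, not part of the patch; the ClaimTask signature and the options fields come directly from the diffs above.

package main

import (
	"fmt"
	"time"

	"github.com/influxdata/platform"
	"github.com/influxdata/platform/task/backend"
	"github.com/influxdata/platform/task/mock"
	"github.com/influxdata/platform/task/options"
)

func main() {
	// Same wiring as the updated scheduler tests.
	d := mock.NewDesiredState()
	e := mock.NewExecutor()
	s := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)

	// New claim shape: the whole *StoreTask plus pre-parsed options.
	// Concurrency now travels inside options.Options rather than as a
	// separate concurrencyLimit uint8 argument.
	task := &backend.StoreTask{ID: platform.ID{1}}
	opts := &options.Options{Every: time.Second, Concurrency: 2}
	if err := s.ClaimTask(task, 5, opts); err != nil {
		fmt.Println("claim failed:", err)
		return
	}

	// Ticking past the next eligible timestamp creates a run, capped at
	// opts.Concurrency runs queued or executing at once.
	s.Tick(6)
	if _, err := e.PollForNumberRunning(task.ID, 1); err != nil {
		fmt.Println(err)
	}
}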
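
A second sketch, of the "more complex log writers" the first commit message anticipates and that the new LogWriter signature makes possible: because UpdateRunState and AddRunLog now receive the full *StoreTask, a writer can partition storage by organization as well as by task and run. The orgKeyedLogWriter type and its key scheme are hypothetical, not from the patch; only the interface it satisfies is.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/influxdata/platform"
	"github.com/influxdata/platform/task/backend"
)

// orgKeyedLogWriter keys run state and log lines by "org/task/run",
// which the old (taskID, runID) signature could not express.
type orgKeyedLogWriter struct {
	entries map[string][]string
}

// Compile-time check that the sketch satisfies backend.LogWriter.
var _ backend.LogWriter = (*orgKeyedLogWriter)(nil)

func newOrgKeyedLogWriter() *orgKeyedLogWriter {
	return &orgKeyedLogWriter{entries: map[string][]string{}}
}

func (w *orgKeyedLogWriter) key(task *backend.StoreTask, runID platform.ID) string {
	return fmt.Sprintf("%s/%s/%s", task.Org.String(), task.ID.String(), runID.String())
}

func (w *orgKeyedLogWriter) UpdateRunState(ctx context.Context, task *backend.StoreTask, runID platform.ID, when time.Time, state backend.RunStatus) error {
	k := w.key(task, runID)
	w.entries[k] = append(w.entries[k], when.Format(time.RFC3339)+": state="+state.String())
	return nil
}

func (w *orgKeyedLogWriter) AddRunLog(ctx context.Context, task *backend.StoreTask, runID platform.ID, when time.Time, log string) error {
	k := w.key(task, runID)
	w.entries[k] = append(w.entries[k], when.Format(time.RFC3339)+": "+log)
	return nil
}

func main() {
	w := newOrgKeyedLogWriter()
	task := &backend.StoreTask{ID: platform.ID{1}, Org: platform.ID{2}}
	_ = w.UpdateRunState(context.Background(), task, platform.ID{3}, time.Now(), backend.RunQueued)
	fmt.Println(w.entries)
}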