feat(task): update the scheduler and logwriter interface

Preperatory change that should enable us to build more complex log writers.
pull/10616/head
Lyon Hill 2018-08-01 09:51:10 -06:00
parent e598ee45ed
commit 69761a98f7
5 changed files with 227 additions and 126 deletions

View File

@ -49,7 +49,7 @@ func NewInMemRunReaderWriter() *runReaderWriter {
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}} return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}}
} }
func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, status RunStatus) error { func (r *runReaderWriter) UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, status RunStatus) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
timeSetter := func(r *platform.Run) { timeSetter := func(r *platform.Run) {
@ -69,7 +69,7 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID plat
run := &platform.Run{ID: runID, Status: status.String()} run := &platform.Run{ID: runID, Status: status.String()}
timeSetter(run) timeSetter(run)
r.byRunID[runID.String()] = run r.byRunID[runID.String()] = run
r.byTaskID[taskID.String()] = append(r.byTaskID[taskID.String()], run) r.byTaskID[task.ID.String()] = append(r.byTaskID[task.ID.String()], run)
return nil return nil
} }
@ -78,7 +78,7 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, taskID, runID plat
return nil return nil
} }
func (r *runReaderWriter) AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error { func (r *runReaderWriter) AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()

View File

@ -90,9 +90,7 @@ type Scheduler interface {
// you can set startExecutionFrom in the past to backfill a task. // you can set startExecutionFrom in the past to backfill a task.
// concurrencyLimit is how many runs may be concurrently queued or executing. // concurrencyLimit is how many runs may be concurrently queued or executing.
// concurrencyLimit must be positive. // concurrencyLimit must be positive.
// TODO(mr): concurrencyLimit should become a script option rather than explicit. ClaimTask(task *StoreTask, startExecutionFrom int64, opt *options.Options) error
ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) error
// ReleaseTask immediately cancels any in-progress runs for the given task ID, // ReleaseTask immediately cancels any in-progress runs for the given task ID,
// and releases any resources related to management of that task. // and releases any resources related to management of that task.
ReleaseTask(taskID platform.ID) error ReleaseTask(taskID platform.ID) error
@ -191,14 +189,9 @@ func (s *outerScheduler) Tick(now int64) {
} }
} }
func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecutionFrom int64, concurrencyLimit uint8) (err error) { func (s *outerScheduler) ClaimTask(task *StoreTask, startExecutionFrom int64, opts *options.Options) (err error) {
defer s.metrics.ClaimTask(err == nil) defer s.metrics.ClaimTask(err == nil)
opts, err := options.FromScript(script)
if err != nil {
return err
}
timer := opts.Cron timer := opts.Cron
if timer == "" { if timer == "" {
@ -219,10 +212,10 @@ func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecu
ts := newTaskScheduler( ts := newTaskScheduler(
s, s,
taskID, task,
sch, sch,
startExecutionFrom, startExecutionFrom,
concurrencyLimit, uint8(opts.Concurrency),
) )
if s.cronTimer != nil { if s.cronTimer != nil {
@ -236,13 +229,13 @@ func (s *outerScheduler) ClaimTask(taskID platform.ID, script string, startExecu
} }
s.mu.Lock() s.mu.Lock()
_, ok := s.tasks[taskID.String()] _, ok := s.tasks[task.ID.String()]
if ok { if ok {
s.mu.Unlock() s.mu.Unlock()
return errors.New("task has already been claimed") return errors.New("task has already been claimed")
} }
s.tasks[taskID.String()] = ts s.tasks[task.ID.String()] = ts
s.mu.Unlock() s.mu.Unlock()
@ -278,8 +271,8 @@ func (s *outerScheduler) PrometheusCollectors() []prometheus.Collector {
// taskScheduler is a lightweight wrapper around a collection of runners. // taskScheduler is a lightweight wrapper around a collection of runners.
type taskScheduler struct { type taskScheduler struct {
// ID of task. // Task we are scheduling for.
id platform.ID task *StoreTask
// Seconds since UTC epoch. // Seconds since UTC epoch.
now int64 now int64
@ -301,7 +294,7 @@ type taskScheduler struct {
func newTaskScheduler( func newTaskScheduler(
s *outerScheduler, s *outerScheduler,
taskID platform.ID, task *StoreTask,
cron cron.Schedule, cron cron.Schedule,
startExecutionFrom int64, startExecutionFrom int64,
concurrencyLimit uint8, concurrencyLimit uint8,
@ -309,11 +302,11 @@ func newTaskScheduler(
firstScheduled := cron.Next(time.Unix(startExecutionFrom, 0).UTC()).Unix() firstScheduled := cron.Next(time.Unix(startExecutionFrom, 0).UTC()).Unix()
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
ts := &taskScheduler{ ts := &taskScheduler{
id: taskID, task: task,
now: startExecutionFrom, now: startExecutionFrom,
cancel: cancel, cancel: cancel,
runners: make([]*runner, concurrencyLimit), runners: make([]*runner, concurrencyLimit),
logger: s.logger.With(zap.String("task_id", taskID.String())), logger: s.logger.With(zap.String("task_id", task.ID.String())),
} }
tt := &taskTimer{ tt := &taskTimer{
@ -327,7 +320,7 @@ func newTaskScheduler(
} }
for i := range ts.runners { for i := range ts.runners {
ts.runners[i] = newRunner(ctx, ts.logger, taskID, s.desiredState, s.executor, s.logWriter, tt) ts.runners[i] = newRunner(ctx, ts.logger, task, s.desiredState, s.executor, s.logWriter, tt)
} }
return ts return ts
@ -409,7 +402,7 @@ type runner struct {
// Cancelable context from parent taskScheduler. // Cancelable context from parent taskScheduler.
ctx context.Context ctx context.Context
taskID platform.ID task *StoreTask
desiredState DesiredState desiredState DesiredState
executor Executor executor Executor
@ -423,7 +416,7 @@ type runner struct {
func newRunner( func newRunner(
ctx context.Context, ctx context.Context,
logger *zap.Logger, logger *zap.Logger,
taskID platform.ID, task *StoreTask,
desiredState DesiredState, desiredState DesiredState,
executor Executor, executor Executor,
logWriter LogWriter, logWriter LogWriter,
@ -432,7 +425,7 @@ func newRunner(
return &runner{ return &runner{
ctx: ctx, ctx: ctx,
state: new(uint32), state: new(uint32),
taskID: taskID, task: task,
desiredState: desiredState, desiredState: desiredState,
executor: executor, executor: executor,
logWriter: logWriter, logWriter: logWriter,
@ -475,7 +468,7 @@ func (r *runner) startFromWorking() {
if next, ready := r.tt.NextScheduledRun(); ready { if next, ready := r.tt.NextScheduledRun(); ready {
// It's possible that two runners may attempt to create the same run for this "next" timestamp, // It's possible that two runners may attempt to create the same run for this "next" timestamp,
// but the contract of DesiredState requires that only one succeeds. // but the contract of DesiredState requires that only one succeeds.
qr, err := r.desiredState.CreateRun(r.ctx, r.taskID, next) qr, err := r.desiredState.CreateRun(r.ctx, r.task.ID, next)
if err != nil { if err != nil {
r.logger.Info("Failed to create run", zap.Error(err)) r.logger.Info("Failed to create run", zap.Error(err))
atomic.StoreUint32(r.state, runnerIdle) atomic.StoreUint32(r.state, runnerIdle)
@ -552,11 +545,11 @@ func (r *runner) executeAndWait(qr QueuedRun) {
func (r *runner) updateRunState(qr QueuedRun, s RunStatus) { func (r *runner) updateRunState(qr QueuedRun, s RunStatus) {
switch s { switch s {
case RunStarted: case RunStarted:
r.tt.metrics.StartRun(r.taskID.String()) r.tt.metrics.StartRun(r.task.ID.String())
case RunSuccess: case RunSuccess:
r.tt.metrics.FinishRun(r.taskID.String(), true) r.tt.metrics.FinishRun(r.task.ID.String(), true)
case RunFail, RunCanceled: case RunFail, RunCanceled:
r.tt.metrics.FinishRun(r.taskID.String(), false) r.tt.metrics.FinishRun(r.task.ID.String(), false)
default: default:
// We are deliberately not handling RunQueued yet. // We are deliberately not handling RunQueued yet.
// There is not really a notion of being queued in this runner architecture. // There is not really a notion of being queued in this runner architecture.
@ -567,7 +560,7 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus) {
// If we start seeing errors from this, we know the time limit is too short or the system is overloaded. // If we start seeing errors from this, we know the time limit is too short or the system is overloaded.
ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond) ctx, cancel := context.WithTimeout(r.ctx, 10*time.Millisecond)
defer cancel() defer cancel()
if err := r.logWriter.UpdateRunState(ctx, r.taskID, qr.RunID, time.Now(), s); err != nil { if err := r.logWriter.UpdateRunState(ctx, r.task, qr.RunID, time.Now(), s); err != nil {
r.logger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err)) r.logger.Info("Error updating run state", zap.Stringer("state", s), zap.Error(err))
} }
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"testing" "testing"
"time"
"github.com/influxdata/platform" "github.com/influxdata/platform"
"github.com/influxdata/platform/kit/prom" "github.com/influxdata/platform/kit/prom"
@ -11,28 +12,19 @@ import (
_ "github.com/influxdata/platform/query/builtin" _ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/task/backend" "github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/mock" "github.com/influxdata/platform/task/mock"
) "github.com/influxdata/platform/task/options"
const (
scriptEveryMinute = `option task = {
name: "name",
cron: "* * * * *",
}
// (every minute on the minute)
from(bucket:"b") |> toHTTP(url: "http://example.com")`
scriptEverySecond = `option task = {
name: "name",
every: 1s,
}
from(bucket:"b") |> toHTTP(url: "http://example.com")`
) )
func TestScheduler_EveryValidation(t *testing.T) { func TestScheduler_EveryValidation(t *testing.T) {
d := mock.NewDesiredState() d := mock.NewDesiredState()
e := mock.NewExecutor() e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
<<<<<<< HEAD
tid := platform.ID{1} tid := platform.ID{1}
=======
<<<<<<< HEAD
tid := platform.ID(1)
>>>>>>> feat(task): update the scheduler and logwriter interface
badScripts := []string{ badScripts := []string{
`option task = { `option task = {
@ -60,6 +52,30 @@ from(bucket:"b") |> toHTTP(url: "http://example.com")`,
for _, badScript := range badScripts { for _, badScript := range badScripts {
if err := o.ClaimTask(tid, badScript, 3, 99); err == nil { if err := o.ClaimTask(tid, badScript, 3, 99); err == nil {
t.Fatal("no error returned for :", badScript) t.Fatal("no error returned for :", badScript)
=======
task := &backend.StoreTask{
ID: platform.ID{1},
}
badOptions := []options.Options{
{
Every: time.Millisecond,
},
{
Every: time.Hour * -1,
},
{
Every: 1500 * time.Millisecond,
},
{
Every: 1232 * time.Millisecond,
},
}
for _, badOption := range badOptions {
if err := o.ClaimTask(task, 3, &badOption); err == nil {
t.Fatal("no error returned for :", badOption)
>>>>>>> feat(task): update the scheduler and logwriter interface
} }
} }
} }
@ -69,23 +85,47 @@ func TestScheduler_StartScriptOnClaim(t *testing.T) {
e := mock.NewExecutor() e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
<<<<<<< HEAD
tid := platform.ID{1} tid := platform.ID{1}
=======
<<<<<<< HEAD
tid := platform.ID(1)
>>>>>>> feat(task): update the scheduler and logwriter interface
if err := o.ClaimTask(tid, scriptEveryMinute, 3, 99); err != nil { if err := o.ClaimTask(tid, scriptEveryMinute, 3, 99); err != nil {
=======
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Minute}
if err := o.ClaimTask(task, 3, opts); err != nil {
>>>>>>> feat(task): update the scheduler and logwriter interface
t.Fatal(err) t.Fatal(err)
} }
// No valid timestamps between 3 and 5 for every minute. // No valid timestamps between 3 and 5 for every minute.
if n := len(d.CreatedFor(tid)); n > 0 { if n := len(d.CreatedFor(task.ID)); n > 0 {
t.Fatalf("expected no runs queued, but got %d", n) t.Fatalf("expected no runs queued, but got %d", n)
} }
// For every second, can queue for timestamps 4 and 5. // For every second, can queue for timestamps 4 and 5.
<<<<<<< HEAD
tid = platform.ID{2} tid = platform.ID{2}
=======
<<<<<<< HEAD
tid = platform.ID(2)
>>>>>>> feat(task): update the scheduler and logwriter interface
if err := o.ClaimTask(tid, scriptEverySecond, 3, 5); err != nil { if err := o.ClaimTask(tid, scriptEverySecond, 3, 5); err != nil {
=======
task = &backend.StoreTask{
ID: platform.ID{2},
}
opts = &options.Options{Every: time.Second, Concurrency: 99}
if err := o.ClaimTask(task, 3, opts); err != nil {
>>>>>>> feat(task): update the scheduler and logwriter interface
t.Fatal(err) t.Fatal(err)
} }
if n := len(d.CreatedFor(tid)); n != 2 { if n := len(d.CreatedFor(task.ID)); n != 2 {
t.Fatalf("expected 2 runs queued for 'every 1s' script, but got %d", n) t.Fatalf("expected 2 runs queued for 'every 1s' script, but got %d", n)
} }
} }
@ -95,36 +135,49 @@ func TestScheduler_CreateRunOnTick(t *testing.T) {
e := mock.NewExecutor() e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
<<<<<<< HEAD
tid := platform.ID{1} tid := platform.ID{1}
=======
<<<<<<< HEAD
tid := platform.ID(1)
>>>>>>> feat(task): update the scheduler and logwriter interface
if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
=======
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 2}
if err := o.ClaimTask(task, 5, opts); err != nil {
>>>>>>> feat(task): update the scheduler and logwriter interface
t.Fatal(err) t.Fatal(err)
} }
if x, err := d.PollForNumberCreated(tid, 0); err != nil { if x, err := d.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatalf("expected no runs queued, but got %d", len(x)) t.Fatalf("expected no runs queued, but got %d", len(x))
} }
o.Tick(6) o.Tick(6)
if x, err := d.PollForNumberCreated(tid, 1); err != nil { if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
t.Fatalf("expected 1 run queued, but got %d", len(x)) t.Fatalf("expected 1 run queued, but got %d", len(x))
} }
running, err := e.PollForNumberRunning(tid, 1) running, err := e.PollForNumberRunning(task.ID, 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
run6 := running[0] run6 := running[0]
o.Tick(7) o.Tick(7)
if x, err := d.PollForNumberCreated(tid, 2); err != nil { if x, err := d.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x)) t.Fatalf("expected 2 runs queued, but got %d", len(x))
} }
o.Tick(8) // Can't exceed concurrency of 2. o.Tick(8) // Can't exceed concurrency of 2.
if x, err := d.PollForNumberCreated(tid, 2); err != nil { if x, err := d.PollForNumberCreated(task.ID, 2); err != nil {
t.Fatalf("expected 2 runs queued, but got %d", len(x)) t.Fatalf("expected 2 runs queued, but got %d", len(x))
} }
run6.Cancel() run6.Cancel()
if x, err := d.PollForNumberCreated(tid, 1); err != nil { if x, err := d.PollForNumberCreated(task.ID, 1); err != nil {
t.Fatal(err, x) t.Fatal(err, x)
} }
} }
@ -134,22 +187,35 @@ func TestScheduler_Release(t *testing.T) {
e := mock.NewExecutor() e := mock.NewExecutor()
o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5) o := backend.NewScheduler(d, e, backend.NopLogWriter{}, 5)
<<<<<<< HEAD
tid := platform.ID{1} tid := platform.ID{1}
=======
<<<<<<< HEAD
tid := platform.ID(1)
>>>>>>> feat(task): update the scheduler and logwriter interface
if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { if err := o.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
=======
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 99}
if err := o.ClaimTask(task, 5, opts); err != nil {
>>>>>>> feat(task): update the scheduler and logwriter interface
t.Fatal(err) t.Fatal(err)
} }
o.Tick(6) o.Tick(6)
o.Tick(7) o.Tick(7)
if n := len(d.CreatedFor(tid)); n != 2 { if n := len(d.CreatedFor(task.ID)); n != 2 {
t.Fatalf("expected 2 runs queued, but got %d", n) t.Fatalf("expected 2 runs queued, but got %d", n)
} }
if err := o.ReleaseTask(tid); err != nil { if err := o.ReleaseTask(task.ID); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if _, err := d.PollForNumberCreated(tid, 0); err != nil { if _, err := d.PollForNumberCreated(task.ID, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
} }
@ -161,22 +227,35 @@ func TestScheduler_RunLog(t *testing.T) {
s := backend.NewScheduler(d, e, rl, 5) s := backend.NewScheduler(d, e, rl, 5)
// Claim a task that starts later. // Claim a task that starts later.
<<<<<<< HEAD
tid := platform.ID{1} tid := platform.ID{1}
=======
<<<<<<< HEAD
tid := platform.ID(1)
>>>>>>> feat(task): update the scheduler and logwriter interface
if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
=======
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 99}
if err := s.ClaimTask(task, 5, opts); err != nil {
>>>>>>> feat(task): update the scheduler and logwriter interface
t.Fatal(err) t.Fatal(err)
} }
if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}); err != backend.ErrRunNotFound { if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err != backend.ErrRunNotFound {
t.Fatal(err) t.Fatal(err)
} }
s.Tick(6) s.Tick(6)
promises, err := e.PollForNumberRunning(tid, 1) promises, err := e.PollForNumberRunning(task.ID, 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -190,11 +269,11 @@ func TestScheduler_RunLog(t *testing.T) {
// Finish with success. // Finish with success.
promises[0].Finish(mock.NewRunResult(nil, false), nil) promises[0].Finish(mock.NewRunResult(nil, false), nil)
if _, err := e.PollForNumberRunning(tid, 0); err != nil { if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -208,12 +287,12 @@ func TestScheduler_RunLog(t *testing.T) {
// Create a new run, but fail this time. // Create a new run, but fail this time.
s.Tick(7) s.Tick(7)
promises, err = e.PollForNumberRunning(tid, 1) promises, err = e.PollForNumberRunning(task.ID, 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -227,11 +306,11 @@ func TestScheduler_RunLog(t *testing.T) {
// Finish with failure. // Finish with failure.
promises[0].Finish(nil, errors.New("forced failure")) promises[0].Finish(nil, errors.New("forced failure"))
if _, err := e.PollForNumberRunning(tid, 0); err != nil { if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -245,12 +324,12 @@ func TestScheduler_RunLog(t *testing.T) {
// One more run, but cancel this time. // One more run, but cancel this time.
s.Tick(8) s.Tick(8)
promises, err = e.PollForNumberRunning(tid, 1) promises, err = e.PollForNumberRunning(task.ID, 1)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -264,11 +343,11 @@ func TestScheduler_RunLog(t *testing.T) {
// Finish with failure. // Finish with failure.
promises[0].Cancel() promises[0].Cancel()
if _, err := e.PollForNumberRunning(tid, 0); err != nil { if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &tid}) runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -292,8 +371,21 @@ func TestScheduler_Metrics(t *testing.T) {
reg.MustRegister(s.(prom.PrometheusCollector).PrometheusCollectors()...) reg.MustRegister(s.(prom.PrometheusCollector).PrometheusCollectors()...)
// Claim a task that starts later. // Claim a task that starts later.
<<<<<<< HEAD
tid := platform.ID{1} tid := platform.ID{1}
=======
<<<<<<< HEAD
tid := platform.ID(1)
>>>>>>> feat(task): update the scheduler and logwriter interface
if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil { if err := s.ClaimTask(tid, scriptEverySecond, 5, 2); err != nil {
=======
task := &backend.StoreTask{
ID: platform.ID{1},
}
opts := &options.Options{Every: time.Second, Concurrency: 99}
if err := s.ClaimTask(task, 5, opts); err != nil {
>>>>>>> feat(task): update the scheduler and logwriter interface
t.Fatal(err) t.Fatal(err)
} }
@ -309,7 +401,7 @@ func TestScheduler_Metrics(t *testing.T) {
} }
s.Tick(6) s.Tick(6)
if _, err := e.PollForNumberRunning(tid, 1); err != nil { if _, err := e.PollForNumberRunning(task.ID, 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -319,13 +411,13 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 1 { if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total run active, got %v", got) t.Fatalf("expected 1 total run active, got %v", got)
} }
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 1 { if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 run active for task ID %s, got %v", tid.String(), got) t.Fatalf("expected 1 run active for task ID %s, got %v", task.ID.String(), got)
} }
s.Tick(7) s.Tick(7)
if _, err := e.PollForNumberRunning(tid, 2); err != nil { if _, err := e.PollForNumberRunning(task.ID, 2); err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -334,14 +426,14 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 2 { if got := *m.Gauge.Value; got != 2 {
t.Fatalf("expected 2 total runs active, got %v", got) t.Fatalf("expected 2 total runs active, got %v", got)
} }
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 2 { if got := *m.Gauge.Value; got != 2 {
t.Fatalf("expected 2 runs active for task ID %s, got %v", tid.String(), got) t.Fatalf("expected 2 runs active for task ID %s, got %v", task.ID.String(), got)
} }
// Runs active decreases as run finishes. // Runs active decreases as run finishes.
e.RunningFor(tid)[0].Finish(mock.NewRunResult(nil, false), nil) e.RunningFor(task.ID)[0].Finish(mock.NewRunResult(nil, false), nil)
if _, err := e.PollForNumberRunning(tid, 1); err != nil { if _, err := e.PollForNumberRunning(task.ID, 1); err != nil {
t.Fatal(err) t.Fatal(err)
} }
mfs = promtest.MustGather(t, reg) mfs = promtest.MustGather(t, reg)
@ -349,17 +441,17 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 1 { if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 total run active, got %v", got) t.Fatalf("expected 1 total run active, got %v", got)
} }
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 1 { if got := *m.Gauge.Value; got != 1 {
t.Fatalf("expected 1 run active for task ID %s, got %v", tid.String(), got) t.Fatalf("expected 1 run active for task ID %s, got %v", task.ID.String(), got)
} }
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "success"}) m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "success"})
if got := *m.Counter.Value; got != 1 { if got := *m.Counter.Value; got != 1 {
t.Fatalf("expected 1 run succeeded for task ID %s, got %v", tid.String(), got) t.Fatalf("expected 1 run succeeded for task ID %s, got %v", task.ID.String(), got)
} }
e.RunningFor(tid)[0].Finish(mock.NewRunResult(nil, false), errors.New("failed to execute")) e.RunningFor(task.ID)[0].Finish(mock.NewRunResult(nil, false), errors.New("failed to execute"))
if _, err := e.PollForNumberRunning(tid, 0); err != nil { if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
t.Fatal(err) t.Fatal(err)
} }
mfs = promtest.MustGather(t, reg) mfs = promtest.MustGather(t, reg)
@ -367,27 +459,27 @@ func TestScheduler_Metrics(t *testing.T) {
if got := *m.Gauge.Value; got != 0 { if got := *m.Gauge.Value; got != 0 {
t.Fatalf("expected 0 total runs active, got %v", got) t.Fatalf("expected 0 total runs active, got %v", got)
} }
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}) m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()})
if got := *m.Gauge.Value; got != 0 { if got := *m.Gauge.Value; got != 0 {
t.Fatalf("expected 0 runs active for task ID %s, got %v", tid.String(), got) t.Fatalf("expected 0 runs active for task ID %s, got %v", task.ID.String(), got)
} }
m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "failure"}) m = promtest.MustFindMetric(t, mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "failure"})
if got := *m.Counter.Value; got != 1 { if got := *m.Counter.Value; got != 1 {
t.Fatalf("expected 1 run failed for task ID %s, got %v", tid.String(), got) t.Fatalf("expected 1 run failed for task ID %s, got %v", task.ID.String(), got)
} }
// Runs label removed after task released. // Runs label removed after task released.
if err := s.ReleaseTask(tid); err != nil { if err := s.ReleaseTask(task.ID); err != nil {
t.Fatal(err) t.Fatal(err)
} }
mfs = promtest.MustGather(t, reg) mfs = promtest.MustGather(t, reg)
if m := promtest.FindMetric(mfs, "task_scheduler_runs_active", map[string]string{"task_id": tid.String()}); m != nil { if m := promtest.FindMetric(mfs, "task_scheduler_runs_active", map[string]string{"task_id": task.ID.String()}); m != nil {
t.Fatalf("expected metric to be removed after releasing a task, got %v", m) t.Fatalf("expected metric to be removed after releasing a task, got %v", m)
} }
if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "success"}); m != nil { if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "success"}); m != nil {
t.Fatalf("expected metric to be removed after releasing a task, got %v", m) t.Fatalf("expected metric to be removed after releasing a task, got %v", m)
} }
if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": tid.String(), "status": "failure"}); m != nil { if m := promtest.FindMetric(mfs, "task_scheduler_runs_complete", map[string]string{"task_id": task.ID.String(), "status": "failure"}); m != nil {
t.Fatalf("expected metric to be removed after releasing a task, got %v", m) t.Fatalf("expected metric to be removed after releasing a task, got %v", m)
} }

View File

@ -62,20 +62,20 @@ type Store interface {
// LogWriter writes task logs and task state changes to a store. // LogWriter writes task logs and task state changes to a store.
type LogWriter interface { type LogWriter interface {
// UpdateRunState sets the run state and the respective time. // UpdateRunState sets the run state and the respective time.
UpdateRunState(ctx context.Context, taskID, runID platform.ID, when time.Time, state RunStatus) error UpdateRunState(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, state RunStatus) error
// AddRunLog adds a log line to the run. // AddRunLog adds a log line to the run.
AddRunLog(ctx context.Context, taskID, runID platform.ID, when time.Time, log string) error AddRunLog(ctx context.Context, task *StoreTask, runID platform.ID, when time.Time, log string) error
} }
// NopLogWriter is a LogWriter that doesn't do anything when its methods are called. // NopLogWriter is a LogWriter that doesn't do anything when its methods are called.
// This is useful for test, but not much else. // This is useful for test, but not much else.
type NopLogWriter struct{} type NopLogWriter struct{}
func (NopLogWriter) UpdateRunState(context.Context, platform.ID, platform.ID, time.Time, RunStatus) error { func (NopLogWriter) UpdateRunState(context.Context, *StoreTask, platform.ID, time.Time, RunStatus) error {
return nil return nil
} }
func (NopLogWriter) AddRunLog(context.Context, platform.ID, platform.ID, time.Time, string) error { func (NopLogWriter) AddRunLog(context.Context, *StoreTask, platform.ID, time.Time, string) error {
return nil return nil
} }

View File

@ -40,7 +40,10 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
writer, reader := crf(t) writer, reader := crf(t)
defer drf(t, writer, reader) defer drf(t, writer, reader)
taskID := platform.ID([]byte("task")) task := &backend.StoreTask{
ID: platform.ID([]byte("task")),
Org: platform.ID([]byte("org")),
}
queuedAt := time.Unix(1, 0) queuedAt := time.Unix(1, 0)
run := platform.Run{ run := platform.Run{
ID: platform.ID([]byte("run")), ID: platform.ID([]byte("run")),
@ -48,12 +51,12 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
QueuedAt: queuedAt.Format(time.RFC3339), QueuedAt: queuedAt.Format(time.RFC3339),
} }
err := writer.UpdateRunState(context.Background(), taskID, run.ID, queuedAt, backend.RunQueued) err := writer.UpdateRunState(context.Background(), task, run.ID, queuedAt, backend.RunQueued)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID) returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -63,13 +66,13 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
} }
startAt := time.Unix(2, 0) startAt := time.Unix(2, 0)
if err := writer.UpdateRunState(context.Background(), taskID, run.ID, startAt, backend.RunStarted); err != nil { if err := writer.UpdateRunState(context.Background(), task, run.ID, startAt, backend.RunStarted); err != nil {
t.Fatal(err) t.Fatal(err)
} }
run.StartTime = startAt.Format(time.RFC3339) run.StartTime = startAt.Format(time.RFC3339)
run.Status = "started" run.Status = "started"
returnedRun, err = reader.FindRunByID(context.Background(), taskID, run.ID) returnedRun, err = reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -79,13 +82,13 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
} }
endAt := time.Unix(3, 0) endAt := time.Unix(3, 0)
if err := writer.UpdateRunState(context.Background(), taskID, run.ID, endAt, backend.RunSuccess); err != nil { if err := writer.UpdateRunState(context.Background(), task, run.ID, endAt, backend.RunSuccess); err != nil {
t.Fatal(err) t.Fatal(err)
} }
run.EndTime = endAt.Format(time.RFC3339) run.EndTime = endAt.Format(time.RFC3339)
run.Status = "success" run.Status = "success"
returnedRun, err = reader.FindRunByID(context.Background(), taskID, run.ID) returnedRun, err = reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -99,7 +102,11 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
writer, reader := crf(t) writer, reader := crf(t)
defer drf(t, writer, reader) defer drf(t, writer, reader)
taskID := platform.ID([]byte("task")) task := &backend.StoreTask{
ID: platform.ID([]byte("task")),
Org: platform.ID([]byte("org")),
}
run := platform.Run{ run := platform.Run{
ID: platform.ID([]byte("run")), ID: platform.ID([]byte("run")),
Status: "queued", Status: "queued",
@ -108,29 +115,29 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
logTime := time.Now() logTime := time.Now()
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "bad"); err == nil { if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "bad"); err == nil {
t.Fatal("shouldn't be able to log against non existing run") t.Fatal("shouldn't be able to log against non existing run")
} }
err := writer.UpdateRunState(context.Background(), taskID, run.ID, time.Now(), backend.RunQueued) err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunQueued)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "first"); err != nil { if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "first"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "second"); err != nil { if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "second"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if err := writer.AddRunLog(context.Background(), taskID, run.ID, logTime, "third"); err != nil { if err := writer.AddRunLog(context.Background(), task, run.ID, logTime, "third"); err != nil {
t.Fatal(err) t.Fatal(err)
} }
fmtLogTime := logTime.Format(time.RFC3339) fmtLogTime := logTime.Format(time.RFC3339)
run.Log = platform.Log(fmt.Sprintf("%s: first\n%s: second\n%s: third", fmtLogTime, fmtLogTime, fmtLogTime)) run.Log = platform.Log(fmt.Sprintf("%s: first\n%s: second\n%s: third", fmtLogTime, fmtLogTime, fmtLogTime))
returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID) returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -144,9 +151,12 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
writer, reader := crf(t) writer, reader := crf(t)
defer drf(t, writer, reader) defer drf(t, writer, reader)
taskID := platform.ID([]byte("task")) task := &backend.StoreTask{
ID: platform.ID([]byte("task")),
Org: platform.ID([]byte("org")),
}
if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &taskID}); err == nil { if _, err := reader.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err == nil {
t.Fatal("failed to error on bad id") t.Fatal("failed to error on bad id")
} }
@ -159,7 +169,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
QueuedAt: queuedAt.Format(time.RFC3339), QueuedAt: queuedAt.Format(time.RFC3339),
} }
err := writer.UpdateRunState(context.Background(), taskID, runs[i].ID, queuedAt, backend.RunQueued) err := writer.UpdateRunState(context.Background(), task, runs[i].ID, queuedAt, backend.RunQueued)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -171,7 +181,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
} }
listRuns, err := reader.ListRuns(context.Background(), platform.RunFilter{ listRuns, err := reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID, Task: &task.ID,
}) })
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -182,7 +192,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
} }
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID, Task: &task.ID,
After: &runs[20].ID, After: &runs[20].ID,
}) })
if err != nil { if err != nil {
@ -194,7 +204,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
} }
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID, Task: &task.ID,
Limit: 30, Limit: 30,
}) })
if err != nil { if err != nil {
@ -207,7 +217,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
queuedAt, _ := time.Parse(time.RFC3339, runs[34].QueuedAt) queuedAt, _ := time.Parse(time.RFC3339, runs[34].QueuedAt)
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID, Task: &task.ID,
AfterTime: queuedAt.Add(-1 * time.Nanosecond).Format(time.RFC3339), AfterTime: queuedAt.Add(-1 * time.Nanosecond).Format(time.RFC3339),
}) })
if err != nil { if err != nil {
@ -220,7 +230,7 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
queuedAt, _ = time.Parse(time.RFC3339, runs[34].QueuedAt) queuedAt, _ = time.Parse(time.RFC3339, runs[34].QueuedAt)
listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{ listRuns, err = reader.ListRuns(context.Background(), platform.RunFilter{
Task: &taskID, Task: &task.ID,
BeforeTime: queuedAt.Add(time.Nanosecond).Format(time.RFC3339), BeforeTime: queuedAt.Add(time.Nanosecond).Format(time.RFC3339),
}) })
if err != nil { if err != nil {
@ -240,18 +250,21 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu
t.Fatal("failed to error with bad id") t.Fatal("failed to error with bad id")
} }
taskID := platform.ID([]byte("task")) task := &backend.StoreTask{
ID: platform.ID([]byte("task")),
Org: platform.ID([]byte("org")),
}
run := platform.Run{ run := platform.Run{
ID: platform.ID([]byte("run")), ID: platform.ID([]byte("run")),
Status: "queued", Status: "queued",
QueuedAt: time.Now().Format(time.RFC3339), QueuedAt: time.Now().Format(time.RFC3339),
} }
if err := writer.UpdateRunState(context.Background(), taskID, run.ID, time.Now(), backend.RunQueued); err != nil { if err := writer.UpdateRunState(context.Background(), task, run.ID, time.Now(), backend.RunQueued); err != nil {
t.Fatal(err) t.Fatal(err)
} }
returnedRun, err := reader.FindRunByID(context.Background(), taskID, run.ID) returnedRun, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -262,7 +275,7 @@ func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFu
returnedRun.Log = "cows" returnedRun.Log = "cows"
rr2, err := reader.FindRunByID(context.Background(), taskID, run.ID) rr2, err := reader.FindRunByID(context.Background(), task.ID, run.ID)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -276,12 +289,15 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
writer, reader := crf(t) writer, reader := crf(t)
defer drf(t, writer, reader) defer drf(t, writer, reader)
taskID := platform.ID([]byte("task")) task := &backend.StoreTask{
ID: platform.ID([]byte("task")),
Org: platform.ID([]byte("org")),
}
if _, err := reader.ListLogs(context.Background(), platform.LogFilter{}); err == nil { if _, err := reader.ListLogs(context.Background(), platform.LogFilter{}); err == nil {
t.Fatal("failed to error with no filter") t.Fatal("failed to error with no filter")
} }
if _, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &taskID}); err == nil { if _, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &task.ID}); err == nil {
t.Fatal("failed to error with no filter") t.Fatal("failed to error with no filter")
} }
@ -293,12 +309,12 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
QueuedAt: time.Unix(int64(i), 0).Format(time.RFC3339), QueuedAt: time.Unix(int64(i), 0).Format(time.RFC3339),
} }
err := writer.UpdateRunState(context.Background(), taskID, runs[i].ID, time.Now(), backend.RunQueued) err := writer.UpdateRunState(context.Background(), task, runs[i].ID, time.Now(), backend.RunQueued)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
writer.AddRunLog(context.Background(), taskID, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i)) writer.AddRunLog(context.Background(), task, runs[i].ID, time.Unix(int64(i), 0), fmt.Sprintf("log%d", i))
} }
logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID}) logs, err := reader.ListLogs(context.Background(), platform.LogFilter{Run: &runs[4].ID})
@ -311,7 +327,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
t.Fatalf("expected: %+v, got: %+v", fmtTimelog+": log4", string(logs[0])) t.Fatalf("expected: %+v, got: %+v", fmtTimelog+": log4", string(logs[0]))
} }
logs, err = reader.ListLogs(context.Background(), platform.LogFilter{Task: &taskID}) logs, err = reader.ListLogs(context.Background(), platform.LogFilter{Task: &task.ID})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }