commit
1573ff6c24
|
@ -41,7 +41,15 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, wh
|
|||
if !ok {
|
||||
tid := append([]byte(nil), rlb.Task.ID...)
|
||||
sf := time.Unix(rlb.RunScheduledFor, 0).UTC()
|
||||
run := &platform.Run{ID: rlb.RunID, TaskID: tid, Status: status.String(), ScheduledFor: sf.Format(time.RFC3339)}
|
||||
run := &platform.Run{
|
||||
ID: rlb.RunID,
|
||||
TaskID: tid,
|
||||
Status: status.String(),
|
||||
ScheduledFor: sf.Format(time.RFC3339),
|
||||
}
|
||||
if rlb.RequestedAt != 0 {
|
||||
run.RequestedAt = time.Unix(rlb.RequestedAt, 0).UTC().Format(time.RFC3339)
|
||||
}
|
||||
timeSetter(run)
|
||||
r.byRunID[ridStr] = run
|
||||
tidStr := rlb.Task.ID.String()
|
||||
|
@ -57,7 +65,6 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, wh
|
|||
func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
log = fmt.Sprintf("%s: %s", when.Format(time.RFC3339), log)
|
||||
ridStr := rlb.RunID.String()
|
||||
existingRun, ok := r.byRunID[ridStr]
|
||||
|
|
|
@ -149,8 +149,9 @@ func (stm *StoreTaskMeta) createNextRunFromQueue(now, nextDue int64, sch cron.Sc
|
|||
|
||||
return RunCreation{
|
||||
Created: QueuedRun{
|
||||
RunID: id,
|
||||
Now: runNow,
|
||||
RunID: id,
|
||||
Now: runNow,
|
||||
RequestedAt: q.RequestedAt,
|
||||
},
|
||||
NextDue: nextDue,
|
||||
HasQueue: len(stm.ManualRuns) > 0,
|
||||
|
|
|
@ -46,6 +46,9 @@ type Executor interface {
|
|||
type QueuedRun struct {
|
||||
TaskID, RunID platform.ID
|
||||
|
||||
// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set when a run a manually requested
|
||||
RequestedAt int64
|
||||
|
||||
// The Unix timestamp (seconds since January 1, 1970 UTC) that will be set
|
||||
// as the "now" option when executing the task.
|
||||
Now int64
|
||||
|
@ -602,6 +605,7 @@ func (r *runner) updateRunState(qr QueuedRun, s RunStatus, runLogger *zap.Logger
|
|||
Task: r.task,
|
||||
RunID: qr.RunID,
|
||||
RunScheduledFor: qr.Now,
|
||||
RequestedAt: qr.RequestedAt,
|
||||
}
|
||||
|
||||
// Arbitrarily chosen short time limit for how fast the log write must complete.
|
||||
|
|
|
@ -148,6 +148,9 @@ type RunLogBase struct {
|
|||
|
||||
// The Unix timestamp indicating the run's scheduled time.
|
||||
RunScheduledFor int64
|
||||
|
||||
// When the log is requested, should be ignored when it is zero.
|
||||
RequestedAt int64
|
||||
}
|
||||
|
||||
// LogWriter writes task logs and task state changes to a store.
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/task/backend"
|
||||
)
|
||||
|
@ -86,7 +87,7 @@ func updateRunState(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFun
|
|||
}
|
||||
|
||||
if !reflect.DeepEqual(run, *returnedRun) {
|
||||
t.Fatalf("expected: %+v, got: %+v", run, returnedRun)
|
||||
t.Fatalf("expected: %+v, got: %+v, \n diff: %+v", run, *returnedRun, cmp.Diff(run, *returnedRun))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -144,7 +145,7 @@ func runLogTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
|
|||
}
|
||||
|
||||
if !reflect.DeepEqual(run, *returnedRun) {
|
||||
t.Fatalf("expected: %+v, got: %+v", run, returnedRun)
|
||||
t.Fatalf("expected: %+v, got: %+v,\n\ndiff: %+v", run, *returnedRun, cmp.Diff(run, *returnedRun))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue