2018-07-26 20:27:35 +00:00
|
|
|
package backend
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"errors"
|
|
|
|
"fmt"
|
|
|
|
"sort"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/influxdata/platform"
|
|
|
|
)
|
|
|
|
|
|
|
|
var ErrRunNotFound error = errors.New("run not found")
|
|
|
|
|
|
|
|
type runReaderWriter struct {
|
|
|
|
mu sync.RWMutex
|
|
|
|
byTaskID map[string][]*platform.Run
|
|
|
|
byRunID map[string]*platform.Run
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewInMemRunReaderWriter() *runReaderWriter {
|
|
|
|
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}}
|
|
|
|
}
|
|
|
|
|
2018-09-06 21:55:55 +00:00
|
|
|
func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
|
2018-07-26 20:27:35 +00:00
|
|
|
r.mu.Lock()
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
timeSetter := func(r *platform.Run) {
|
2018-08-30 00:08:50 +00:00
|
|
|
whenStr := when.UTC().Format(time.RFC3339)
|
2018-07-26 20:27:35 +00:00
|
|
|
switch status {
|
|
|
|
case RunStarted:
|
2018-08-14 18:25:19 +00:00
|
|
|
r.StartedAt = whenStr
|
2018-07-26 20:27:35 +00:00
|
|
|
case RunFail, RunSuccess, RunCanceled:
|
2018-08-14 18:25:19 +00:00
|
|
|
r.FinishedAt = whenStr
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-06 21:55:55 +00:00
|
|
|
ridStr := rlb.RunID.String()
|
|
|
|
existingRun, ok := r.byRunID[ridStr]
|
2018-07-26 20:27:35 +00:00
|
|
|
if !ok {
|
2018-09-06 21:55:55 +00:00
|
|
|
tid := append([]byte(nil), rlb.Task.ID...)
|
|
|
|
sf := time.Unix(rlb.RunScheduledFor, 0).UTC()
|
2018-09-27 13:19:26 +00:00
|
|
|
run := &platform.Run{
|
|
|
|
ID: rlb.RunID,
|
|
|
|
TaskID: tid,
|
|
|
|
Status: status.String(),
|
|
|
|
ScheduledFor: sf.Format(time.RFC3339),
|
|
|
|
}
|
|
|
|
if rlb.RequestedAt != 0 {
|
|
|
|
run.RequestedAt = time.Unix(rlb.RequestedAt, 0).UTC().Format(time.RFC3339)
|
|
|
|
}
|
2018-07-26 20:27:35 +00:00
|
|
|
timeSetter(run)
|
2018-09-06 21:55:55 +00:00
|
|
|
r.byRunID[ridStr] = run
|
|
|
|
tidStr := rlb.Task.ID.String()
|
|
|
|
r.byTaskID[tidStr] = append(r.byTaskID[tidStr], run)
|
2018-07-26 20:27:35 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
timeSetter(existingRun)
|
|
|
|
existingRun.Status = status.String()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-09-06 21:55:55 +00:00
|
|
|
func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when time.Time, log string) error {
|
2018-07-26 20:27:35 +00:00
|
|
|
r.mu.Lock()
|
|
|
|
defer r.mu.Unlock()
|
|
|
|
log = fmt.Sprintf("%s: %s", when.Format(time.RFC3339), log)
|
2018-09-06 21:55:55 +00:00
|
|
|
ridStr := rlb.RunID.String()
|
|
|
|
existingRun, ok := r.byRunID[ridStr]
|
2018-07-26 20:27:35 +00:00
|
|
|
if !ok {
|
|
|
|
return ErrRunNotFound
|
|
|
|
}
|
2018-09-06 21:55:55 +00:00
|
|
|
sep := ""
|
2018-07-26 20:27:35 +00:00
|
|
|
if existingRun.Log != "" {
|
2018-09-06 21:55:55 +00:00
|
|
|
sep = "\n"
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
2018-09-06 21:55:55 +00:00
|
|
|
existingRun.Log = platform.Log(string(existingRun.Log) + sep + log)
|
2018-07-26 20:27:35 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *runReaderWriter) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
|
|
|
r.mu.RLock()
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
|
|
|
if runFilter.Task == nil {
|
|
|
|
return nil, errors.New("task is required")
|
|
|
|
}
|
|
|
|
|
|
|
|
_, ok := r.byTaskID[runFilter.Task.String()]
|
|
|
|
if !ok {
|
|
|
|
return nil, ErrRunNotFound
|
|
|
|
}
|
|
|
|
|
|
|
|
runs := make([]*platform.Run, len(r.byTaskID[runFilter.Task.String()]))
|
|
|
|
copy(runs, r.byTaskID[runFilter.Task.String()])
|
|
|
|
|
|
|
|
sort.Slice(runs, func(i int, j int) bool {
|
2018-08-14 18:25:19 +00:00
|
|
|
return runs[i].ScheduledFor < runs[j].ScheduledFor
|
2018-07-26 20:27:35 +00:00
|
|
|
})
|
|
|
|
|
|
|
|
beforeCheck := runFilter.BeforeTime != ""
|
|
|
|
afterCheck := runFilter.AfterTime != ""
|
|
|
|
|
|
|
|
afterIndex := 0
|
|
|
|
beforeIndex := len(runs)
|
|
|
|
|
|
|
|
for i, run := range runs {
|
|
|
|
if afterIndex == 0 && runFilter.After != nil && runFilter.After.String() == run.ID.String() {
|
|
|
|
afterIndex = i
|
|
|
|
}
|
|
|
|
|
2018-08-14 18:25:19 +00:00
|
|
|
if run.ScheduledFor != "" {
|
|
|
|
if afterCheck && afterIndex == 0 && run.ScheduledFor > runFilter.AfterTime {
|
2018-07-26 20:27:35 +00:00
|
|
|
afterIndex = i
|
|
|
|
}
|
|
|
|
|
2018-08-14 18:25:19 +00:00
|
|
|
if beforeCheck && beforeIndex == len(runs) && runFilter.BeforeTime < run.ScheduledFor {
|
2018-07-26 20:27:35 +00:00
|
|
|
beforeIndex = i
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if runFilter.Limit != 0 && beforeIndex-afterIndex > runFilter.Limit {
|
|
|
|
beforeIndex = afterIndex + runFilter.Limit
|
|
|
|
}
|
|
|
|
|
2018-09-15 04:33:08 +00:00
|
|
|
runs = runs[afterIndex:beforeIndex]
|
|
|
|
for i := range runs {
|
|
|
|
// Copy every element, to avoid a data race if the original Run is modified in UpdateRunState or AddRunLog.
|
|
|
|
r := *runs[i]
|
|
|
|
runs[i] = &r
|
|
|
|
}
|
|
|
|
return runs, nil
|
2018-07-26 20:27:35 +00:00
|
|
|
}
|
|
|
|
|
2018-10-08 20:07:08 +00:00
|
|
|
func (r *runReaderWriter) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
|
2018-07-26 20:27:35 +00:00
|
|
|
r.mu.RLock()
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
|
|
|
run, ok := r.byRunID[runID.String()]
|
|
|
|
if !ok {
|
|
|
|
return nil, ErrRunNotFound
|
|
|
|
}
|
|
|
|
|
|
|
|
var rtnRun platform.Run
|
|
|
|
rtnRun = *run
|
|
|
|
return &rtnRun, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *runReaderWriter) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) {
|
|
|
|
r.mu.RLock()
|
|
|
|
defer r.mu.RUnlock()
|
|
|
|
|
|
|
|
if logFilter.Task == nil && logFilter.Run == nil {
|
|
|
|
return nil, errors.New("task or run is required")
|
|
|
|
}
|
|
|
|
|
|
|
|
if logFilter.Run != nil {
|
|
|
|
run, ok := r.byRunID[logFilter.Run.String()]
|
|
|
|
if !ok {
|
|
|
|
return nil, ErrRunNotFound
|
|
|
|
}
|
|
|
|
return []platform.Log{run.Log}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
logs := []platform.Log{}
|
|
|
|
for _, run := range r.byTaskID[logFilter.Task.String()] {
|
|
|
|
logs = append(logs, run.Log)
|
|
|
|
}
|
|
|
|
return logs, nil
|
|
|
|
}
|