refactor(task): remove explicit org from run/log lookup
Task ID is now a required value on run and log filters. It was effectively required by all implementations before anyway, so now those types reflect that requirement. Organization ID was removed from those same fields. The TaskService looks up the organization ID via the task in cases where we need it at a lower layer.pull/11963/head
parent
79dfbb6875
commit
cbef811c63
|
@ -378,7 +378,6 @@ func taskDeleteF(cmd *cobra.Command, args []string) error {
|
||||||
type TaskLogFindFlags struct {
|
type TaskLogFindFlags struct {
|
||||||
taskID string
|
taskID string
|
||||||
runID string
|
runID string
|
||||||
orgID string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var taskLogFindFlags TaskLogFindFlags
|
var taskLogFindFlags TaskLogFindFlags
|
||||||
|
@ -392,7 +391,6 @@ func init() {
|
||||||
|
|
||||||
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.taskID, "task-id", "", "", "task id (required)")
|
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.taskID, "task-id", "", "", "task id (required)")
|
||||||
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.runID, "run-id", "", "", "run id")
|
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.runID, "run-id", "", "", "run id")
|
||||||
taskLogFindCmd.Flags().StringVarP(&taskLogFindFlags.orgID, "org-id", "", "", "organization id")
|
|
||||||
taskLogFindCmd.MarkFlagRequired("task-id")
|
taskLogFindCmd.MarkFlagRequired("task-id")
|
||||||
|
|
||||||
logCmd.AddCommand(taskLogFindCmd)
|
logCmd.AddCommand(taskLogFindCmd)
|
||||||
|
@ -409,7 +407,7 @@ func taskLogFindF(cmd *cobra.Command, args []string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
filter.Task = id
|
filter.Task = *id
|
||||||
|
|
||||||
if taskLogFindFlags.runID != "" {
|
if taskLogFindFlags.runID != "" {
|
||||||
id, err := platform.IDFromString(taskLogFindFlags.runID)
|
id, err := platform.IDFromString(taskLogFindFlags.runID)
|
||||||
|
@ -419,14 +417,6 @@ func taskLogFindF(cmd *cobra.Command, args []string) error {
|
||||||
filter.Run = id
|
filter.Run = id
|
||||||
}
|
}
|
||||||
|
|
||||||
if taskLogFindFlags.orgID != "" {
|
|
||||||
id, err := platform.IDFromString(taskLogFindFlags.orgID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
filter.Org = id
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := context.TODO()
|
ctx := context.TODO()
|
||||||
logs, _, err := s.FindLogs(ctx, filter)
|
logs, _, err := s.FindLogs(ctx, filter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -451,7 +441,6 @@ func taskLogFindF(cmd *cobra.Command, args []string) error {
|
||||||
type TaskRunFindFlags struct {
|
type TaskRunFindFlags struct {
|
||||||
runID string
|
runID string
|
||||||
taskID string
|
taskID string
|
||||||
orgID string
|
|
||||||
afterTime string
|
afterTime string
|
||||||
beforeTime string
|
beforeTime string
|
||||||
limit int
|
limit int
|
||||||
|
@ -468,13 +457,11 @@ func init() {
|
||||||
|
|
||||||
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.taskID, "task-id", "", "", "task id (required)")
|
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.taskID, "task-id", "", "", "task id (required)")
|
||||||
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.runID, "run-id", "", "", "run id")
|
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.runID, "run-id", "", "", "run id")
|
||||||
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.orgID, "org-id", "", "", "organization id")
|
|
||||||
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.afterTime, "after", "", "", "after time for filtering")
|
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.afterTime, "after", "", "", "after time for filtering")
|
||||||
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.beforeTime, "before", "", "", "before time for filtering")
|
taskRunFindCmd.Flags().StringVarP(&taskRunFindFlags.beforeTime, "before", "", "", "before time for filtering")
|
||||||
taskRunFindCmd.Flags().IntVarP(&taskRunFindFlags.limit, "limit", "", 0, "limit the results")
|
taskRunFindCmd.Flags().IntVarP(&taskRunFindFlags.limit, "limit", "", 0, "limit the results")
|
||||||
|
|
||||||
taskRunFindCmd.MarkFlagRequired("task-id")
|
taskRunFindCmd.MarkFlagRequired("task-id")
|
||||||
taskRunFindCmd.MarkFlagRequired("org-id")
|
|
||||||
|
|
||||||
runCmd.AddCommand(taskRunFindCmd)
|
runCmd.AddCommand(taskRunFindCmd)
|
||||||
}
|
}
|
||||||
|
@ -494,13 +481,7 @@ func taskRunFindF(cmd *cobra.Command, args []string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
filter.Task = taskID
|
filter.Task = *taskID
|
||||||
|
|
||||||
orgID, err := platform.IDFromString(taskRunFindFlags.orgID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
filter.Org = orgID
|
|
||||||
|
|
||||||
var runs []*platform.Run
|
var runs []*platform.Run
|
||||||
if taskRunFindFlags.runID != "" {
|
if taskRunFindFlags.runID != "" {
|
||||||
|
@ -508,7 +489,7 @@ func taskRunFindF(cmd *cobra.Command, args []string) error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
run, err := s.FindRunByID(context.Background(), *filter.Org, *id)
|
run, err := s.FindRunByID(context.Background(), filter.Task, *id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -122,7 +122,7 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
|
||||||
}
|
}
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
|
||||||
runs, _, err := be.TaskService().FindRuns(ctx, influxdb.RunFilter{Org: &org.ID, Task: &created.ID, Limit: 1})
|
runs, _, err := be.TaskService().FindRuns(ctx, influxdb.RunFilter{Task: created.ID, Limit: 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -219,7 +219,7 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
|
||||||
})
|
})
|
||||||
|
|
||||||
// now lets see a logs
|
// now lets see a logs
|
||||||
logs, _, err := be.TaskService().FindLogs(ctx, influxdb.LogFilter{Org: &org.ID, Task: &created.ID, Run: &targetRun.ID})
|
logs, _, err := be.TaskService().FindLogs(ctx, influxdb.LogFilter{Task: created.ID, Run: &targetRun.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -766,24 +766,7 @@ func decodeGetLogsRequest(ctx context.Context, r *http.Request, orgs platform.Or
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
req.filter.Task = taskID
|
req.filter.Task = *taskID
|
||||||
|
|
||||||
qp := r.URL.Query()
|
|
||||||
|
|
||||||
if orgName := qp.Get("org"); orgName != "" {
|
|
||||||
o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
req.filter.Org = &o.ID
|
|
||||||
} else if oid := qp.Get("orgID"); oid != "" {
|
|
||||||
orgID, err := platform.IDFromString(oid)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.filter.Org = orgID
|
|
||||||
}
|
|
||||||
|
|
||||||
if runID := params.ByName("rid"); runID != "" {
|
if runID := params.ByName("rid"); runID != "" {
|
||||||
id, err := platform.IDFromString(runID)
|
id, err := platform.IDFromString(runID)
|
||||||
|
@ -823,7 +806,7 @@ func (h *TaskHandler) handleGetRuns(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, *req.filter.Task)); err != nil {
|
if err := encodeResponse(ctx, w, http.StatusOK, newRunsResponse(runs, req.filter.Task)); err != nil {
|
||||||
logEncodingError(h.logger, r, err)
|
logEncodingError(h.logger, r, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -848,25 +831,10 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request, orgs platform.Or
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
req.filter.Task = taskID
|
req.filter.Task = *taskID
|
||||||
|
|
||||||
qp := r.URL.Query()
|
qp := r.URL.Query()
|
||||||
|
|
||||||
if orgName := qp.Get("org"); orgName != "" {
|
|
||||||
o, err := orgs.FindOrganization(ctx, platform.OrganizationFilter{Name: &orgName})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
req.filter.Org = &o.ID
|
|
||||||
} else if orgID := qp.Get("orgID"); orgID != "" {
|
|
||||||
oid, err := platform.IDFromString(orgID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req.filter.Org = oid
|
|
||||||
}
|
|
||||||
|
|
||||||
if id := qp.Get("after"); id != "" {
|
if id := qp.Get("after"); id != "" {
|
||||||
afterID, err := platform.IDFromString(id)
|
afterID, err := platform.IDFromString(id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1401,26 +1369,21 @@ func (t TaskService) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
|
|
||||||
// FindLogs returns logs for a run.
|
// FindLogs returns logs for a run.
|
||||||
func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
if filter.Task == nil {
|
if !filter.Task.Valid() {
|
||||||
return nil, 0, errors.New("task ID required")
|
return nil, 0, errors.New("task ID required")
|
||||||
}
|
}
|
||||||
|
|
||||||
var urlPath string
|
var urlPath string
|
||||||
if filter.Run == nil {
|
if filter.Run == nil {
|
||||||
urlPath = path.Join(taskIDPath(*filter.Task), "logs")
|
urlPath = path.Join(taskIDPath(filter.Task), "logs")
|
||||||
} else {
|
} else {
|
||||||
urlPath = path.Join(taskIDRunIDPath(*filter.Task, *filter.Run), "logs")
|
urlPath = path.Join(taskIDRunIDPath(filter.Task, *filter.Run), "logs")
|
||||||
}
|
}
|
||||||
|
|
||||||
u, err := newURL(t.Addr, urlPath)
|
u, err := newURL(t.Addr, urlPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
val := url.Values{}
|
|
||||||
if filter.Org != nil {
|
|
||||||
val.Set("orgID", filter.Org.String())
|
|
||||||
}
|
|
||||||
u.RawQuery = val.Encode()
|
|
||||||
|
|
||||||
req, err := http.NewRequest("GET", u.String(), nil)
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1450,19 +1413,16 @@ func (t TaskService) FindLogs(ctx context.Context, filter platform.LogFilter) ([
|
||||||
|
|
||||||
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
|
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
|
||||||
func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
if filter.Task == nil {
|
if !filter.Task.Valid() {
|
||||||
return nil, 0, errors.New("task ID required")
|
return nil, 0, errors.New("task ID required")
|
||||||
}
|
}
|
||||||
|
|
||||||
u, err := newURL(t.Addr, taskIDRunsPath(*filter.Task))
|
u, err := newURL(t.Addr, taskIDRunsPath(filter.Task))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return nil, 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
val := url.Values{}
|
val := url.Values{}
|
||||||
if filter.Org != nil {
|
|
||||||
val.Set("orgID", filter.Org.String())
|
|
||||||
}
|
|
||||||
if filter.After != nil {
|
if filter.After != nil {
|
||||||
val.Set("after", filter.After.String())
|
val.Set("after", filter.After.String())
|
||||||
}
|
}
|
||||||
|
|
|
@ -440,7 +440,7 @@ func TestTaskHandler_handleGetRuns(t *testing.T) {
|
||||||
runs := []*platform.Run{
|
runs := []*platform.Run{
|
||||||
{
|
{
|
||||||
ID: platform.ID(2),
|
ID: platform.ID(2),
|
||||||
TaskID: *f.Task,
|
TaskID: f.Task,
|
||||||
Status: "success",
|
Status: "success",
|
||||||
ScheduledFor: "2018-12-01T17:00:13Z",
|
ScheduledFor: "2018-12-01T17:00:13Z",
|
||||||
StartedAt: "2018-12-01T17:00:03.155645Z",
|
StartedAt: "2018-12-01T17:00:03.155645Z",
|
||||||
|
@ -617,7 +617,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
name: "get task logs",
|
name: "get task logs",
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
|
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
if *f.Task == taskID {
|
if f.Task == taskID {
|
||||||
return nil, 0, nil
|
return nil, 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -633,7 +633,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
name: "get run logs",
|
name: "get run logs",
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
|
FindLogsFn: func(_ context.Context, f platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
if *f.Task != taskID {
|
if f.Task != taskID {
|
||||||
return nil, 0, backend.ErrTaskNotFound
|
return nil, 0, backend.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
if *f.Run != runID {
|
if *f.Run != runID {
|
||||||
|
@ -652,7 +652,7 @@ func TestTaskHandler_NotFoundStatus(t *testing.T) {
|
||||||
name: "get runs",
|
name: "get runs",
|
||||||
svc: &mock.TaskService{
|
svc: &mock.TaskService{
|
||||||
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
|
FindRunsFn: func(_ context.Context, f platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
if *f.Task != taskID {
|
if f.Task != taskID {
|
||||||
return nil, 0, backend.ErrTaskNotFound
|
return nil, 0, backend.ErrTaskNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
15
task.go
15
task.go
|
@ -251,17 +251,20 @@ type TaskFilter struct {
|
||||||
|
|
||||||
// RunFilter represents a set of filters that restrict the returned results
|
// RunFilter represents a set of filters that restrict the returned results
|
||||||
type RunFilter struct {
|
type RunFilter struct {
|
||||||
Org *ID
|
// Task ID is required for listing runs.
|
||||||
Task *ID
|
Task ID
|
||||||
|
|
||||||
After *ID
|
After *ID
|
||||||
Limit int
|
Limit int
|
||||||
AfterTime string
|
AfterTime string
|
||||||
BeforeTime string
|
BeforeTime string
|
||||||
}
|
}
|
||||||
|
|
||||||
// LogFilter represents a set of filters that restrict the returned results
|
// LogFilter represents a set of filters that restrict the returned log results.
|
||||||
type LogFilter struct {
|
type LogFilter struct {
|
||||||
Org *ID
|
// Task ID is required.
|
||||||
Task *ID
|
Task ID
|
||||||
Run *ID
|
|
||||||
|
// The optional Run ID limits logs to a single run.
|
||||||
|
Run *ID
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,14 +10,20 @@ import (
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// orgtask is used as a key for storing runs by org and task ID.
|
||||||
|
// This is only relevant for the in-memory run store.
|
||||||
|
type orgtask struct {
|
||||||
|
o, t platform.ID
|
||||||
|
}
|
||||||
|
|
||||||
type runReaderWriter struct {
|
type runReaderWriter struct {
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
byTaskID map[string][]*platform.Run
|
byOrgTask map[orgtask][]*platform.Run
|
||||||
byRunID map[string]*platform.Run
|
byRunID map[string]*platform.Run
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInMemRunReaderWriter() *runReaderWriter {
|
func NewInMemRunReaderWriter() *runReaderWriter {
|
||||||
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byTaskID: map[string][]*platform.Run{}}
|
return &runReaderWriter{byRunID: map[string]*platform.Run{}, byOrgTask: map[orgtask][]*platform.Run{}}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
|
func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, when time.Time, status RunStatus) error {
|
||||||
|
@ -48,8 +54,8 @@ func (r *runReaderWriter) UpdateRunState(ctx context.Context, rlb RunLogBase, wh
|
||||||
}
|
}
|
||||||
timeSetter(run)
|
timeSetter(run)
|
||||||
r.byRunID[ridStr] = run
|
r.byRunID[ridStr] = run
|
||||||
tidStr := rlb.Task.ID.String()
|
ot := orgtask{o: rlb.Task.Org, t: rlb.Task.ID}
|
||||||
r.byTaskID[tidStr] = append(r.byTaskID[tidStr], run)
|
r.byOrgTask[ot] = append(r.byOrgTask[ot], run)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,15 +81,15 @@ func (r *runReaderWriter) AddRunLog(ctx context.Context, rlb RunLogBase, when ti
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *runReaderWriter) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
func (r *runReaderWriter) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
||||||
r.mu.RLock()
|
r.mu.RLock()
|
||||||
defer r.mu.RUnlock()
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
if runFilter.Task == nil {
|
if !runFilter.Task.Valid() {
|
||||||
return nil, errors.New("task is required")
|
return nil, errors.New("task is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
ex, ok := r.byTaskID[runFilter.Task.String()]
|
ex, ok := r.byOrgTask[orgtask{o: orgID, t: runFilter.Task}]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ErrRunNotFound
|
return nil, ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
@ -129,12 +135,12 @@ func (r *runReaderWriter) FindRunByID(ctx context.Context, orgID, runID platform
|
||||||
return &rtnRun, nil
|
return &rtnRun, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *runReaderWriter) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) {
|
func (r *runReaderWriter) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
|
||||||
r.mu.RLock()
|
r.mu.RLock()
|
||||||
defer r.mu.RUnlock()
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
if logFilter.Task == nil && logFilter.Run == nil {
|
if !logFilter.Task.Valid() {
|
||||||
return nil, errors.New("task or run is required")
|
return nil, errors.New("task ID required")
|
||||||
}
|
}
|
||||||
|
|
||||||
if logFilter.Run != nil {
|
if logFilter.Run != nil {
|
||||||
|
@ -142,12 +148,19 @@ func (r *runReaderWriter) ListLogs(ctx context.Context, logFilter platform.LogFi
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, ErrRunNotFound
|
return nil, ErrRunNotFound
|
||||||
}
|
}
|
||||||
|
// TODO(mr): validate that task ID matches, if task is also set. Needs test.
|
||||||
return []platform.Log{run.Log}, nil
|
return []platform.Log{run.Log}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logs := []platform.Log{}
|
var logs []platform.Log
|
||||||
for _, run := range r.byTaskID[logFilter.Task.String()] {
|
ot := orgtask{o: orgID, t: logFilter.Task}
|
||||||
|
for _, run := range r.byOrgTask[ot] {
|
||||||
logs = append(logs, run.Log)
|
logs = append(logs, run.Log)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(logs) == 0 {
|
||||||
|
return nil, errors.New("no matching runs found")
|
||||||
|
}
|
||||||
|
|
||||||
return logs, nil
|
return logs, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,18 +22,17 @@ type QueryLogReader struct {
|
||||||
queryService query.QueryService
|
queryService query.QueryService
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ LogReader = (*QueryLogReader)(nil)
|
||||||
|
|
||||||
func NewQueryLogReader(qs query.QueryService) *QueryLogReader {
|
func NewQueryLogReader(qs query.QueryService) *QueryLogReader {
|
||||||
return &QueryLogReader{
|
return &QueryLogReader{
|
||||||
queryService: qs,
|
queryService: qs,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) {
|
func (qlr *QueryLogReader) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
|
||||||
if logFilter.Org == nil {
|
if !logFilter.Task.Valid() {
|
||||||
return nil, errors.New("org required")
|
return nil, errors.New("task ID required to list logs")
|
||||||
}
|
|
||||||
if logFilter.Task == nil && logFilter.Run == nil {
|
|
||||||
return nil, errors.New("task or run is required")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
filterPart := ""
|
filterPart := ""
|
||||||
|
@ -58,7 +57,7 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF
|
||||||
if auth.Kind() != "authorization" {
|
if auth.Kind() != "authorization" {
|
||||||
return nil, platform.ErrAuthorizerNotSupported
|
return nil, platform.ErrAuthorizerNotSupported
|
||||||
}
|
}
|
||||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: *logFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}}
|
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||||
|
|
||||||
ittr, err := qlr.queryService.Query(ctx, request)
|
ittr, err := qlr.queryService.Query(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -78,6 +77,10 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF
|
||||||
}
|
}
|
||||||
|
|
||||||
runs := re.Runs()
|
runs := re.Runs()
|
||||||
|
if len(runs) == 0 {
|
||||||
|
return nil, errors.New("no matching runs found")
|
||||||
|
}
|
||||||
|
|
||||||
logs := make([]platform.Log, len(runs))
|
logs := make([]platform.Log, len(runs))
|
||||||
for i, r := range runs {
|
for i, r := range runs {
|
||||||
logs[i] = r.Log
|
logs[i] = r.Log
|
||||||
|
@ -85,13 +88,10 @@ func (qlr *QueryLogReader) ListLogs(ctx context.Context, logFilter platform.LogF
|
||||||
return logs, nil
|
return logs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qlr *QueryLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
func (qlr *QueryLogReader) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
||||||
if runFilter.Task == nil {
|
if !runFilter.Task.Valid() {
|
||||||
return nil, errors.New("task required")
|
return nil, errors.New("task required")
|
||||||
}
|
}
|
||||||
if runFilter.Org == nil {
|
|
||||||
return nil, errors.New("org required")
|
|
||||||
}
|
|
||||||
|
|
||||||
limit := "|> limit(n: 100)\n"
|
limit := "|> limit(n: 100)\n"
|
||||||
if runFilter.Limit > 0 {
|
if runFilter.Limit > 0 {
|
||||||
|
@ -129,14 +129,23 @@ from(bucketID: "000000000000000a")
|
||||||
if auth.Kind() != "authorization" {
|
if auth.Kind() != "authorization" {
|
||||||
return nil, platform.ErrAuthorizerNotSupported
|
return nil, platform.ErrAuthorizerNotSupported
|
||||||
}
|
}
|
||||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: *runFilter.Org, Compiler: lang.FluxCompiler{Query: listScript}}
|
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||||
|
|
||||||
ittr, err := qlr.queryService.Query(ctx, request)
|
ittr, err := qlr.queryService.Query(ctx, request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return queryIttrToRuns(ittr)
|
runs, err := queryIttrToRuns(ittr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(runs) == 0 {
|
||||||
|
return nil, errors.New("no matching runs found")
|
||||||
|
}
|
||||||
|
|
||||||
|
return runs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
|
func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
|
||||||
|
|
|
@ -29,8 +29,10 @@ func TestScheduler_Cancelation(t *testing.T) {
|
||||||
o.Start(context.Background())
|
o.Start(context.Background())
|
||||||
defer o.Stop()
|
defer o.Stop()
|
||||||
|
|
||||||
|
const orgID = 2
|
||||||
task := &backend.StoreTask{
|
task := &backend.StoreTask{
|
||||||
ID: platform.ID(1),
|
ID: platform.ID(1),
|
||||||
|
Org: orgID,
|
||||||
}
|
}
|
||||||
meta := &backend.StoreTaskMeta{
|
meta := &backend.StoreTaskMeta{
|
||||||
MaxConcurrency: 1,
|
MaxConcurrency: 1,
|
||||||
|
@ -41,7 +43,7 @@ func TestScheduler_Cancelation(t *testing.T) {
|
||||||
if err := o.ClaimTask(task, meta); err != nil {
|
if err := o.ClaimTask(task, meta); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
|
runs, err := rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -49,7 +51,7 @@ func TestScheduler_Cancelation(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
time.Sleep(10 * time.Millisecond) // we have to do this because the storage system we are using for the logs is eventually consistent.
|
time.Sleep(10 * time.Millisecond) // we have to do this because the storage system we are using for the logs is eventually consistent.
|
||||||
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
|
runs, err = rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -58,7 +60,7 @@ func TestScheduler_Cancelation(t *testing.T) {
|
||||||
}
|
}
|
||||||
// check to make sure it is really canceling, and that the status doesn't get changed to something else after it would have finished
|
// check to make sure it is really canceling, and that the status doesn't get changed to something else after it would have finished
|
||||||
time.Sleep(500 * time.Millisecond)
|
time.Sleep(500 * time.Millisecond)
|
||||||
runs, err = rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
|
runs, err = rl.ListRuns(context.Background(), orgID, platform.RunFilter{Task: task.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -395,7 +397,7 @@ func TestScheduler_Queue(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// pollForRunStatus tries a few times to find runs matching supplied conditions, before failing.
|
// pollForRunStatus tries a few times to find runs matching supplied conditions, before failing.
|
||||||
func pollForRunStatus(t *testing.T, r backend.LogReader, taskID platform.ID, expCount, expIndex int, expStatus string) {
|
func pollForRunStatus(t *testing.T, r backend.LogReader, taskID, orgID platform.ID, expCount, expIndex int, expStatus string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
var runs []*platform.Run
|
var runs []*platform.Run
|
||||||
|
@ -407,7 +409,7 @@ func pollForRunStatus(t *testing.T, r backend.LogReader, taskID platform.ID, exp
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
|
|
||||||
runs, err = r.ListRuns(context.Background(), platform.RunFilter{Task: &taskID})
|
runs, err = r.ListRuns(context.Background(), orgID, platform.RunFilter{Task: taskID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -443,7 +445,8 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
|
|
||||||
// Claim a task that starts later.
|
// Claim a task that starts later.
|
||||||
task := &backend.StoreTask{
|
task := &backend.StoreTask{
|
||||||
ID: platform.ID(1),
|
ID: platform.ID(1),
|
||||||
|
Org: 2,
|
||||||
}
|
}
|
||||||
meta := &backend.StoreTaskMeta{
|
meta := &backend.StoreTaskMeta{
|
||||||
MaxConcurrency: 99,
|
MaxConcurrency: 99,
|
||||||
|
@ -456,7 +459,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID}); err != backend.ErrRunNotFound {
|
if _, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID}); err != backend.ErrRunNotFound {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -466,7 +469,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runs, err := rl.ListRuns(context.Background(), platform.RunFilter{Task: &task.ID})
|
runs, err := rl.ListRuns(context.Background(), task.Org, platform.RunFilter{Task: task.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -484,7 +487,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pollForRunStatus(t, rl, task.ID, 1, 0, backend.RunSuccess.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 1, 0, backend.RunSuccess.String())
|
||||||
|
|
||||||
// Create a new run, but fail this time.
|
// Create a new run, but fail this time.
|
||||||
s.Tick(7)
|
s.Tick(7)
|
||||||
|
@ -493,7 +496,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pollForRunStatus(t, rl, task.ID, 2, 1, backend.RunStarted.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 2, 1, backend.RunStarted.String())
|
||||||
|
|
||||||
// Finish with failure to create the run.
|
// Finish with failure to create the run.
|
||||||
promises[0].Finish(nil, errors.New("forced failure"))
|
promises[0].Finish(nil, errors.New("forced failure"))
|
||||||
|
@ -501,7 +504,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pollForRunStatus(t, rl, task.ID, 2, 1, backend.RunFail.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 2, 1, backend.RunFail.String())
|
||||||
|
|
||||||
// Create a new run that starts but fails.
|
// Create a new run that starts but fails.
|
||||||
s.Tick(8)
|
s.Tick(8)
|
||||||
|
@ -510,12 +513,12 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pollForRunStatus(t, rl, task.ID, 3, 2, backend.RunStarted.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 3, 2, backend.RunStarted.String())
|
||||||
promises[0].Finish(mock.NewRunResult(errors.New("started but failed to finish properly"), false), nil)
|
promises[0].Finish(mock.NewRunResult(errors.New("started but failed to finish properly"), false), nil)
|
||||||
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
|
if _, err := e.PollForNumberRunning(task.ID, 0); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
pollForRunStatus(t, rl, task.ID, 3, 2, backend.RunFail.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 3, 2, backend.RunFail.String())
|
||||||
|
|
||||||
// One more run, but cancel this time.
|
// One more run, but cancel this time.
|
||||||
s.Tick(9)
|
s.Tick(9)
|
||||||
|
@ -524,7 +527,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pollForRunStatus(t, rl, task.ID, 4, 3, backend.RunStarted.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 4, 3, backend.RunStarted.String())
|
||||||
|
|
||||||
// Finish with failure.
|
// Finish with failure.
|
||||||
promises[0].Cancel()
|
promises[0].Cancel()
|
||||||
|
@ -532,7 +535,7 @@ func TestScheduler_RunLog(t *testing.T) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pollForRunStatus(t, rl, task.ID, 4, 3, backend.RunCanceled.String())
|
pollForRunStatus(t, rl, task.ID, task.Org, 4, 3, backend.RunCanceled.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestScheduler_RunFailureCleanup(t *testing.T) {
|
func TestScheduler_RunFailureCleanup(t *testing.T) {
|
||||||
|
|
|
@ -329,6 +329,8 @@ type LogWriter interface {
|
||||||
// This is useful for test, but not much else.
|
// This is useful for test, but not much else.
|
||||||
type NopLogWriter struct{}
|
type NopLogWriter struct{}
|
||||||
|
|
||||||
|
var _ LogWriter = NopLogWriter{}
|
||||||
|
|
||||||
func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error {
|
func (NopLogWriter) UpdateRunState(context.Context, RunLogBase, time.Time, RunStatus) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -340,21 +342,25 @@ func (NopLogWriter) AddRunLog(context.Context, RunLogBase, time.Time, string) er
|
||||||
// LogReader reads log information and log data from a store.
|
// LogReader reads log information and log data from a store.
|
||||||
type LogReader interface {
|
type LogReader interface {
|
||||||
// ListRuns returns a list of runs belonging to a task.
|
// ListRuns returns a list of runs belonging to a task.
|
||||||
ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error)
|
// orgID is necessary to look in the correct system bucket.
|
||||||
|
ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error)
|
||||||
|
|
||||||
// FindRunByID finds a run given a orgID and runID.
|
// FindRunByID finds a run given a orgID and runID.
|
||||||
// orgID is necessary to look in the correct system bucket.
|
// orgID is necessary to look in the correct system bucket.
|
||||||
FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)
|
FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error)
|
||||||
|
|
||||||
// ListLogs lists logs for a task or a specified run of a task.
|
// ListLogs lists logs for a task or a specified run of a task.
|
||||||
ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error)
|
// orgID is necessary to look in the correct system bucket.
|
||||||
|
ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NopLogWriter is a LogWriter that doesn't do anything when its methods are called.
|
// NopLogReader is a LogReader that doesn't do anything when its methods are called.
|
||||||
// This is useful for test, but not much else.
|
// This is useful for test, but not much else.
|
||||||
type NopLogReader struct{}
|
type NopLogReader struct{}
|
||||||
|
|
||||||
func (NopLogReader) ListRuns(ctx context.Context, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
var _ LogReader = NopLogReader{}
|
||||||
|
|
||||||
|
func (NopLogReader) ListRuns(ctx context.Context, orgID platform.ID, runFilter platform.RunFilter) ([]*platform.Run, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -362,7 +368,7 @@ func (NopLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (NopLogReader) ListLogs(ctx context.Context, logFilter platform.LogFilter) ([]platform.Log, error) {
|
func (NopLogReader) ListLogs(ctx context.Context, orgID platform.ID, logFilter platform.LogFilter) ([]platform.Log, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -173,8 +173,11 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
|
|
||||||
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
|
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
|
||||||
|
|
||||||
if _, err := reader.ListRuns(ctx, platform.RunFilter{Task: &task.ID}); err == nil {
|
if _, err := reader.ListRuns(ctx, task.ID, platform.RunFilter{Task: task.ID}); err == nil {
|
||||||
t.Fatal("failed to error on bad id")
|
t.Fatal("failed to error on bad org id")
|
||||||
|
}
|
||||||
|
if _, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{Task: task.Org}); err == nil {
|
||||||
|
t.Fatal("failed to error on bad task id")
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
@ -201,13 +204,15 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := reader.ListRuns(ctx, platform.RunFilter{}); err == nil {
|
if _, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{}); err == nil {
|
||||||
t.Fatal("failed to error without any filter")
|
t.Fatal("failed to error with invalid task ID")
|
||||||
|
}
|
||||||
|
if _, err := reader.ListRuns(ctx, 0, platform.RunFilter{Task: task.ID}); err == nil {
|
||||||
|
t.Fatal("failed to error with invalid org ID")
|
||||||
}
|
}
|
||||||
|
|
||||||
listRuns, err := reader.ListRuns(ctx, platform.RunFilter{
|
listRuns, err := reader.ListRuns(ctx, task.Org, platform.RunFilter{
|
||||||
Task: &task.ID,
|
Task: task.ID,
|
||||||
Org: &task.Org,
|
|
||||||
Limit: 2 * nRuns,
|
Limit: 2 * nRuns,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -219,9 +224,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
const afterIDIdx = 20
|
const afterIDIdx = 20
|
||||||
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
|
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
|
||||||
Task: &task.ID,
|
Task: task.ID,
|
||||||
Org: &task.Org,
|
|
||||||
After: &runs[afterIDIdx].ID,
|
After: &runs[afterIDIdx].ID,
|
||||||
Limit: 2 * nRuns,
|
Limit: 2 * nRuns,
|
||||||
})
|
})
|
||||||
|
@ -233,9 +237,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterIDIdx+1))
|
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)-(afterIDIdx+1))
|
||||||
}
|
}
|
||||||
|
|
||||||
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
|
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
|
||||||
Task: &task.ID,
|
Task: task.ID,
|
||||||
Org: &task.Org,
|
|
||||||
Limit: 30,
|
Limit: 30,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -248,9 +251,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
|
|
||||||
const afterTimeIdx = 34
|
const afterTimeIdx = 34
|
||||||
scheduledFor, _ := time.Parse(time.RFC3339, runs[afterTimeIdx].ScheduledFor)
|
scheduledFor, _ := time.Parse(time.RFC3339, runs[afterTimeIdx].ScheduledFor)
|
||||||
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
|
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
|
||||||
Task: &task.ID,
|
Task: task.ID,
|
||||||
Org: &task.Org,
|
|
||||||
AfterTime: scheduledFor.Format(time.RFC3339),
|
AfterTime: scheduledFor.Format(time.RFC3339),
|
||||||
Limit: 2 * nRuns,
|
Limit: 2 * nRuns,
|
||||||
})
|
})
|
||||||
|
@ -264,9 +266,8 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
|
|
||||||
const beforeTimeIdx = 34
|
const beforeTimeIdx = 34
|
||||||
scheduledFor, _ = time.Parse(time.RFC3339, runs[beforeTimeIdx].ScheduledFor)
|
scheduledFor, _ = time.Parse(time.RFC3339, runs[beforeTimeIdx].ScheduledFor)
|
||||||
listRuns, err = reader.ListRuns(ctx, platform.RunFilter{
|
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
|
||||||
Task: &task.ID,
|
Task: task.ID,
|
||||||
Org: &task.Org,
|
|
||||||
BeforeTime: scheduledFor.Add(time.Millisecond).Format(time.RFC3339),
|
BeforeTime: scheduledFor.Add(time.Millisecond).Format(time.RFC3339),
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -348,11 +349,11 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
|
|
||||||
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
|
ctx := pcontext.SetAuthorizer(context.Background(), makeNewAuthorization())
|
||||||
|
|
||||||
if _, err := reader.ListLogs(ctx, platform.LogFilter{}); err == nil {
|
if _, err := reader.ListLogs(ctx, task.Org, platform.LogFilter{}); err == nil {
|
||||||
t.Fatal("failed to error with no filter")
|
t.Fatal("failed to error with missing task ID")
|
||||||
}
|
}
|
||||||
if _, err := reader.ListLogs(ctx, platform.LogFilter{Run: &task.ID}); err == nil {
|
if _, err := reader.ListLogs(ctx, 9999999, platform.LogFilter{Task: task.ID}); err == nil {
|
||||||
t.Fatal("failed to error with a non-run-ID")
|
t.Fatal("failed to error with an invalid org ID")
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now().UTC()
|
now := time.Now().UTC()
|
||||||
|
@ -381,7 +382,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
}
|
}
|
||||||
|
|
||||||
const targetRun = 4
|
const targetRun = 4
|
||||||
logs, err := reader.ListLogs(ctx, platform.LogFilter{Run: &runs[targetRun].ID, Org: &task.Org})
|
logs, err := reader.ListLogs(ctx, task.Org, platform.LogFilter{Task: task.ID, Run: &runs[targetRun].ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -394,7 +395,7 @@ func listLogsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
||||||
t.Fatalf("expected: %q, got: %q", fmtTimelog+": log4", string(logs[0]))
|
t.Fatalf("expected: %q, got: %q", fmtTimelog+": log4", string(logs[0]))
|
||||||
}
|
}
|
||||||
|
|
||||||
logs, err = reader.ListLogs(ctx, platform.LogFilter{Task: &task.ID, Org: &task.Org})
|
logs, err = reader.ListLogs(ctx, task.Org, platform.LogFilter{Task: task.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -230,7 +230,12 @@ func (p pAdapter) DeleteTask(ctx context.Context, id platform.ID) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
logs, err := p.r.ListLogs(ctx, filter)
|
task, err := p.s.FindTaskByID(ctx, filter.Task)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
logs, err := p.r.ListLogs(ctx, task.Org, filter)
|
||||||
logPointers := make([]*platform.Log, len(logs))
|
logPointers := make([]*platform.Log, len(logs))
|
||||||
for i := range logs {
|
for i := range logs {
|
||||||
logPointers[i] = &logs[i]
|
logPointers[i] = &logs[i]
|
||||||
|
@ -239,7 +244,12 @@ func (p pAdapter) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*p
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
func (p pAdapter) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
runs, err := p.r.ListRuns(ctx, filter)
|
task, err := p.s.FindTaskByID(ctx, filter.Task)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
runs, err := p.r.ListRuns(ctx, task.Org, filter)
|
||||||
return runs, len(runs), err
|
return runs, len(runs), err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -462,7 +462,7 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Limit 1 should only return the earlier run.
|
// Limit 1 should only return the earlier run.
|
||||||
runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &cr.OrgID, Task: &task.ID, Limit: 1})
|
runs, _, err := sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Task: task.ID, Limit: 1})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -483,7 +483,7 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unspecified limit returns both runs.
|
// Unspecified limit returns both runs.
|
||||||
runs, _, err = sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Org: &cr.OrgID, Task: &task.ID})
|
runs, _, err = sys.ts.FindRuns(sys.Ctx, platform.RunFilter{Task: task.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -745,9 +745,8 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure it is returned when filtering logs by run ID.
|
// Ensure it is returned when filtering logs by run ID.
|
||||||
logs, err := sys.LR.ListLogs(sys.Ctx, platform.LogFilter{
|
logs, err := sys.LR.ListLogs(sys.Ctx, cr.OrgID, platform.LogFilter{
|
||||||
Org: &cr.OrgID,
|
Task: task.ID,
|
||||||
Task: &task.ID,
|
|
||||||
Run: &rc1.Created.RunID,
|
Run: &rc1.Created.RunID,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -767,9 +766,8 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure both returned when filtering logs by task ID.
|
// Ensure both returned when filtering logs by task ID.
|
||||||
logs, err = sys.LR.ListLogs(sys.Ctx, platform.LogFilter{
|
logs, err = sys.LR.ListLogs(sys.Ctx, cr.OrgID, platform.LogFilter{
|
||||||
Org: &cr.OrgID,
|
Task: task.ID,
|
||||||
Task: &task.ID,
|
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
|
|
|
@ -35,6 +35,7 @@ func NewValidator(ts platform.TaskService, bs platform.BucketService) platform.T
|
||||||
preAuth: query.NewPreAuthorizer(bs),
|
preAuth: query.NewPreAuthorizer(bs),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
func (ts *taskServiceValidator) FindTaskByID(ctx context.Context, id platform.ID) (*platform.Task, error) {
|
||||||
task, err := ts.TaskService.FindTaskByID(ctx, id)
|
task, err := ts.TaskService.FindTaskByID(ctx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -128,33 +129,29 @@ func (ts *taskServiceValidator) DeleteTask(ctx context.Context, id platform.ID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
func (ts *taskServiceValidator) FindLogs(ctx context.Context, filter platform.LogFilter) ([]*platform.Log, int, error) {
|
||||||
if filter.Org != nil {
|
// Look up the task first, through the validator, to ensure we have permission to view the task.
|
||||||
perm, err := platform.NewPermission(platform.ReadAction, platform.TasksResourceType, *filter.Org)
|
if _, err := ts.FindTaskByID(ctx, filter.Task); err != nil {
|
||||||
if err != nil {
|
return nil, -1, err
|
||||||
return nil, -1, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := validatePermission(ctx, *perm); err != nil {
|
|
||||||
return nil, -1, err
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(lyon): If the user no longer has permission to the organization we might fail or filter here?
|
// If we can find the task, we can read its logs.
|
||||||
return ts.TaskService.FindLogs(ctx, filter)
|
return ts.TaskService.FindLogs(ctx, filter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
func (ts *taskServiceValidator) FindRuns(ctx context.Context, filter platform.RunFilter) ([]*platform.Run, int, error) {
|
||||||
if filter.Org != nil {
|
// Look up the task first, through the validator, to ensure we have permission to view the task.
|
||||||
perm, err := platform.NewPermission(platform.ReadAction, platform.TasksResourceType, *filter.Org)
|
task, err := ts.FindTaskByID(ctx, filter.Task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, -1, err
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := validatePermission(ctx, *perm); err != nil {
|
perm, err := platform.NewPermission(platform.ReadAction, platform.TasksResourceType, task.OrganizationID)
|
||||||
return nil, -1, err
|
if err != nil {
|
||||||
}
|
return nil, -1, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := validatePermission(ctx, *perm); err != nil {
|
||||||
|
return nil, -1, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(lyon): If the user no longer has permission to the organization we might fail or filter here?
|
// TODO(lyon): If the user no longer has permission to the organization we might fail or filter here?
|
||||||
|
|
|
@ -323,7 +323,7 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")`
|
||||||
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.OrgsResourceType, OrgID: &taskID}}}},
|
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.OrgsResourceType, OrgID: &taskID}}}},
|
||||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||||
_, _, err := svc.FindLogs(ctx, influxdb.LogFilter{
|
_, _, err := svc.FindLogs(ctx, influxdb.LogFilter{
|
||||||
Org: &orgID,
|
Task: taskID,
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return errors.New("returned no error with a invalid auth")
|
return errors.New("returned no error with a invalid auth")
|
||||||
|
@ -336,25 +336,17 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")`
|
||||||
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.TasksResourceType, OrgID: &orgID}}}},
|
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.TasksResourceType, OrgID: &orgID}}}},
|
||||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||||
_, _, err := svc.FindLogs(ctx, influxdb.LogFilter{
|
_, _, err := svc.FindLogs(ctx, influxdb.LogFilter{
|
||||||
Org: &orgID,
|
Task: taskID,
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "FindLogs without org",
|
|
||||||
auth: &influxdb.Authorization{Status: "active"},
|
|
||||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
|
||||||
_, _, err := svc.FindLogs(ctx, influxdb.LogFilter{})
|
|
||||||
return err
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "FindRuns with bad auth",
|
name: "FindRuns with bad auth",
|
||||||
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.OrgsResourceType, OrgID: &taskID}}}},
|
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.OrgsResourceType, OrgID: &taskID}}}},
|
||||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||||
_, _, err := svc.FindRuns(ctx, influxdb.RunFilter{
|
_, _, err := svc.FindRuns(ctx, influxdb.RunFilter{
|
||||||
Org: &orgID,
|
Task: taskID,
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return errors.New("returned no error with a invalid auth")
|
return errors.New("returned no error with a invalid auth")
|
||||||
|
@ -367,19 +359,11 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")`
|
||||||
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.TasksResourceType, OrgID: &orgID}}}},
|
auth: &influxdb.Authorization{Status: "active", Permissions: []influxdb.Permission{influxdb.Permission{Action: influxdb.ReadAction, Resource: influxdb.Resource{Type: influxdb.TasksResourceType, OrgID: &orgID}}}},
|
||||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
||||||
_, _, err := svc.FindRuns(ctx, influxdb.RunFilter{
|
_, _, err := svc.FindRuns(ctx, influxdb.RunFilter{
|
||||||
Org: &orgID,
|
Task: taskID,
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "FindRuns without org",
|
|
||||||
auth: &influxdb.Authorization{Status: "active"},
|
|
||||||
check: func(ctx context.Context, svc influxdb.TaskService) error {
|
|
||||||
_, _, err := svc.FindRuns(ctx, influxdb.RunFilter{})
|
|
||||||
return err
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
name: "FindRunByID missing auth",
|
name: "FindRunByID missing auth",
|
||||||
auth: &influxdb.Authorization{Permissions: []influxdb.Permission{}},
|
auth: &influxdb.Authorization{Permissions: []influxdb.Permission{}},
|
||||||
|
@ -469,5 +453,4 @@ from(bucket:"cows") |> range(start:-5m) |> to(bucket:"cows", org:"thing")`
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue