chore(tasks): update run limits in swagger and FindRuns
parent
f35e9caf7e
commit
0ae2d318b1
|
@ -4349,8 +4349,8 @@ paths:
|
||||||
schema:
|
schema:
|
||||||
type: integer
|
type: integer
|
||||||
minimum: 1
|
minimum: 1
|
||||||
maximum: 100
|
maximum: 500
|
||||||
default: 20
|
default: 100
|
||||||
description: the number of runs to return
|
description: the number of runs to return
|
||||||
- in: query
|
- in: query
|
||||||
name: afterTime
|
name: afterTime
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/influxdata/flux/lang"
|
"github.com/influxdata/flux/lang"
|
||||||
|
influxdb "github.com/influxdata/influxdb"
|
||||||
platform "github.com/influxdata/influxdb"
|
platform "github.com/influxdata/influxdb"
|
||||||
"github.com/influxdata/influxdb/authorizer"
|
"github.com/influxdata/influxdb/authorizer"
|
||||||
pcontext "github.com/influxdata/influxdb/context"
|
pcontext "github.com/influxdata/influxdb/context"
|
||||||
|
@ -939,13 +940,9 @@ func decodeGetRunsRequest(ctx context.Context, r *http.Request) (*getRunsRequest
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if i < 1 || i > 100 {
|
if i < 1 || i > influxdb.TaskMaxPageSize {
|
||||||
return nil, &platform.Error{
|
return nil, backend.ErrOutOfBoundsLimit
|
||||||
Code: platform.EUnprocessableEntity,
|
|
||||||
Msg: "limit must be between 1 and 100",
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
req.filter.Limit = i
|
req.filter.Limit = i
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1646,9 +1643,12 @@ func (t TaskService) FindRuns(ctx context.Context, filter platform.RunFilter) ([
|
||||||
if filter.After != nil {
|
if filter.After != nil {
|
||||||
val.Set("after", filter.After.String())
|
val.Set("after", filter.After.String())
|
||||||
}
|
}
|
||||||
if filter.Limit > 0 {
|
|
||||||
val.Set("limit", strconv.Itoa(filter.Limit))
|
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
|
return nil, 0, backend.ErrOutOfBoundsLimit
|
||||||
}
|
}
|
||||||
|
val.Set("limit", strconv.Itoa(filter.Limit))
|
||||||
|
|
||||||
u.RawQuery = val.Encode()
|
u.RawQuery = val.Encode()
|
||||||
req, err := http.NewRequest("GET", u.String(), nil)
|
req, err := http.NewRequest("GET", u.String(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -862,9 +862,14 @@ func (s *Service) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*i
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
|
func (s *Service) findRuns(ctx context.Context, tx Tx, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
|
||||||
if filter.Limit == 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
if filter.Limit == 0 {
|
||||||
filter.Limit = influxdb.TaskMaxPageSize
|
filter.Limit = influxdb.TaskDefaultPageSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
|
return nil, 0, backend.ErrOutOfBoundsLimit
|
||||||
|
}
|
||||||
|
|
||||||
var runs []*influxdb.Run
|
var runs []*influxdb.Run
|
||||||
// manual runs
|
// manual runs
|
||||||
manualRuns, err := s.manualRuns(ctx, tx, filter.Task)
|
manualRuns, err := s.manualRuns(ctx, tx, filter.Task)
|
||||||
|
|
|
@ -41,6 +41,12 @@ var (
|
||||||
ErrRunNotFound = errors.New("run not found")
|
ErrRunNotFound = errors.New("run not found")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrOutOfBoundsLimit is returned with FindRuns is called with an invalid filter limit.
|
||||||
|
var ErrOutOfBoundsLimit = &platform.Error{
|
||||||
|
Code: platform.EUnprocessableEntity,
|
||||||
|
Msg: "run limit is out of bounds, must be between 1 and 500",
|
||||||
|
}
|
||||||
|
|
||||||
// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware
|
// NewAnalyticalStorage creates a new analytical store with access to the necessary systems for storing data and to act as a middleware
|
||||||
func NewAnalyticalStorage(ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
|
func NewAnalyticalStorage(ts influxdb.TaskService, tcs TaskControlService, pw storage.PointsWriter, qs query.QueryService) *AnalyticalStorage {
|
||||||
return &AnalyticalStorage{
|
return &AnalyticalStorage{
|
||||||
|
@ -142,8 +148,12 @@ func (as *AnalyticalStorage) FindLogs(ctx context.Context, filter influxdb.LogFi
|
||||||
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
|
// FindRuns returns a list of runs that match a filter and the total count of returned runs.
|
||||||
// First attempt to use the TaskService, then append additional analytical's runs to the list
|
// First attempt to use the TaskService, then append additional analytical's runs to the list
|
||||||
func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
|
func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFilter) ([]*influxdb.Run, int, error) {
|
||||||
if filter.Limit == 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
if filter.Limit == 0 {
|
||||||
filter.Limit = influxdb.TaskMaxPageSize
|
filter.Limit = influxdb.TaskDefaultPageSize
|
||||||
|
}
|
||||||
|
|
||||||
|
if filter.Limit < 0 || filter.Limit > influxdb.TaskMaxPageSize {
|
||||||
|
return nil, 0, ErrOutOfBoundsLimit
|
||||||
}
|
}
|
||||||
|
|
||||||
runs, n, err := as.TaskService.FindRuns(ctx, filter)
|
runs, n, err := as.TaskService.FindRuns(ctx, filter)
|
||||||
|
|
|
@ -602,6 +602,17 @@ func testTaskRuns(t *testing.T, sys *System) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check run filter errors
|
||||||
|
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
|
||||||
|
if err0 != backend.ErrOutOfBoundsLimit {
|
||||||
|
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
|
||||||
|
if err1 != backend.ErrOutOfBoundsLimit {
|
||||||
|
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
|
||||||
|
}
|
||||||
|
|
||||||
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make two runs.
|
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make two runs.
|
||||||
|
|
||||||
rc0, err := sys.TaskControlService.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix)
|
rc0, err := sys.TaskControlService.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix)
|
||||||
|
@ -1033,6 +1044,17 @@ func testRunStorage(t *testing.T, sys *System) {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check run filter errors
|
||||||
|
_, _, err0 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: -1})
|
||||||
|
if err0 != backend.ErrOutOfBoundsLimit {
|
||||||
|
t.Fatalf("failed to error with out of bounds run limit: %d", -1)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, _, err1 := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: influxdb.TaskMaxPageSize + 1})
|
||||||
|
if err1 != backend.ErrOutOfBoundsLimit {
|
||||||
|
t.Fatalf("failed to error with out of bounds run limit: %d", influxdb.TaskMaxPageSize+1)
|
||||||
|
}
|
||||||
|
|
||||||
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make two runs.
|
requestedAtUnix := time.Now().Add(5 * time.Minute).UTC().Unix() // This should guarantee we can make two runs.
|
||||||
|
|
||||||
rc0, err := sys.TaskControlService.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix)
|
rc0, err := sys.TaskControlService.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix)
|
||||||
|
|
Loading…
Reference in New Issue