chore(task): add test around paging using after (#16774)
* chore(task): add test around paging using after * fix(kv): support filter.After in FindTasksByUserpull/16778/head
parent
06102ba3d4
commit
bc64968b0b
33
kv/task.go
33
kv/task.go
|
@ -311,7 +311,21 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
|
|||
return nil, 0, err
|
||||
}
|
||||
|
||||
matchFn := newTaskMatchFn(filter, org)
|
||||
var (
|
||||
afterSeen bool
|
||||
after = func(task *influxdb.Task) bool {
|
||||
if filter.After == nil || afterSeen {
|
||||
return true
|
||||
}
|
||||
|
||||
if task.ID == *filter.After {
|
||||
afterSeen = true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
matchFn = newTaskMatchFn(filter, org)
|
||||
)
|
||||
|
||||
for _, m := range maps {
|
||||
task, err := s.findTaskByIDWithAuth(ctx, tx, m.ResourceID)
|
||||
|
@ -323,6 +337,10 @@ func (s *Service) findTasksByUser(ctx context.Context, tx Tx, filter influxdb.Ta
|
|||
}
|
||||
|
||||
if matchFn == nil || matchFn(task) {
|
||||
if !after(task) {
|
||||
continue
|
||||
}
|
||||
|
||||
ts = append(ts, task)
|
||||
|
||||
if len(ts) >= filter.Limit {
|
||||
|
@ -389,6 +407,9 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
|
|||
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||
}
|
||||
|
||||
// free cursor resources
|
||||
defer c.Close()
|
||||
|
||||
matchFn := newTaskMatchFn(filter, nil)
|
||||
|
||||
for k, v := c.Next(); k != nil; k, v = c.Next() {
|
||||
|
@ -401,6 +422,7 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
|
|||
if err != nil {
|
||||
if err == influxdb.ErrTaskNotFound {
|
||||
// we might have some crufty index's
|
||||
err = nil
|
||||
continue
|
||||
}
|
||||
return nil, 0, err
|
||||
|
@ -420,7 +442,7 @@ func (s *Service) findTasksByOrg(ctx context.Context, tx Tx, filter influxdb.Tas
|
|||
}
|
||||
}
|
||||
|
||||
return ts, len(ts), err
|
||||
return ts, len(ts), c.Err()
|
||||
}
|
||||
|
||||
type taskMatchFn func(*influxdb.Task) bool
|
||||
|
@ -500,6 +522,9 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
|
|||
return nil, 0, influxdb.ErrUnexpectedTaskBucketErr(err)
|
||||
}
|
||||
|
||||
// free cursor resources
|
||||
defer c.Close()
|
||||
|
||||
matchFn := newTaskMatchFn(filter, nil)
|
||||
|
||||
for k, v := c.Next(); k != nil; k, v = c.Next() {
|
||||
|
@ -519,6 +544,10 @@ func (s *Service) findAllTasks(ctx context.Context, tx Tx, filter influxdb.TaskF
|
|||
}
|
||||
}
|
||||
|
||||
if err := c.Err(); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return ts, len(ts), err
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -60,6 +61,10 @@ func TestTaskService(t *testing.T, fn BackendComponentFactory, testCategory ...s
|
|||
testTaskFindTasksPaging(t, sys)
|
||||
})
|
||||
|
||||
t.Run("FindTasks after paging", func(t *testing.T) {
|
||||
testTaskFindTasksAfterPaging(t, sys)
|
||||
})
|
||||
|
||||
t.Run("Task Update Options Full", func(t *testing.T) {
|
||||
t.Parallel()
|
||||
testTaskOptionsUpdateFull(t, sys)
|
||||
|
@ -528,6 +533,85 @@ from(bucket: "b")
|
|||
}
|
||||
}
|
||||
|
||||
func testTaskFindTasksAfterPaging(t *testing.T, sys *System) {
|
||||
var (
|
||||
script = `option task = {
|
||||
name: "some-unique-task-name",
|
||||
cron: "* * * * *",
|
||||
concurrency: 100,
|
||||
offset: 10s,
|
||||
}
|
||||
|
||||
from(bucket: "b")
|
||||
|> to(bucket: "two", orgID: "000000000000000")`
|
||||
cr = creds(t, sys)
|
||||
tc = influxdb.TaskCreate{
|
||||
OrganizationID: cr.OrgID,
|
||||
OwnerID: cr.UserID,
|
||||
Type: influxdb.TaskSystemType,
|
||||
Flux: script,
|
||||
}
|
||||
authorizedCtx = icontext.SetAuthorizer(sys.Ctx, cr.Authorizer())
|
||||
created = make([]*influxdb.Task, 10)
|
||||
taskName = "some-unique-task-name"
|
||||
)
|
||||
|
||||
for i := 0; i < len(created); i++ {
|
||||
tsk, err := sys.TaskService.CreateTask(authorizedCtx, tc)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !tsk.ID.Valid() {
|
||||
t.Fatal("no task ID set")
|
||||
}
|
||||
|
||||
created[i] = tsk
|
||||
}
|
||||
|
||||
var (
|
||||
expected = [][]influxdb.ID{
|
||||
{created[0].ID, created[1].ID},
|
||||
{created[2].ID, created[3].ID},
|
||||
{created[4].ID, created[5].ID},
|
||||
{created[6].ID, created[7].ID},
|
||||
{created[8].ID, created[9].ID},
|
||||
// last page should be empty
|
||||
nil,
|
||||
}
|
||||
found = make([][]influxdb.ID, 0, 6)
|
||||
after *influxdb.ID
|
||||
)
|
||||
|
||||
// one more than expected pages
|
||||
for i := 0; i < 6; i++ {
|
||||
tasks, _, err := sys.TaskService.FindTasks(sys.Ctx, influxdb.TaskFilter{
|
||||
Limit: 2,
|
||||
After: after,
|
||||
Name: &taskName,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("FindTasks: %v", err)
|
||||
}
|
||||
|
||||
var page []influxdb.ID
|
||||
for _, task := range tasks {
|
||||
page = append(page, task.ID)
|
||||
}
|
||||
|
||||
found = append(found, page)
|
||||
|
||||
if len(tasks) == 0 {
|
||||
break
|
||||
}
|
||||
|
||||
after = &tasks[len(tasks)-1].ID
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(expected, found) {
|
||||
t.Errorf("expected %#v, found %#v", expected, found)
|
||||
}
|
||||
}
|
||||
|
||||
//Create a new task with a Cron and Offset option
|
||||
//Update the task to remove the Offset option, and change Cron to Every
|
||||
//Retrieve the task again to ensure the options are now Every, without Cron or Offset
|
||||
|
|
Loading…
Reference in New Issue