Merge pull request #14569 from influxdata/tasks/fix-run-limit-api

fix(tasks): add run limit check to analytical store
pull/14604/head
Alirie Gray 2019-08-12 09:06:32 -07:00 committed by GitHub
commit aafb9e5b5e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 39 additions and 10 deletions

View File

@ -11,6 +11,7 @@
1. [14480](https://github.com/influxdata/influxdb/pull/14480): Fix authentication when updating a task with invalid org or bucket.
1. [14497](https://github.com/influxdata/influxdb/pull/14497): Update the documentation link for Telegraf.
1. [14492](https://github.com/influxdata/influxdb/pull/14492): Fix to surface errors properly as task notifications on create.
1. [14569](https://github.com/influxdata/influxdb/pull/14569): Fix limiting of get runs for task.
## v2.0.0-alpha.16 [2019-07-25]

View File

@ -180,8 +180,9 @@ func (as *AnalyticalStorage) FindRuns(ctx context.Context, filter influxdb.RunFi
%s
|> group(columns: ["taskID"])
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> limit(n:%d)
`, filter.Task.String(), filterPart)
`, filter.Task.String(), filterPart, filter.Limit-len(runs))
// At this point we are behind authorization
// so we are faking a read only permission to the org's system bucket

View File

@ -1078,7 +1078,7 @@ func testRunStorage(t *testing.T, sys *System) {
t.Fatalf("wrong task ID on created task: got %s, want %s", rc0.Created.TaskID, task.ID)
}
startedAt := time.Now().UTC()
startedAt := time.Now().UTC().Add(time.Second * -10)
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc0.Created.RunID, startedAt, backend.RunStarted); err != nil {
@ -1094,12 +1094,12 @@ func testRunStorage(t *testing.T, sys *System) {
}
// Update the run state to Started; normally the scheduler would do this.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.Created.RunID, startedAt, backend.RunStarted); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.Created.RunID, startedAt.Add(time.Second), backend.RunStarted); err != nil {
t.Fatal(err)
}
// Mark the second run finished.
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.Created.RunID, startedAt.Add(time.Second), backend.RunSuccess); err != nil {
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc1.Created.RunID, startedAt.Add(time.Second*2), backend.RunSuccess); err != nil {
t.Fatal(err)
}
@ -1128,13 +1128,40 @@ func testRunStorage(t *testing.T, sys *System) {
t.Fatalf("expected empty FinishedAt, got %q", runs[0].FinishedAt)
}
// Unspecified limit returns both runs.
// Create 3rd run and test limiting to 2 runs
rc2, err := sys.TaskControlService.CreateNextRun(sys.Ctx, task.ID, requestedAtUnix)
if err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.Created.RunID, startedAt.Add(time.Second*3), backend.RunStarted); err != nil {
t.Fatal(err)
}
if err := sys.TaskControlService.UpdateRunState(sys.Ctx, task.ID, rc2.Created.RunID, startedAt.Add(time.Second*4), backend.RunSuccess); err != nil {
t.Fatal(err)
}
if _, err := sys.TaskControlService.FinishRun(sys.Ctx, task.ID, rc2.Created.RunID); err != nil {
t.Fatal(err)
}
runs2, _, err := sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID, Limit: 2})
if err != nil {
t.Fatal(err)
}
if len(runs2) != 2 {
t.Fatalf("expected 2 runs, got %v", runs2)
}
if runs2[0].ID != rc0.Created.RunID {
t.Fatalf("retrieved wrong run ID; want %s, got %s", rc0.Created.RunID, runs[0].ID)
}
// Unspecified limit returns all three runs.
runs, _, err = sys.TaskService.FindRuns(sys.Ctx, influxdb.RunFilter{Task: task.ID})
if err != nil {
t.Fatal(err)
}
if len(runs) != 2 {
t.Fatalf("expected 2 runs, got %v", runs)
if len(runs) != 3 {
t.Fatalf("expected 3 runs, got %v", runs)
}
if runs[0].ID != rc0.Created.RunID {
t.Fatalf("retrieved wrong run ID; want %s, got %s", rc0.Created.RunID, runs[0].ID)
@ -1152,13 +1179,13 @@ func testRunStorage(t *testing.T, sys *System) {
if runs[1].ID != rc1.Created.RunID {
t.Fatalf("retrieved wrong run ID; want %s, got %s", rc1.Created.RunID, runs[1].ID)
}
if runs[1].StartedAt != runs[0].StartedAt {
t.Fatalf("unexpected StartedAt; want %s, got %s", runs[0].StartedAt, runs[1].StartedAt)
if runs[1].StartedAt != startedAt.Add(time.Second).Format(time.RFC3339Nano) {
t.Fatalf("unexpected StartedAt; want %s, got %s", runs[0].StartedAt, startedAt.Add(time.Second))
}
if runs[1].Status != backend.RunSuccess.String() {
t.Fatalf("unexpected run status; want %s, got %s", backend.RunSuccess.String(), runs[0].Status)
}
if exp := startedAt.Add(time.Second).Format(time.RFC3339Nano); runs[1].FinishedAt != exp {
if exp := startedAt.Add(time.Second * 2).Format(time.RFC3339Nano); runs[1].FinishedAt != exp {
t.Fatalf("unexpected FinishedAt; want %s, got %s", exp, runs[1].FinishedAt)
}