diff --git a/task/backend/query_logreader.go b/task/backend/query_logreader.go index b07d7b6026..29cf35dc90 100644 --- a/task/backend/query_logreader.go +++ b/task/backend/query_logreader.go @@ -104,7 +104,13 @@ func (qlr *QueryLogReader) ListRuns(ctx context.Context, orgID platform.ID, runF scheduledBefore = runFilter.BeforeTime } - listScript := fmt.Sprintf(` + // Because flux doesnt support piviting on a rowkey that might not exist we need first check if we can pivot with "requestedAt" + // and if that fails we can fall back to pivot without "requestedAt" + // TODO(lh): After we transition to a seperation of transactional and analytical stores this can be simplified. + pivotWithRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor", "requestedAt"], columnKey: ["status"], valueColumn: "_time")` + pivotWithOutRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")` + + listFmtString := ` import "influxdata/influxdb/v1" from(bucketID: "000000000000000a") @@ -114,9 +120,10 @@ from(bucketID: "000000000000000a") |> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"]) |> v1.fieldsAsCols() |> filter(fn: (r) => r.scheduledFor < %q and r.scheduledFor > %q and r.runID > %q) - |> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time") %s - `, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, limit) + %s + ` + listScript := fmt.Sprintf(listFmtString, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, pivotWithRequestedAt, limit) auth, err := pctx.GetAuthorizer(ctx) if err != nil { @@ -134,14 +141,28 @@ from(bucketID: "000000000000000a") runs, err := queryIttrToRuns(ittr) if err != nil { - return nil, err + // try re running the script without the requested at + listScript := fmt.Sprintf(listFmtString, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, pivotWithOutRequestedAt, limit) + request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}} + + ittr, err := qlr.queryService.Query(ctx, request) + if err != nil { + return nil, err + } + runs, err = queryIttrToRuns(ittr) + if err != nil { + return nil, err + } } return runs, nil } func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) { - showScript := fmt.Sprintf(` + pivotWithRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor", "requestedAt"], columnKey: ["status"], valueColumn: "_time")` + pivotWithOutRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")` + + showFmtScript := ` import "influxdata/influxdb/v1" logs = from(bucketID: "000000000000000a") @@ -159,9 +180,10 @@ from(bucketID: "000000000000000a") |> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"]) |> v1.fieldsAsCols() |> filter(fn: (r) => r.runID == %q) - |> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time") + %s |> yield(name: "result") - `, runID.String(), runID.String()) + ` + showScript := fmt.Sprintf(showFmtScript, runID.String(), runID.String(), pivotWithRequestedAt) auth, err := pctx.GetAuthorizer(ctx) if err != nil { @@ -178,7 +200,18 @@ from(bucketID: "000000000000000a") } runs, err := queryIttrToRuns(ittr) if err != nil { - return nil, err + showScript := fmt.Sprintf(showFmtScript, runID.String(), runID.String(), pivotWithOutRequestedAt) + request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: showScript}} + + ittr, err := qlr.queryService.Query(ctx, request) + if err != nil { + return nil, err + + } + runs, err = queryIttrToRuns(ittr) + if err != nil { + return nil, err + } } if len(runs) == 0 { return nil, ErrRunNotFound diff --git a/task/backend/storetest/logstoretest.go b/task/backend/storetest/logstoretest.go index e6c3f6f786..2d246843d6 100644 --- a/task/backend/storetest/logstoretest.go +++ b/task/backend/storetest/logstoretest.go @@ -306,6 +306,37 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) if len(listRuns) != beforeTimeIdx { t.Fatalf("retrieved: %d, expected: %d", len(listRuns), beforeTimeIdx) } + + // add a run and now list again but this time with a requested at + scheduledFor = now.Add(time.Duration(-2*(nRuns-len(runs))) * time.Second) + run := platform.Run{ + ID: platform.ID(len(runs) + 1), + Status: "started", + ScheduledFor: scheduledFor.Format(time.RFC3339), + RequestedAt: scheduledFor.Format(time.RFC3339), + } + runs = append(runs, run) + rlb := backend.RunLogBase{ + Task: task, + RunID: run.ID, + RunScheduledFor: scheduledFor.Unix(), + RequestedAt: scheduledFor.Unix(), + } + + if err := writer.UpdateRunState(ctx, rlb, scheduledFor.Add(time.Second), backend.RunStarted); err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{ + Task: task.ID, + Limit: 2 * nRuns, + }) + if err != nil { + t.Fatal(err) + } + if len(listRuns) != len(runs) { + t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs)) + } } func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {