Fix requested at for task runs (#12559)
* Fix requested at for task runs * add short delay to allow storage propigationpull/12611/head
parent
1fc483f321
commit
69db8099d4
|
@ -104,7 +104,13 @@ func (qlr *QueryLogReader) ListRuns(ctx context.Context, orgID platform.ID, runF
|
|||
scheduledBefore = runFilter.BeforeTime
|
||||
}
|
||||
|
||||
listScript := fmt.Sprintf(`
|
||||
// Because flux doesnt support piviting on a rowkey that might not exist we need first check if we can pivot with "requestedAt"
|
||||
// and if that fails we can fall back to pivot without "requestedAt"
|
||||
// TODO(lh): After we transition to a seperation of transactional and analytical stores this can be simplified.
|
||||
pivotWithRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor", "requestedAt"], columnKey: ["status"], valueColumn: "_time")`
|
||||
pivotWithOutRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")`
|
||||
|
||||
listFmtString := `
|
||||
import "influxdata/influxdb/v1"
|
||||
|
||||
from(bucketID: "000000000000000a")
|
||||
|
@ -114,9 +120,10 @@ from(bucketID: "000000000000000a")
|
|||
|> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"])
|
||||
|> v1.fieldsAsCols()
|
||||
|> filter(fn: (r) => r.scheduledFor < %q and r.scheduledFor > %q and r.runID > %q)
|
||||
|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")
|
||||
%s
|
||||
`, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, limit)
|
||||
%s
|
||||
`
|
||||
listScript := fmt.Sprintf(listFmtString, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, pivotWithRequestedAt, limit)
|
||||
|
||||
auth, err := pctx.GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
|
@ -134,14 +141,28 @@ from(bucketID: "000000000000000a")
|
|||
|
||||
runs, err := queryIttrToRuns(ittr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// try re running the script without the requested at
|
||||
listScript := fmt.Sprintf(listFmtString, runFilter.Task.String(), scheduledBefore, scheduledAfter, afterID, pivotWithOutRequestedAt, limit)
|
||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: listScript}}
|
||||
|
||||
ittr, err := qlr.queryService.Query(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
runs, err = queryIttrToRuns(ittr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return runs, nil
|
||||
}
|
||||
|
||||
func (qlr *QueryLogReader) FindRunByID(ctx context.Context, orgID, runID platform.ID) (*platform.Run, error) {
|
||||
showScript := fmt.Sprintf(`
|
||||
pivotWithRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor", "requestedAt"], columnKey: ["status"], valueColumn: "_time")`
|
||||
pivotWithOutRequestedAt := `|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")`
|
||||
|
||||
showFmtScript := `
|
||||
import "influxdata/influxdb/v1"
|
||||
|
||||
logs = from(bucketID: "000000000000000a")
|
||||
|
@ -159,9 +180,10 @@ from(bucketID: "000000000000000a")
|
|||
|> group(columns: ["_measurement", "taskID", "scheduledFor", "status", "runID"])
|
||||
|> v1.fieldsAsCols()
|
||||
|> filter(fn: (r) => r.runID == %q)
|
||||
|> pivot(rowKey:["runID", "scheduledFor"], columnKey: ["status"], valueColumn: "_time")
|
||||
%s
|
||||
|> yield(name: "result")
|
||||
`, runID.String(), runID.String())
|
||||
`
|
||||
showScript := fmt.Sprintf(showFmtScript, runID.String(), runID.String(), pivotWithRequestedAt)
|
||||
|
||||
auth, err := pctx.GetAuthorizer(ctx)
|
||||
if err != nil {
|
||||
|
@ -178,7 +200,18 @@ from(bucketID: "000000000000000a")
|
|||
}
|
||||
runs, err := queryIttrToRuns(ittr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
showScript := fmt.Sprintf(showFmtScript, runID.String(), runID.String(), pivotWithOutRequestedAt)
|
||||
request := &query.Request{Authorization: auth.(*platform.Authorization), OrganizationID: orgID, Compiler: lang.FluxCompiler{Query: showScript}}
|
||||
|
||||
ittr, err := qlr.queryService.Query(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
}
|
||||
runs, err = queryIttrToRuns(ittr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if len(runs) == 0 {
|
||||
return nil, ErrRunNotFound
|
||||
|
|
|
@ -306,6 +306,37 @@ func listRunsTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc)
|
|||
if len(listRuns) != beforeTimeIdx {
|
||||
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), beforeTimeIdx)
|
||||
}
|
||||
|
||||
// add a run and now list again but this time with a requested at
|
||||
scheduledFor = now.Add(time.Duration(-2*(nRuns-len(runs))) * time.Second)
|
||||
run := platform.Run{
|
||||
ID: platform.ID(len(runs) + 1),
|
||||
Status: "started",
|
||||
ScheduledFor: scheduledFor.Format(time.RFC3339),
|
||||
RequestedAt: scheduledFor.Format(time.RFC3339),
|
||||
}
|
||||
runs = append(runs, run)
|
||||
rlb := backend.RunLogBase{
|
||||
Task: task,
|
||||
RunID: run.ID,
|
||||
RunScheduledFor: scheduledFor.Unix(),
|
||||
RequestedAt: scheduledFor.Unix(),
|
||||
}
|
||||
|
||||
if err := writer.UpdateRunState(ctx, rlb, scheduledFor.Add(time.Second), backend.RunStarted); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
listRuns, err = reader.ListRuns(ctx, task.Org, platform.RunFilter{
|
||||
Task: task.ID,
|
||||
Limit: 2 * nRuns,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if len(listRuns) != len(runs) {
|
||||
t.Fatalf("retrieved: %d, expected: %d", len(listRuns), len(runs))
|
||||
}
|
||||
}
|
||||
|
||||
func findRunByIDTest(t *testing.T, crf CreateRunStoreFunc, drf DestroyRunStoreFunc) {
|
||||
|
|
Loading…
Reference in New Issue