test: use experimental.chain in flux e2e tests to allow a single pass (#18898)
Force the writing of data and running of the test to happen sequentially. As the results come out, collect them and report an error only if the diff results are not empty.pull/19039/head
parent
88cdf43db1
commit
c693f0b080
|
@ -8,7 +8,6 @@ import (
|
|||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/ast"
|
||||
"github.com/influxdata/flux/execute"
|
||||
"github.com/influxdata/flux/lang"
|
||||
|
@ -155,12 +154,15 @@ func makeTestPackage(file *ast.File) *ast.Package {
|
|||
var optionsSource = `
|
||||
import "testing"
|
||||
import c "csv"
|
||||
import "experimental"
|
||||
|
||||
// Options bucket and org are defined dynamically per test
|
||||
|
||||
option testing.loadStorage = (csv) => {
|
||||
c.from(csv: csv) |> to(bucket: bucket, org: org)
|
||||
return from(bucket: bucket)
|
||||
return experimental.chain(
|
||||
first: c.from(csv: csv) |> to(bucket: bucket, org: org),
|
||||
second: from(bucket:bucket)
|
||||
)
|
||||
}
|
||||
`
|
||||
var optionsAST *ast.File
|
||||
|
@ -175,8 +177,6 @@ func init() {
|
|||
|
||||
func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
||||
|
||||
// Query server to ensure write persists.
|
||||
|
||||
b := &platform.Bucket{
|
||||
OrgID: l.Org.ID,
|
||||
Name: t.Name(),
|
||||
|
@ -208,7 +208,7 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
|||
pkg := makeTestPackage(file)
|
||||
pkg.Files = append(pkg.Files, options)
|
||||
|
||||
// Add testing.inspect call to ensure the data is loaded
|
||||
// Use testing.inspect call to get all of diff, want, and got
|
||||
inspectCalls := stdlib.TestingInspectCalls(pkg)
|
||||
pkg.Files = append(pkg.Files, inspectCalls)
|
||||
|
||||
|
@ -221,85 +221,19 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
|||
OrganizationID: l.Org.ID,
|
||||
Compiler: lang.ASTCompiler{AST: bs},
|
||||
}
|
||||
|
||||
if r, err := l.FluxQueryService().Query(ctx, req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
results := make( map[string]*bytes.Buffer )
|
||||
|
||||
for r.More() {
|
||||
v := r.Next()
|
||||
if err := v.Tables().Do(func(tbl flux.Table) error {
|
||||
return tbl.Do(func(reader flux.ColReader) error {
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
t.Error(err)
|
||||
|
||||
if _, ok := results[v.Name()]; !ok {
|
||||
results[v.Name()] = &bytes.Buffer{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// quirk: our execution engine doesn't guarantee the order of execution for disconnected DAGS
|
||||
// so that our function-with-side effects call to `to` may run _after_ the test instead of before.
|
||||
// running twice makes sure that `to` happens at least once before we run the test.
|
||||
// this time we use a call to `run` so that the assertion error is triggered
|
||||
runCalls := stdlib.TestingRunCalls(pkg)
|
||||
pkg.Files[len(pkg.Files)-1] = runCalls
|
||||
|
||||
bs, err = json.Marshal(pkg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
req = &query.Request{
|
||||
OrganizationID: l.Org.ID,
|
||||
Compiler: lang.ASTCompiler{AST: bs},
|
||||
}
|
||||
r, err := l.FluxQueryService().Query(ctx, req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
needInspect := false
|
||||
for r.More() {
|
||||
v := r.Next()
|
||||
if err := v.Tables().Do(func(tbl flux.Table) error {
|
||||
return tbl.Do(func(reader flux.ColReader) error {
|
||||
return nil
|
||||
})
|
||||
}); err != nil {
|
||||
needInspect = true
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
if err := r.Err(); err != nil {
|
||||
t.Error(err)
|
||||
needInspect = true
|
||||
}
|
||||
if needInspect {
|
||||
// Replace the testing.run calls with testing.inspect calls.
|
||||
pkg.Files[len(pkg.Files)-1] = inspectCalls
|
||||
bs, err = json.Marshal(pkg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req = &query.Request{
|
||||
OrganizationID: l.Org.ID,
|
||||
Compiler: lang.ASTCompiler{AST: bs},
|
||||
}
|
||||
r, err := l.FluxQueryService().Query(ctx, req)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
var out bytes.Buffer
|
||||
defer func() {
|
||||
if t.Failed() {
|
||||
scanner := bufio.NewScanner(&out)
|
||||
for scanner.Scan() {
|
||||
t.Log(scanner.Text())
|
||||
}
|
||||
}
|
||||
}()
|
||||
for r.More() {
|
||||
v := r.Next()
|
||||
err := execute.FormatResult(&out, v)
|
||||
err := execute.FormatResult(results[v.Name()], v)
|
||||
if err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
@ -307,5 +241,22 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
|||
if err := r.Err(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
logFormatted := func( name string, results map[string]*bytes.Buffer ) {
|
||||
if _, ok := results[name]; ok {
|
||||
scanner := bufio.NewScanner(results[name])
|
||||
for scanner.Scan() {
|
||||
t.Log( scanner.Text())
|
||||
}
|
||||
} else {
|
||||
t.Log( "table ", name, " not present in results" )
|
||||
}
|
||||
}
|
||||
if _, ok := results["diff"]; ok {
|
||||
t.Error("diff table was not empty")
|
||||
logFormatted("diff", results)
|
||||
logFormatted("want", results)
|
||||
logFormatted("got", results)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue