fix: convert flux e2e test harness to two-pass (#19672)
It appears that the double write caused by using to() inside a separate execution environment (experimental.chain) causes flux e2e tests to behave unpredictably, when coupled with the 1.x storage engine. Removing the second write by using two passes, one to write to the db, then another to run the test, eliminates the flakiness. Verified by running e2e tests in parallel times 8 for 12 hours without any flakiness observed. Before the fix, the flakiness would take approx 30 minutes on avgerage to exhibit. This commit also removes universe/to_time from the skipped tests because it was added when this flakiness was discovered.pull/19685/head
parent
999660ae2e
commit
feb41819ec
|
@ -151,32 +151,47 @@ func makeTestPackage(file *ast.File) *ast.Package {
|
|||
return pkg
|
||||
}
|
||||
|
||||
var optionsSource = `
|
||||
// This options definition puts to() in the path of the CSV input. The tests
|
||||
// get run in this case and they would normally pass, if we checked the
|
||||
// results, but don't look at them.
|
||||
var writeOptSource = `
|
||||
import "testing"
|
||||
import c "csv"
|
||||
import "experimental"
|
||||
|
||||
// Options bucket and org are defined dynamically per test
|
||||
|
||||
option testing.loadStorage = (csv) => {
|
||||
return experimental.chain(
|
||||
first: c.from(csv: csv) |> to(bucket: bucket, org: org),
|
||||
second: from(bucket:bucket)
|
||||
)
|
||||
return c.from(csv: csv) |> to(bucket: bucket, org: org)
|
||||
}
|
||||
`
|
||||
var optionsAST *ast.File
|
||||
|
||||
func init() {
|
||||
// This options definition is for the second run, the test run. It loads the
|
||||
// data from previously written bucket. We check the results after runnig this
|
||||
// second pass and report on them.
|
||||
var readOptSource = `
|
||||
import "testing"
|
||||
import c "csv"
|
||||
|
||||
option testing.loadStorage = (csv) => {
|
||||
return from(bucket: bucket)
|
||||
}
|
||||
`
|
||||
|
||||
var writeOptAST *ast.File
|
||||
var readOptAST *ast.File
|
||||
|
||||
func prepareOptions(optionsSource string) *ast.File {
|
||||
pkg := parser.ParseSource(optionsSource)
|
||||
if ast.Check(pkg) > 0 {
|
||||
panic(ast.GetError(pkg))
|
||||
}
|
||||
optionsAST = pkg.Files[0]
|
||||
return pkg.Files[0]
|
||||
}
|
||||
|
||||
func init() {
|
||||
writeOptAST = prepareOptions(writeOptSource)
|
||||
readOptAST = prepareOptions(readOptSource)
|
||||
}
|
||||
|
||||
func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
||||
|
||||
b := &platform.Bucket{
|
||||
OrgID: l.Org.ID,
|
||||
Name: t.Name(),
|
||||
|
@ -206,6 +221,34 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
|||
Init: &ast.StringLiteral{Value: l.Org.Name},
|
||||
},
|
||||
}
|
||||
|
||||
executeWithOptions(t, l, bucketOpt, orgOpt, writeOptAST, file)
|
||||
|
||||
results := executeWithOptions(t, l, bucketOpt, orgOpt, readOptAST, file)
|
||||
if results != nil {
|
||||
logFormatted := func(name string, results map[string]*bytes.Buffer) {
|
||||
if _, ok := results[name]; ok {
|
||||
scanner := bufio.NewScanner(results[name])
|
||||
for scanner.Scan() {
|
||||
t.Log(scanner.Text())
|
||||
}
|
||||
} else {
|
||||
t.Log("table ", name, " not present in results")
|
||||
}
|
||||
}
|
||||
if _, ok := results["diff"]; ok {
|
||||
t.Error("diff table was not empty")
|
||||
logFormatted("diff", results)
|
||||
logFormatted("want", results)
|
||||
logFormatted("got", results)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func executeWithOptions(t testing.TB, l *launcher.TestLauncher, bucketOpt *ast.OptionStatement,
|
||||
orgOpt *ast.OptionStatement, optionsAST *ast.File, file *ast.File) map[string]*bytes.Buffer {
|
||||
var results map[string]*bytes.Buffer
|
||||
|
||||
options := optionsAST.Copy().(*ast.File)
|
||||
options.Body = append([]ast.Statement{bucketOpt, orgOpt}, options.Body...)
|
||||
|
||||
|
@ -230,7 +273,7 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
|||
if r, err := l.FluxQueryService().Query(ctx, req); err != nil {
|
||||
t.Fatal(err)
|
||||
} else {
|
||||
results := make(map[string]*bytes.Buffer)
|
||||
results = make(map[string]*bytes.Buffer)
|
||||
|
||||
for r.More() {
|
||||
v := r.Next()
|
||||
|
@ -246,22 +289,6 @@ func testFlux(t testing.TB, l *launcher.TestLauncher, file *ast.File) {
|
|||
if err := r.Err(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
logFormatted := func(name string, results map[string]*bytes.Buffer) {
|
||||
if _, ok := results[name]; ok {
|
||||
scanner := bufio.NewScanner(results[name])
|
||||
for scanner.Scan() {
|
||||
t.Log(scanner.Text())
|
||||
}
|
||||
} else {
|
||||
t.Log("table ", name, " not present in results")
|
||||
}
|
||||
}
|
||||
if _, ok := results["diff"]; ok {
|
||||
t.Error("diff table was not empty")
|
||||
logFormatted("diff", results)
|
||||
logFormatted("want", results)
|
||||
logFormatted("got", results)
|
||||
}
|
||||
}
|
||||
return results
|
||||
}
|
||||
|
|
|
@ -92,8 +92,6 @@ var FluxEndToEndSkipList = map[string]map[string]string{
|
|||
|
||||
"holt_winters_panic": "Expected output is an empty table which breaks the testing framework (https://github.com/influxdata/influxdb/issues/14749)",
|
||||
"map_nulls": "to cannot write null values",
|
||||
|
||||
"to_time": "Flaky test https://github.com/influxdata/influxdb/issues/19577",
|
||||
},
|
||||
"experimental": {
|
||||
"set": "Reason TBD",
|
||||
|
|
Loading…
Reference in New Issue