feat(internal/cmd): add internal command for influxdb flux test harness (#20508)
The internal command can be used to execute flux tests using the same internals as the `flux test` command, but it will invoke these tests against a test influxdb instance that is launched in-process.pull/20572/head
parent
10fcc2bf1d
commit
f6669f7512
|
@ -104,7 +104,7 @@ func (tl *TestLauncher) RunOrFail(tb testing.TB, ctx context.Context, setters ..
|
||||||
|
|
||||||
// Run executes the program with additional arguments to set paths and ports.
|
// Run executes the program with additional arguments to set paths and ports.
|
||||||
// Passed arguments will overwrite/add to the default ones.
|
// Passed arguments will overwrite/add to the default ones.
|
||||||
func (tl *TestLauncher) Run(tb testing.TB, ctx context.Context, setters ...OptSetter) error {
|
func (tl *TestLauncher) Run(tb zaptest.TestingT, ctx context.Context, setters ...OptSetter) error {
|
||||||
opts := newOpts(viper.New())
|
opts := newOpts(viper.New())
|
||||||
if !tl.realServer {
|
if !tl.realServer {
|
||||||
opts.StoreType = "memory"
|
opts.StoreType = "memory"
|
||||||
|
|
|
@ -0,0 +1,233 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/influxdata/flux"
|
||||||
|
"github.com/influxdata/flux/ast"
|
||||||
|
"github.com/influxdata/flux/cmd/flux/cmd"
|
||||||
|
"github.com/influxdata/flux/execute/table"
|
||||||
|
"github.com/influxdata/flux/lang"
|
||||||
|
"github.com/influxdata/flux/parser"
|
||||||
|
"github.com/influxdata/flux/stdlib"
|
||||||
|
"github.com/influxdata/influxdb/v2"
|
||||||
|
"github.com/influxdata/influxdb/v2/cmd/influxd/launcher"
|
||||||
|
"github.com/influxdata/influxdb/v2/query"
|
||||||
|
"github.com/spf13/cobra"
|
||||||
|
)
|
||||||
|
|
||||||
|
type testExecutor struct {
|
||||||
|
ctx context.Context
|
||||||
|
l *launcher.TestLauncher
|
||||||
|
writeOptAST *ast.File
|
||||||
|
readOptAST *ast.File
|
||||||
|
errOutput bytes.Buffer
|
||||||
|
i int
|
||||||
|
failed bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTestExecutor(ctx context.Context) (cmd.TestExecutor, error) {
|
||||||
|
e := &testExecutor{ctx: ctx}
|
||||||
|
e.init()
|
||||||
|
|
||||||
|
e.l = launcher.NewTestLauncher()
|
||||||
|
if err := e.l.Run(e, ctx); err != nil {
|
||||||
|
_ = e.l.Shutdown(context.Background())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.l.Setup(); err != nil {
|
||||||
|
_ = e.l.Shutdown(context.Background())
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) init() {
|
||||||
|
t.writeOptAST = prepareOptions(writeOptSource)
|
||||||
|
t.readOptAST = prepareOptions(readOptSource)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Close() error {
|
||||||
|
if t.l == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := t.l.Shutdown(context.Background()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.l = nil
|
||||||
|
|
||||||
|
if t.Failed() {
|
||||||
|
_, _ = io.Copy(os.Stdout, &t.errOutput)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Run(pkg *ast.Package) error {
|
||||||
|
l := t.l.Launcher
|
||||||
|
b := &influxdb.Bucket{
|
||||||
|
OrgID: t.l.Org.ID,
|
||||||
|
Name: fmt.Sprintf("%04d", t.i),
|
||||||
|
}
|
||||||
|
t.i++
|
||||||
|
|
||||||
|
s := l.BucketService()
|
||||||
|
if err := s.CreateBucket(t.ctx, b); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer func() { _ = s.DeleteBucket(t.ctx, b.ID) }()
|
||||||
|
|
||||||
|
// Define bucket and org options
|
||||||
|
bucketOpt := &ast.OptionStatement{
|
||||||
|
Assignment: &ast.VariableAssignment{
|
||||||
|
ID: &ast.Identifier{Name: "bucket"},
|
||||||
|
Init: &ast.StringLiteral{Value: b.Name},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
orgOpt := &ast.OptionStatement{
|
||||||
|
Assignment: &ast.VariableAssignment{
|
||||||
|
ID: &ast.Identifier{Name: "org"},
|
||||||
|
Init: &ast.StringLiteral{Value: t.l.Org.Name},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// During the first execution, we are performing the writes
|
||||||
|
// that are in the testcase. We do not care about errors.
|
||||||
|
_ = t.executeWithOptions(bucketOpt, orgOpt, t.writeOptAST, pkg)
|
||||||
|
|
||||||
|
// Execute the read pass.
|
||||||
|
return t.executeWithOptions(bucketOpt, orgOpt, t.readOptAST, pkg)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) executeWithOptions(bucketOpt, orgOpt *ast.OptionStatement, optionsAST *ast.File, pkg *ast.Package) error {
|
||||||
|
options := optionsAST.Copy().(*ast.File)
|
||||||
|
options.Body = append([]ast.Statement{bucketOpt, orgOpt}, options.Body...)
|
||||||
|
|
||||||
|
// Add options to pkg
|
||||||
|
pkg = pkg.Copy().(*ast.Package)
|
||||||
|
pkg.Files = append(pkg.Files, options)
|
||||||
|
|
||||||
|
// Use testing.inspect call to get all of diff, want, and got
|
||||||
|
inspectCalls := stdlib.TestingInspectCalls(pkg)
|
||||||
|
pkg.Files = append(pkg.Files, inspectCalls)
|
||||||
|
|
||||||
|
bs, err := json.Marshal(pkg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &query.Request{
|
||||||
|
OrganizationID: t.l.Org.ID,
|
||||||
|
Compiler: lang.ASTCompiler{AST: bs},
|
||||||
|
}
|
||||||
|
|
||||||
|
r, err := t.l.FluxQueryService().Query(t.ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer r.Release()
|
||||||
|
|
||||||
|
for r.More() {
|
||||||
|
v := r.Next()
|
||||||
|
|
||||||
|
if err := v.Tables().Do(func(tbl flux.Table) error {
|
||||||
|
// The data returned here is the result of `testing.diff`, so any result means that
|
||||||
|
// a comparison of two tables showed inequality. Capture that inequality as part of the error.
|
||||||
|
// XXX: rockstar (08 Dec 2020) - This could use some ergonomic work, as the diff testOutput
|
||||||
|
// is not exactly "human readable."
|
||||||
|
return fmt.Errorf("%s", table.Stringify(tbl))
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
r.Release()
|
||||||
|
return r.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// This options definition puts to() in the path of the CSV input. The tests
|
||||||
|
// get run in this case and they would normally pass, if we checked the
|
||||||
|
// results, but don't look at them.
|
||||||
|
const writeOptSource = `
|
||||||
|
import "testing"
|
||||||
|
import c "csv"
|
||||||
|
|
||||||
|
option testing.loadStorage = (csv) => {
|
||||||
|
return c.from(csv: csv) |> to(bucket: bucket, org: org)
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
// This options definition is for the second run, the test run. It loads the
|
||||||
|
// data from previously written bucket. We check the results after running this
|
||||||
|
// second pass and report on them.
|
||||||
|
const readOptSource = `
|
||||||
|
import "testing"
|
||||||
|
import c "csv"
|
||||||
|
|
||||||
|
option testing.loadStorage = (csv) => {
|
||||||
|
return from(bucket: bucket)
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
func prepareOptions(optionsSource string) *ast.File {
|
||||||
|
pkg := parser.ParseSource(optionsSource)
|
||||||
|
if ast.Check(pkg) > 0 {
|
||||||
|
panic(ast.GetError(pkg))
|
||||||
|
}
|
||||||
|
return pkg.Files[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Logf(s string, i ...interface{}) {
|
||||||
|
_, _ = fmt.Fprintf(&t.errOutput, s, i...)
|
||||||
|
_, _ = fmt.Fprintln(&t.errOutput)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Errorf(s string, i ...interface{}) {
|
||||||
|
t.Logf(s, i...)
|
||||||
|
t.Fail()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Fail() {
|
||||||
|
t.failed = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Failed() bool {
|
||||||
|
return t.failed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) Name() string {
|
||||||
|
return "flux"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *testExecutor) FailNow() {
|
||||||
|
t.Fail()
|
||||||
|
panic(errors.New("abort"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func tryExec(cmd *cobra.Command) (err error) {
|
||||||
|
defer func() {
|
||||||
|
if e := recover(); e != nil {
|
||||||
|
var ok bool
|
||||||
|
err, ok = e.(error)
|
||||||
|
if !ok {
|
||||||
|
err = errors.New(fmt.Sprint(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
err = cmd.Execute()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
c := cmd.TestCommand(NewTestExecutor)
|
||||||
|
c.Use = "fluxtest-harness-influxdb"
|
||||||
|
if err := tryExec(c); err != nil {
|
||||||
|
os.Exit(1)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue