compare tables directly for generated end-to-end tests
parent
a3ad02288b
commit
2384927047
|
@ -1,16 +1,22 @@
|
|||
package functions_test
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/execute/executetest"
|
||||
ifql "github.com/influxdata/flux/influxql"
|
||||
"github.com/influxdata/flux/lang"
|
||||
"github.com/influxdata/flux/memory"
|
||||
"github.com/influxdata/flux/querytest"
|
||||
"github.com/influxdata/platform"
|
||||
"github.com/influxdata/platform/mock"
|
||||
|
@ -123,7 +129,7 @@ func Test_GeneratedInfluxQLQueries(t *testing.T) {
|
|||
if skip {
|
||||
t.Skip(reason)
|
||||
}
|
||||
testGeneratedInfluxQL(t, querier, prefix, ".influxql")
|
||||
testGeneratedInfluxQL(t, prefix, ".influxql")
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -231,7 +237,7 @@ func testInfluxQL(t testing.TB, querier *querytest.Querier, prefix, queryExt str
|
|||
QueryTestCheckSpec(t, querier, req, string(jsonOut))
|
||||
}
|
||||
|
||||
func testGeneratedInfluxQL(t testing.TB, querier *querytest.Querier, prefix, queryExt string) {
|
||||
func testGeneratedInfluxQL(t testing.TB, prefix, queryExt string) {
|
||||
q, err := ioutil.ReadFile(prefix + queryExt)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
|
@ -240,26 +246,89 @@ func testGeneratedInfluxQL(t testing.TB, querier *querytest.Querier, prefix, que
|
|||
t.Skip("influxql query is missing")
|
||||
}
|
||||
|
||||
jsonInFileName := prefix + ".in.json"
|
||||
jsonOut, err := ioutil.ReadFile(prefix + ".out.json")
|
||||
inFile := prefix + ".in.json"
|
||||
outFile := prefix + ".out.json"
|
||||
|
||||
out, err := jsonToResultIterator(outFile)
|
||||
defer out.Release()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
t.Fatalf("failed to read expected JSON results: %v", err)
|
||||
}
|
||||
|
||||
compiler := influxql.NewCompiler(dbrpMappingSvc)
|
||||
compiler.Cluster = "cluster"
|
||||
compiler.DB = "db0"
|
||||
compiler.Query = string(q)
|
||||
var exp []flux.Result
|
||||
for out.More() {
|
||||
exp = append(exp, out.Next())
|
||||
}
|
||||
|
||||
res, err := resultsFromQuerier(querier, influxQLCompiler(string(q), inFile))
|
||||
defer res.Release()
|
||||
if err != nil {
|
||||
t.Fatalf("failed to run query: %v", err)
|
||||
}
|
||||
|
||||
var got []flux.Result
|
||||
for res.More() {
|
||||
got = append(got, res.Next())
|
||||
}
|
||||
|
||||
if ok, err := executetest.EqualResults(exp, got); !ok {
|
||||
t.Errorf("result not as expected: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func resultsFromQuerier(querier *querytest.Querier, compiler flux.Compiler) (flux.ResultIterator, error) {
|
||||
req := &query.ProxyRequest{
|
||||
Request: query.Request{
|
||||
Compiler: querytest.FromInfluxJSONCompiler{
|
||||
Compiler: compiler,
|
||||
InputFile: jsonInFileName,
|
||||
},
|
||||
Compiler: compiler,
|
||||
},
|
||||
Dialect: new(influxql.Dialect),
|
||||
}
|
||||
QueryTestCheckSpec(t, querier, req, string(jsonOut))
|
||||
jsonBuf, err := queryToJSON(querier, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
decoder := ifql.NewResultDecoder(new(memory.Allocator))
|
||||
return decoder.Decode(ioutil.NopCloser(jsonBuf))
|
||||
}
|
||||
|
||||
func influxQLCompiler(query, filename string) querytest.FromInfluxJSONCompiler {
|
||||
compiler := influxql.NewCompiler(dbrpMappingSvc)
|
||||
compiler.Cluster = "cluster"
|
||||
compiler.DB = "db0"
|
||||
compiler.Query = query
|
||||
return querytest.FromInfluxJSONCompiler{
|
||||
Compiler: compiler,
|
||||
InputFile: filename,
|
||||
}
|
||||
}
|
||||
|
||||
func queryToJSON(querier *querytest.Querier, req *query.ProxyRequest) (io.ReadCloser, error) {
|
||||
var buf bytes.Buffer
|
||||
_, err := querier.Query(context.Background(), &buf, req.Request.Compiler, req.Dialect)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ioutil.NopCloser(&buf), nil
|
||||
}
|
||||
|
||||
func jsonToResultIterator(file string) (flux.ResultIterator, error) {
|
||||
f, err := os.Open(file)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Reader for influxql json file
|
||||
jsonReader := bufio.NewReaderSize(f, 8196)
|
||||
|
||||
// InfluxQL json -> Flux tables decoder
|
||||
decoder := ifql.NewResultDecoder(new(memory.Allocator))
|
||||
|
||||
// Decode json into Flux tables
|
||||
results, err := decoder.Decode(ioutil.NopCloser(jsonReader))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func QueryTestCheckSpec(t testing.TB, querier *querytest.Querier, req *query.ProxyRequest, want string) {
|
||||
|
|
Loading…
Reference in New Issue