Merge pull request #12165 from influxdata/flux-staging

update to Flux v0.21.0
pull/12203/head
Jonathan A. Sternberg 2019-02-26 13:12:37 -06:00 committed by GitHub
commit dd50e10d6d
18 changed files with 1205 additions and 222 deletions

6
go.mod
View File

@ -28,7 +28,7 @@ require (
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/docker/docker v1.13.1 // indirect
github.com/duosecurity/duo_api_golang v0.0.0-20190107154727-539434bf0d45 // indirect
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190208081306-44819d1a54a8
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190219201458-ead62885d7c8
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/fatih/structs v1.1.0 // indirect
github.com/getkin/kin-openapi v0.1.1-0.20190103155524-1fa206970bc1
@ -62,7 +62,7 @@ require (
github.com/hashicorp/vault v0.11.5
github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect
github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect
github.com/influxdata/flux v0.20.0
github.com/influxdata/flux v0.21.0
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6
github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368
github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect
@ -127,7 +127,7 @@ require (
google.golang.org/grpc v1.17.0
gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect
gopkg.in/editorconfig/editorconfig-core-go.v1 v1.3.0 // indirect
gopkg.in/ini.v1 v1.41.0 // indirect
gopkg.in/ini.v1 v1.42.0 // indirect
gopkg.in/ldap.v2 v2.5.1 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5

12
go.sum
View File

@ -101,8 +101,8 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190107154727-539434bf0d45 h1:AbIf
github.com/duosecurity/duo_api_golang v0.0.0-20190107154727-539434bf0d45/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190208081306-44819d1a54a8 h1:s04AS/nXlTOt+hrPd50Wm0uC3/26lBuB17bNvAW3zqI=
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190208081306-44819d1a54a8/go.mod h1:S5wy26xEAot3+yfuCE3/NlL9n/O/TDX/5xt1rGWAOXY=
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190219201458-ead62885d7c8 h1:wRymegulm4k5oGb5rJFqZ79CFsoqKsmnJaY4hfgyhB4=
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190219201458-ead62885d7c8/go.mod h1:S5wy26xEAot3+yfuCE3/NlL9n/O/TDX/5xt1rGWAOXY=
github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/emirpasic/gods v1.9.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
@ -224,8 +224,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.20.0 h1:v+R/pQGamzx5XVB03dCuVnHPnoreLTNLDrrNJkS5xEA=
github.com/influxdata/flux v0.20.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/flux v0.21.0 h1:9Oz3lZg8BrFpJLv2+mqWsfnofPGZc56Upa+w8JxAWzg=
github.com/influxdata/flux v0.21.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY=
github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo=
github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo=
@ -488,8 +488,8 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.41.0 h1:Ka3ViY6gNYSKiVy71zXBEqKplnV35ImDLVG+8uoIklE=
gopkg.in/ini.v1 v1.41.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ldap.v2 v2.5.1 h1:wiu0okdNfjlBzg6UWvd1Hn8Y+Ux17/u/4nlk4CQr6tU=
gopkg.in/ldap.v2 v2.5.1/go.mod h1:oI0cpe/D7HRtBQl8aTg+ZmzFUAvu4lsv3eLXMLGFxWk=
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU=

View File

@ -16,25 +16,23 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/parser"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxql"
)
// QueryRequest is a flux query request.
type QueryRequest struct {
Extern *ast.File `json:"extern,omitempty"`
Spec *flux.Spec `json:"spec,omitempty"`
AST *ast.Package `json:"ast,omitempty"`
Query string `json:"query"`
Type string `json:"type"`
Dialect QueryDialect `json:"dialect"`
Org *platform.Organization `json:"-"`
Org *influxdb.Organization `json:"-"`
}
// QueryDialect is the formatting options for the query response.
@ -70,6 +68,13 @@ func (r QueryRequest) Validate() error {
return errors.New(`request body requires either query, spec, or AST`)
}
if r.Spec != nil && r.Extern != nil {
return &influxdb.Error{
Code: influxdb.EInvalid,
Msg: "request body cannot specify both a spec and external declarations",
}
}
if r.Type != "flux" {
return fmt.Errorf(`unknown query type: %s`, r.Type)
}
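For illustration, a minimal sketch (not part of this diff) of how the new mutual-exclusion check looks from a caller's side; it assumes the same http package and the usual testing, flux, and flux/ast imports, and the test name is hypothetical:

func TestQueryRequest_ExternAndSpecRejected(t *testing.T) {
	// A request carrying both a pre-built spec and external declarations
	// should fail validation (the new EInvalid error above).
	req := QueryRequest{
		Extern:  &ast.File{},
		Spec:    &flux.Spec{},
		Type:    "flux",
		Dialect: QueryDialect{Delimiter: ",", DateTimeFormat: "RFC3339"},
	}
	if err := req.Validate(); err == nil {
		t.Fatal("expected Validate to reject a request with both spec and extern")
	}
}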
@ -206,47 +211,6 @@ func columnFromCharacter(q string, char int) int {
var influxqlParseErrorRE = regexp.MustCompile(`^(.+) at line (\d+), char (\d+)$`)
func nowFunc(now time.Time) values.Function {
timeVal := values.NewTime(values.ConvertTime(now))
ftype := semantic.NewFunctionPolyType(semantic.FunctionPolySignature{
Return: semantic.Time,
})
call := func(args values.Object) (values.Value, error) {
return timeVal, nil
}
sideEffect := false
return values.NewFunction("now", ftype, call, sideEffect)
}
func toSpec(p *ast.Package, now func() time.Time) (*flux.Spec, error) {
semProg, err := semantic.New(p)
if err != nil {
return nil, err
}
scope := flux.Prelude()
scope.Set("now", nowFunc(now()))
itrp := interpreter.NewInterpreter()
sideEffects, err := itrp.Eval(semProg, scope, flux.StdLib())
if err != nil {
return nil, err
}
nowOpt, ok := scope.Lookup("now")
if !ok {
return nil, fmt.Errorf("now option not set")
}
nowTime, err := nowOpt.Function().Call(nil)
if err != nil {
return nil, err
}
return flux.ToSpec(sideEffects, nowTime.Time().Time())
}
// ProxyRequest returns a request to proxy from the flux.
func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) {
return r.proxyRequest(time.Now)
@ -259,18 +223,27 @@ func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, e
// Query is preferred over spec
var compiler flux.Compiler
if r.Query != "" {
compiler = lang.FluxCompiler{
Query: r.Query,
}
} else if r.AST != nil {
var err error
r.Spec, err = toSpec(r.AST, now)
pkg, err := flux.Parse(r.Query)
if err != nil {
return nil, err
}
compiler = lang.SpecCompiler{
Spec: r.Spec,
c := lang.ASTCompiler{
AST: pkg,
Now: now,
}
if r.Extern != nil {
c.PrependFile(r.Extern)
}
compiler = c
} else if r.AST != nil {
c := lang.ASTCompiler{
AST: r.AST,
Now: now,
}
if r.Extern != nil {
c.PrependFile(r.Extern)
}
compiler = c
} else if r.Spec != nil {
compiler = lang.SpecCompiler{
Spec: r.Spec,
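To recap the new compilation path, a minimal sketch under the same imports as this file; compileQueryString is a hypothetical helper, and the calls mirror the branch above:

// compileQueryString parses a raw Flux query into an *ast.Package, wraps it in
// an ASTCompiler, and prepends any extern file so its option assignments are
// evaluated before the query body.
func compileQueryString(q string, extern *ast.File, now func() time.Time) (flux.Compiler, error) {
	pkg, err := flux.Parse(q)
	if err != nil {
		return nil, err
	}
	c := lang.ASTCompiler{AST: pkg, Now: now}
	if extern != nil {
		c.PrependFile(extern)
	}
	return c, nil
}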
@ -312,6 +285,9 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error
case lang.SpecCompiler:
qr.Type = "flux"
qr.Spec = c.Spec
case lang.ASTCompiler:
qr.Type = "flux"
qr.AST = c.AST
default:
return nil, fmt.Errorf("unsupported compiler %T", c)
}
@ -330,7 +306,7 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error
return qr, nil
}
func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*QueryRequest, error) {
func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.OrganizationService) (*QueryRequest, error) {
var req QueryRequest
var contentType = "application/json"
@ -365,7 +341,7 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ
return &req, err
}
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth platform.Authorizer, svc platform.OrganizationService) (*query.ProxyRequest, error) {
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth influxdb.Authorizer, svc influxdb.OrganizationService) (*query.ProxyRequest, error) {
req, err := decodeQueryRequest(ctx, r, svc)
if err != nil {
return nil, err
@ -376,10 +352,10 @@ func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth platform
return nil, err
}
a, ok := auth.(*platform.Authorization)
a, ok := auth.(*influxdb.Authorization)
if !ok {
// TODO(desa): this should go away once we're using platform.Authorizers everywhere.
return pr, platform.ErrAuthorizerNotSupported
// TODO(desa): this should go away once we're using influxdb.Authorizers everywhere.
return pr, influxdb.ErrAuthorizerNotSupported
}
pr.Request.Authorization = a

View File

@ -71,10 +71,7 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
defer results.Release()
// Setup headers
stats, hasStats := results.(flux.Statisticser)
if hasStats {
w.Header().Set("Trailer", queryStatisticsTrailer)
}
w.Header().Set("Trailer", queryStatisticsTrailer)
// NOTE: We do not write out the headers here.
// It is possible that if the encoding step fails
@ -102,15 +99,13 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
}
}
if hasStats {
data, err := json.Marshal(stats.Statistics())
if err != nil {
h.Logger.Info("Failed to encode statistics", zap.Error(err))
return
}
// Write statistics trailer
w.Header().Set(queryStatisticsTrailer, string(data))
data, err := json.Marshal(results.Statistics())
if err != nil {
h.Logger.Info("Failed to encode statistics", zap.Error(err))
return
}
// Write statistics trailer
w.Header().Set(queryStatisticsTrailer, string(data))
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
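The trailer handling above follows the standard net/http trailer convention; a standalone sketch of that pattern (the JSON payload and port are illustrative, not the handler's actual statistics shape):

package main

import (
	"encoding/json"
	"log"
	"net/http"
)

func statsHandler(w http.ResponseWriter, r *http.Request) {
	// Announce the trailer name before any body bytes are written.
	w.Header().Set("Trailer", "Influx-Query-Statistics")

	w.Write([]byte("...encoded query results...\n"))

	// After the body, set the trailer value; it is sent via chunked encoding.
	data, _ := json.Marshal(map[string]int{"scannedValues": 0, "scannedBytes": 0})
	w.Header().Set("Influx-Query-Statistics", string(data))
}

func main() {
	http.HandleFunc("/query", statsHandler)
	log.Fatal(http.ListenAndServe(":8086", nil))
}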

View File

@ -22,6 +22,27 @@ import (
_ "github.com/influxdata/influxdb/query/builtin"
)
var compareASTCompiler = func(x, y lang.ASTCompiler) bool {
if x.Now == nil && y.Now != nil {
return false
}
if x.Now != nil && y.Now == nil {
return false
}
if x.Now != nil && y.Now != nil && !x.Now().Equal(y.Now()) {
return false
}
return cmp.Equal(x.AST, y.AST, cmpopts.IgnoreTypes(ast.BaseNode{}))
}
var cmpOptions = cmp.Options{
cmpopts.IgnoreTypes(ast.BaseNode{}),
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
cmpopts.IgnoreUnexported(query.Request{}),
cmpopts.IgnoreUnexported(flux.Spec{}),
cmpopts.EquateEmpty(),
}
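As an aside, a minimal standalone illustration of how a cmp.Comparer option like compareASTCompiler plugs into go-cmp (assumes fmt, time, and the cmp package are imported; place inside any function):

// Register a custom comparer for a type; cmp.Equal uses it whenever it
// encounters values of that type, taking precedence over the default rules.
opts := cmp.Options{
	cmp.Comparer(func(x, y time.Time) bool {
		return x.Unix() == y.Unix() // compare only at second resolution
	}),
}
fmt.Println(cmp.Equal(time.Unix(1, 0), time.Unix(1, 999), opts)) // true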
func TestQueryRequest_WithDefaults(t *testing.T) {
type fields struct {
Spec *flux.Spec
@ -67,6 +88,7 @@ func TestQueryRequest_WithDefaults(t *testing.T) {
func TestQueryRequest_Validate(t *testing.T) {
type fields struct {
Extern *ast.File
Spec *flux.Spec
AST *ast.Package
Query string
@ -86,6 +108,19 @@ func TestQueryRequest_Validate(t *testing.T) {
},
wantErr: true,
},
{
name: "query cannot have both extern and spec",
fields: fields{
Extern: &ast.File{},
Spec: &flux.Spec{},
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
},
wantErr: true,
},
{
name: "requires flux type",
fields: fields{
@ -166,6 +201,7 @@ func TestQueryRequest_Validate(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := QueryRequest{
Extern: tt.fields.Extern,
Spec: tt.fields.Spec,
AST: tt.fields.AST,
Query: tt.fields.Query,
@ -180,59 +216,9 @@ func TestQueryRequest_Validate(t *testing.T) {
}
}
func Test_toSpec(t *testing.T) {
type args struct {
p *ast.Package
now func() time.Time
}
tests := []struct {
name string
args args
want *flux.Spec
wantErr bool
}{
{
name: "ast converts to spec",
args: args{
p: &ast.Package{},
now: func() time.Time { return time.Unix(0, 0) },
},
want: &flux.Spec{
Now: time.Unix(0, 0).UTC(),
},
},
{
name: "bad semantics error",
args: args{
p: &ast.Package{
Files: []*ast.File{{
Body: []ast.Statement{
&ast.ReturnStatement{},
},
}},
},
now: func() time.Time { return time.Unix(0, 0) },
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := toSpec(tt.args.p, tt.args.now)
if (err != nil) != tt.wantErr {
t.Errorf("toSpec() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("toSpec() = %v, want %v", got, tt.want)
}
})
}
}
func TestQueryRequest_proxyRequest(t *testing.T) {
type fields struct {
Extern *ast.File
Spec *flux.Spec
AST *ast.Package
Query string
@ -265,10 +251,23 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
},
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(1, 1) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.FluxCompiler{
Query: "howdy",
Compiler: lang.ASTCompiler{
AST: &ast.Package{
Package: "main",
Files: []*ast.File{
{
Body: []ast.Statement{
&ast.ExpressionStatement{
Expression: &ast.Identifier{Name: "howdy"},
},
},
},
},
},
Now: func() time.Time { return time.Unix(1, 1) },
},
},
Dialect: &csv.Dialect{
@ -290,15 +289,64 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
},
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(0, 0).UTC() },
now: func() time.Time { return time.Unix(1, 1) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.SpecCompiler{
Spec: &flux.Spec{
Now: time.Unix(0, 0).UTC(),
Compiler: lang.ASTCompiler{
AST: &ast.Package{},
Now: func() time.Time { return time.Unix(1, 1) },
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid AST with extern",
fields: fields{
Extern: &ast.File{
Body: []ast.Statement{
&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "x"},
Init: &ast.IntegerLiteral{Value: 0},
},
},
},
},
AST: &ast.Package{},
Type: "flux",
Dialect: QueryDialect{
Delimiter: ",",
DateTimeFormat: "RFC3339",
},
org: &platform.Organization{},
},
now: func() time.Time { return time.Unix(1, 1) },
want: &query.ProxyRequest{
Request: query.Request{
Compiler: lang.ASTCompiler{
AST: &ast.Package{
Files: []*ast.File{
{
Body: []ast.Statement{
&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "x"},
Init: &ast.IntegerLiteral{Value: 0},
},
},
},
},
},
},
Now: func() time.Time { return time.Unix(1, 1) },
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
@ -337,9 +385,11 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
},
},
}
cmpOptions := append(cmpOptions, cmp.Comparer(compareASTCompiler))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := QueryRequest{
Extern: tt.fields.Extern,
Spec: tt.fields.Spec,
AST: tt.fields.AST,
Query: tt.fields.Query,
@ -352,8 +402,8 @@ func TestQueryRequest_proxyRequest(t *testing.T) {
t.Errorf("QueryRequest.ProxyRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("QueryRequest.ProxyRequest() = %#v, want %#v", got, tt.want)
if !cmp.Equal(got, tt.want, cmpOptions...) {
t.Errorf("QueryRequest.ProxyRequest() -want/+got\n%s", cmp.Diff(tt.want, got, cmpOptions...))
}
})
}
@ -482,8 +532,104 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
want: &query.ProxyRequest{
Request: query.Request{
OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(),
Compiler: lang.FluxCompiler{
Query: "from()",
Compiler: lang.ASTCompiler{
AST: &ast.Package{
Package: "main",
Files: []*ast.File{
{
Body: []ast.Statement{
&ast.ExpressionStatement{
Expression: &ast.CallExpression{
Callee: &ast.Identifier{Name: "from"},
},
},
},
},
},
},
},
},
Dialect: &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: false,
Delimiter: ',',
},
},
},
},
{
name: "valid query including extern definition",
args: args{
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`
{
"extern": {
"type": "File",
"body": [
{
"type": "OptionStatement",
"assignment": {
"type": "VariableAssignment",
"id": {
"type": "Identifier",
"name": "x"
},
"init": {
"type": "IntegerLiteral",
"value": "0"
}
}
}
]
},
"query": "from(bucket: \"mybucket\")"
}
`)),
svc: &mock.OrganizationService{
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
return &platform.Organization{
ID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(),
}, nil
},
},
},
want: &query.ProxyRequest{
Request: query.Request{
OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(),
Compiler: lang.ASTCompiler{
AST: &ast.Package{
Package: "main",
Files: []*ast.File{
{
Body: []ast.Statement{
&ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "x"},
Init: &ast.IntegerLiteral{Value: 0},
},
},
},
},
{
Body: []ast.Statement{
&ast.ExpressionStatement{
Expression: &ast.CallExpression{
Callee: &ast.Identifier{Name: "from"},
Arguments: []ast.Expression{
&ast.ObjectExpression{
Properties: []*ast.Property{
{
Key: &ast.Identifier{Name: "bucket"},
Value: &ast.StringLiteral{Value: "mybucket"},
},
},
},
},
},
},
},
},
},
},
},
},
Dialect: &csv.Dialect{
@ -513,8 +659,31 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
want: &query.ProxyRequest{
Request: query.Request{
OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(),
Compiler: lang.FluxCompiler{
Query: "from(bucket: \"mybucket\")",
Compiler: lang.ASTCompiler{
AST: &ast.Package{
Package: "main",
Files: []*ast.File{
{
Body: []ast.Statement{
&ast.ExpressionStatement{
Expression: &ast.CallExpression{
Callee: &ast.Identifier{Name: "from"},
Arguments: []ast.Expression{
&ast.ObjectExpression{
Properties: []*ast.Property{
{
Key: &ast.Identifier{Name: "bucket"},
Value: &ast.StringLiteral{Value: "mybucket"},
},
},
},
},
},
},
},
},
},
},
},
},
Dialect: &csv.Dialect{
@ -526,11 +695,7 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
},
},
}
var cmpOptions = cmp.Options{
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
cmpopts.IgnoreUnexported(query.Request{}),
cmpopts.EquateEmpty(),
}
cmpOptions := append(cmpOptions, cmpopts.IgnoreFields(lang.ASTCompiler{}, "Now"))
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc)
@ -538,8 +703,8 @@ func Test_decodeProxyQueryRequest(t *testing.T) {
t.Errorf("decodeProxyQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
return
}
if diff := cmp.Diff(got, tt.want, cmpOptions...); diff != "" {
t.Errorf("decodeProxyQueryRequest() = got/want %v", diff)
if !cmp.Equal(tt.want, got, cmpOptions...) {
t.Errorf("decodeProxyQueryRequest() -want/+got\n%s", cmp.Diff(tt.want, got, cmpOptions...))
}
})
}

View File

@ -4705,6 +4705,8 @@ components:
required:
- query
properties:
extern:
$ref: "#/components/schemas/File"
query:
description: query script to execute.
type: string
@ -4728,6 +4730,413 @@ components:
type: string
dialect:
$ref: "#/components/schemas/Dialect"
Package:
description: represents a complete package source tree
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
path:
description: package import path
type: string
package:
description: package name
type: string
files:
description: package files
type: array
items:
$ref: "#/components/schemas/File"
File:
description: represents a source from a single file
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
name:
description: name of the file
type: string
package:
$ref: "#/components/schemas/PackageClause"
imports:
description: a list of package imports
type: array
items:
$ref: "#/components/schemas/ImportDeclaration"
body:
description: list of Flux statements
type: array
items:
$ref: "#/components/schemas/Statement"
PackageClause:
description: defines a package identifier
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
name:
$ref: "#/components/schemas/Identifier"
ImportDeclaration:
description: declares a package import
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
as:
$ref: "#/components/schemas/Identifier"
path:
$ref: "#/components/schemas/StringLiteral"
Node:
oneOf:
- $ref: "#/components/schemas/Expression"
- $ref: "#/components/schemas/Block"
Block:
description: a set of statements
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
body:
description: block body
type: array
items:
$ref: "#/components/schemas/Statement"
Statement:
oneOf:
- $ref: "#/components/schemas/BadStatement"
- $ref: "#/components/schemas/VariableAssignment"
- $ref: "#/components/schemas/MemberAssignment"
- $ref: "#/components/schemas/ExpressionStatement"
- $ref: "#/components/schemas/ReturnStatement"
- $ref: "#/components/schemas/OptionStatement"
- $ref: "#/components/schemas/BuiltinStatement"
- $ref: "#/components/schemas/TestStatement"
BadStatement:
description: a placeholder for statements for which no correct statement nodes can be created
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
text:
description: raw source text
type: string
VariableAssignment:
description: represents the declaration of a variable
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
id:
$ref: "#/components/schemas/Identifier"
init:
$ref: "#/components/schemas/Expression"
MemberAssignment:
description: object property assignment
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
member:
$ref: "#/components/schemas/MemberExpression"
init:
$ref: "#/components/schemas/Expression"
ExpressionStatement:
description: may consist of an expression that does not return a value and is executed solely for its side-effects
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
expression:
$ref: "#/components/schemas/Expression"
ReturnStatement:
description: defines an expression to return
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
argument:
$ref: "#/components/schemas/Expression"
OptionStatement:
description: a single variable declaration
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
assignment:
oneOf:
- $ref: "#/components/schemas/VariableAssignment"
- $ref: "#/components/schemas/MemberAssignment"
BuiltinStatement:
description: declares a builtin identifier and its type
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
id:
$ref: "#/components/schemas/Identifier"
TestStatement:
description: declares a Flux test case
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
assignment:
$ref: "#/components/schemas/VariableAssignment"
Expression:
oneOf:
- $ref: "#/components/schemas/ArrayExpression"
- $ref: "#/components/schemas/FunctionExpression"
- $ref: "#/components/schemas/BinaryExpression"
- $ref: "#/components/schemas/CallExpression"
- $ref: "#/components/schemas/ConditionalExpression"
- $ref: "#/components/schemas/LogicalExpression"
- $ref: "#/components/schemas/MemberExpression"
- $ref: "#/components/schemas/IndexExpression"
- $ref: "#/components/schemas/ObjectExpression"
- $ref: "#/components/schemas/PipeExpression"
- $ref: "#/components/schemas/UnaryExpression"
- $ref: "#/components/schemas/BooleanLiteral"
- $ref: "#/components/schemas/DateTimeLiteral"
- $ref: "#/components/schemas/DurationLiteral"
- $ref: "#/components/schemas/FloatLiteral"
- $ref: "#/components/schemas/IntegerLiteral"
- $ref: "#/components/schemas/PipeLiteral"
- $ref: "#/components/schemas/RegexpLiteral"
- $ref: "#/components/schemas/StringLiteral"
- $ref: "#/components/schemas/UnsignedIntegerLiteral"
- $ref: "#/components/schemas/Identifier"
ArrayExpression:
description: used to create and directly specify the elements of an array object
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
elements:
description: elements of the array
type: array
items:
$ref: "#/components/schemas/Expression"
FunctionExpression:
description: function expression
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
params:
description: function parameters
type: array
items:
$ref: "#/components/schemas/Property"
body:
$ref: "#/components/schemas/Node"
BinaryExpression:
description: uses binary operators to act on two operands in an expression
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
operator:
type: string
left:
$ref: "#/components/schemas/Expression"
right:
$ref: "#/components/schemas/Expression"
CallExpression:
description: represents a function call
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
callee:
$ref: "#/components/schemas/Expression"
arguments:
description: function arguments
type: array
items:
$ref: "#/components/schemas/Expression"
ConditionalExpression:
description: selects one of two expressions, `Alternate` or `Consequent`, depending on a third boolean expression, `Test`
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
test:
$ref: "#/components/schemas/Expression"
alternate:
$ref: "#/components/schemas/Expression"
consequent:
$ref: "#/components/schemas/Expression"
LogicalExpression:
description: represents the rule conditions that collectively evaluate to either true or false
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
operator:
type: string
left:
$ref: "#/components/schemas/Expression"
right:
$ref: "#/components/schemas/Expression"
MemberExpression:
description: represents accessing a property of an object
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
object:
$ref: "#/components/schemas/Expression"
property:
$ref: "#/components/schemas/PropertyKey"
IndexExpression:
description: represents indexing into an array
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
array:
$ref: "#/components/schemas/Expression"
index:
$ref: "#/components/schemas/Expression"
ObjectExpression:
description: allows the declaration of an anonymous object within a declaration
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
properties:
description: object properties
type: array
items:
$ref: "#/components/schemas/Property"
PipeExpression:
description: call expression with pipe argument
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
argument:
$ref: "#/components/schemas/Expression"
call:
$ref: "#/components/schemas/CallExpression"
UnaryExpression:
description: uses operators to act on a single operand in an expression
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
operator:
type: string
argument:
$ref: "#/components/schemas/Expression"
BooleanLiteral:
description: represents boolean values
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: boolean
DateTimeLiteral:
description: represents an instant in time with nanosecond precision using the syntax of golang's RFC3339 Nanosecond variant
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: string
DurationLiteral:
description: represents the elapsed time between two instants as an int64 nanosecond count with syntax of golang's time.Duration
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
values:
description: duration values
type: array
items:
$ref: "#/components/schemas/Duration"
FloatLiteral:
description: represents floating point numbers according to the double representations defined by the IEEE-754-1985
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: number
IntegerLiteral:
description: represents integer numbers
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: string
PipeLiteral:
description: represents a specialized literal value, indicating the left hand value of a pipe expression
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
RegexpLiteral:
description: expressions begin and end with `/` and are regular expressions with syntax accepted by RE2
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: string
StringLiteral:
description: expressions begin and end with double quote marks
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: string
UnsignedIntegerLiteral:
description: represents integer numbers
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
value:
type: string
Duration:
description: a pair consisting of length of time and the unit of time measured. It is the atomic unit from which all duration literals are composed.
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
magnitude:
type: integer
unit:
type: string
Property:
description: the value associated with a key
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
key:
$ref: "#/components/schemas/PropertyKey"
value:
$ref: "#/components/schemas/Expression"
PropertyKey:
oneOf:
- $ref: "#/components/schemas/Identifier"
- $ref: "#/components/schemas/StringLiteral"
Identifier:
description: a valid Flux identifier
type: object
properties:
type:
$ref: "#/components/schemas/NodeType"
name:
type: string
NodeType:
description: type of AST node
type: string
QuerySpecification:
description: consists of a set of operations and a set of edges between those operations to instruct the query engine to operate.
type: object
@ -7104,8 +7513,7 @@ components:
type: object
properties:
ast:
description: the AST of the supplied Flux query
type: object
$ref: "#/components/schemas/Package"
WritePrecision:
type: string
enum:

View File

@ -36,11 +36,8 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr
defer results.Release()
// Setup headers
stats, hasStats := results.(flux.Statisticser)
if hasStats {
if w, ok := w.(http.ResponseWriter); ok {
w.Header().Set("Trailer", "Influx-Query-Statistics")
}
if w, ok := w.(http.ResponseWriter); ok {
w.Header().Set("Trailer", "Influx-Query-Statistics")
}
encoder := req.Dialect.Encoder()
@ -50,10 +47,8 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr
}
if w, ok := w.(http.ResponseWriter); ok {
if hasStats {
data, _ := json.Marshal(stats.Statistics())
w.Header().Set("Influx-Query-Statistics", string(data))
}
data, _ := json.Marshal(results.Statistics())
w.Header().Set("Influx-Query-Statistics", string(data))
}
return n, nil

View File

@ -42,12 +42,10 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox
}
// Check if this result iterator reports stats. We call this defer before cancel because
// the query needs to be finished before it will have valid statistics.
if s, ok := results.(flux.Statisticser); ok {
defer func() {
stats = s.Statistics()
}()
}
defer results.Release()
defer func() {
results.Release()
stats = results.Statistics()
}()
encoder := req.Dialect.Encoder()
n, err = encoder.Encode(w, results)

View File

@ -10,6 +10,7 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/semantic"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
)
@ -76,7 +77,7 @@ type source struct {
currentTime execute.Time
overflow bool
stats flux.Statistics
stats cursors.CursorStats
}
func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time) execute.Source {
@ -101,6 +102,13 @@ func (s *source) Run(ctx context.Context) {
}
}
func (s *source) Metadata() flux.Metadata {
return flux.Metadata{
"influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes},
"influxdb/scanned-values": []interface{}{s.stats.ScannedValues},
}
}
func (s *source) run(ctx context.Context) error {
//TODO(nathanielc): Pass through context to actual network I/O.
for tables, mark, ok := s.next(ctx); ok; tables, mark, ok = s.next(ctx) {
@ -120,7 +128,12 @@ func (s *source) run(ctx context.Context) error {
if err != nil {
return err
}
s.stats = s.stats.Add(tables.Statistics())
// Track the number of bytes and values scanned.
stats := tables.Statistics()
s.stats.ScannedValues += stats.ScannedValues
s.stats.ScannedBytes += stats.ScannedBytes
for _, t := range s.ts {
if err := t.UpdateWatermark(s.id, mark); err != nil {
return err
@ -130,7 +143,7 @@ func (s *source) run(ctx context.Context) error {
return nil
}
func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bool) {
func (s *source) next(ctx context.Context) (TableIterator, execute.Time, bool) {
if s.overflow {
return nil, 0, false
}
@ -163,10 +176,6 @@ func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bo
return bi, stop, true
}
func (s *source) Statistics() flux.Statistics {
return s.stats
}
type GroupMode int
const (
@ -228,6 +237,12 @@ type ReadSpec struct {
}
type Reader interface {
Read(ctx context.Context, rs ReadSpec, start, stop execute.Time) (flux.TableIterator, error)
Read(ctx context.Context, rs ReadSpec, start, stop execute.Time) (TableIterator, error)
Close()
}
// TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine.
type TableIterator interface {
flux.TableIterator
Statistics() cursors.CursorStats
}
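For clarity, a hypothetical in-memory implementation of the new TableIterator contract (the type name is invented; the Do signature matches flux.TableIterator as used elsewhere in this diff):

// sliceTableIterator iterates over a fixed slice of tables and reports
// pre-computed cursor statistics, satisfying the TableIterator interface.
type sliceTableIterator struct {
	tables []flux.Table
	stats  cursors.CursorStats
}

func (it *sliceTableIterator) Do(f func(flux.Table) error) error {
	for _, tbl := range it.tables {
		if err := f(tbl); err != nil {
			return err
		}
	}
	return nil
}

func (it *sliceTableIterator) Statistics() cursors.CursorStats { return it.stats }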

View File

@ -545,6 +545,10 @@ func writeTable(t *ToTransformation, tbl flux.Table) error {
}
fieldValues.Range(func(k string, v values.Value) {
if v.IsNull() {
fields[k] = nil
return
}
switch v.Type() {
case semantic.Float:
fields[k] = v.Float()

View File

@ -4,4 +4,5 @@ package stdlib
import (
_ "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb"
_ "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb/v1"
_ "github.com/influxdata/influxdb/query/stdlib/testing"
)

View File

@ -0,0 +1,419 @@
package testing_test
import (
"bufio"
"bytes"
"context"
"io"
"io/ioutil"
nethttp "net/http"
"os"
"path/filepath"
"strings"
"testing"
"github.com/influxdata/flux"
"github.com/influxdata/flux/ast"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/parser"
"github.com/influxdata/flux/stdlib"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/bolt"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
"github.com/influxdata/influxdb/http"
"github.com/influxdata/influxdb/query"
_ "github.com/influxdata/flux/stdlib" // Import the built-in functions
_ "github.com/influxdata/influxdb/query/stdlib" // Import the stdlib
)
// Default context.
var ctx = context.Background()
func init() {
flux.FinalizeBuiltIns()
}
var skipTests = map[string]string{
// TODO(adam) determine the reason for these test failures.
"cov": "Reason TBD",
"covariance": "Reason TBD",
"cumulative_sum": "Reason TBD",
"cumulative_sum_default": "Reason TBD",
"cumulative_sum_noop": "Reason TBD",
"difference_panic": "Reason TBD",
"drop_non_existent": "Reason TBD",
"filter_by_regex_function": "Reason TBD",
"first": "Reason TBD",
"group_by_irregular": "Reason TBD",
"highestAverage": "Reason TBD",
"highestMax": "Reason TBD",
"histogram": "Reason TBD",
"histogram_normalize": "Reason TBD",
"histogram_quantile": "Reason TBD",
"join": "Reason TBD",
"join_across_measurements": "Reason TBD",
"keep_non_existent": "Reason TBD",
"key_values": "Reason TBD",
"key_values_host_name": "Reason TBD",
"last": "Reason TBD",
"lowestAverage": "Reason TBD",
"max": "Reason TBD",
"meta_query_fields": "Reason TBD",
"meta_query_keys": "Reason TBD",
"meta_query_measurements": "Reason TBD",
"min": "Reason TBD",
"multiple_range": "Reason TBD",
"sample": "Reason TBD",
"selector_preserve_time": "Reason TBD",
"shift": "Reason TBD",
"shift_negative_duration": "Reason TBD",
"show_all_tag_keys": "Reason TBD",
"sort": "Reason TBD",
"task_per_line": "Reason TBD",
"top": "Reason TBD",
"union": "Reason TBD",
"union_heterogeneous": "Reason TBD",
"unique": "Reason TBD",
"distinct": "Reason TBD",
// it appears these occur when writing the input data. `to` may not be null safe.
"fill_bool": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_float": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_int": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_string": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_time": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"fill_uint": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64",
"window_null": "failed to read meta data: panic: interface conversion: interface {} is nil, not float64",
// these may just be missing calls to range() in the tests. easy to fix in a new PR.
"group_nulls": "unbounded test",
"integral": "unbounded test",
"integral_columns": "unbounded test",
"map": "unbounded test",
// the following tests have a difference between the CSV-decoded input table, and the storage-retrieved version of that table
"columns": "group key mismatch",
"count": "column order mismatch",
"mean": "column order mismatch",
"percentile_aggregate": "column order mismatch",
"percentile_tdigest": "column order mismatch",
"set": "column order mismatch",
"set_new_column": "column order mismatch",
"skew": "column order mismatch",
"spread": "column order mismatch",
"stddev": "column order mismatch",
"sum": "column order mismatch",
"simple_max": "_stop missing from expected output",
"derivative": "time bounds mismatch (engine uses now() instead of bounds on input table)",
"percentile": "time bounds mismatch (engine uses now() instead of bounds on input table)",
"difference_columns": "data write/read path loses columns x and y",
"keys": "group key mismatch",
"pivot_task_test": "possible group key or column order mismatch",
// failed to read meta data errors: the CSV encoding is incomplete probably due to data schema errors. needs more detailed investigation to find root cause of error
"filter_by_regex": "failed to read metadata",
"filter_by_tags": "failed to read metadata",
"group": "failed to read metadata",
"group_ungroup": "failed to read metadata",
"pivot_mean": "failed to read metadata",
"select_measurement": "failed to read metadata",
"select_measurement_field": "failed to read metadata",
"histogram_quantile_minvalue": "failed to read meta data: no column with label _measurement exists",
"increase": "failed to read meta data: table has no _value column",
"string_max": "error: invalid use of function: *functions.MaxSelector has no implementation for type string (https://github.com/influxdata/platform/issues/224)",
"null_as_value": "null not supported as value in influxql (https://github.com/influxdata/platform/issues/353)",
"string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)",
"to": "to functions are not supported in the testing framework (https://github.com/influxdata/flux/issues/77)",
"covariance_missing_column_1": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"covariance_missing_column_2": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"drop_before_rename": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"drop_referenced": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
"yield": "yield requires special test case (https://github.com/influxdata/flux/issues/535)",
}
func TestFluxEndToEnd(t *testing.T) {
runEndToEnd(t, stdlib.FluxTestPackages)
}
func BenchmarkFluxEndToEnd(b *testing.B) {
benchEndToEnd(b, stdlib.FluxTestPackages)
}
func runEndToEnd(t *testing.T, pkgs []*ast.Package) {
l := RunMainOrFail(t, ctx)
l.SetupOrFail(t)
defer l.ShutdownOrFail(t, ctx)
for _, pkg := range pkgs {
pkg := pkg.Copy().(*ast.Package)
name := pkg.Files[0].Name
t.Run(name, func(t *testing.T) {
if reason, ok := skipTests[strings.TrimSuffix(name, ".flux")]; ok {
t.Skip(reason)
}
testFlux(t, l, pkg)
})
}
}
func benchEndToEnd(b *testing.B, pkgs []*ast.Package) {
l := RunMainOrFail(b, ctx)
l.SetupOrFail(b)
defer l.ShutdownOrFail(b, ctx)
for _, pkg := range pkgs {
pkg := pkg.Copy().(*ast.Package)
name := pkg.Files[0].Name
b.Run(name, func(b *testing.B) {
if reason, ok := skipTests[strings.TrimSuffix(name, ".flux")]; ok {
b.Skip(reason)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
testFlux(b, l, pkg)
}
})
}
}
var optionsSource = `
import "testing"
import c "csv"
// Options bucket and org are defined dynamically per test
option testing.loadStorage = (csv) => {
c.from(csv: csv) |> to(bucket: bucket, org: org)
return from(bucket: bucket)
}
`
var optionsAST *ast.File
func init() {
pkg := parser.ParseSource(optionsSource)
if ast.Check(pkg) > 0 {
panic(ast.GetError(pkg))
}
optionsAST = pkg.Files[0]
}
func testFlux(t testing.TB, l *Launcher, pkg *ast.Package) {
// Query server to ensure write persists.
b := &platform.Bucket{
Organization: "ORG",
Name: t.Name(),
RetentionPeriod: 0,
}
s := l.BucketService()
if err := s.CreateBucket(context.Background(), b); err != nil {
t.Fatal(err)
}
// Define bucket and org options
bucketOpt := &ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "bucket"},
Init: &ast.StringLiteral{Value: b.Name},
},
}
orgOpt := &ast.OptionStatement{
Assignment: &ast.VariableAssignment{
ID: &ast.Identifier{Name: "org"},
Init: &ast.StringLiteral{Value: b.Organization},
},
}
options := optionsAST.Copy().(*ast.File)
options.Body = append([]ast.Statement{bucketOpt, orgOpt}, options.Body...)
// Add options to pkg
pkg.Files = append(pkg.Files, options)
// Add testing.inspect call to ensure the data is loaded
inspectCalls := stdlib.TestingInspectCalls(pkg)
pkg.Files = append(pkg.Files, inspectCalls)
req := &query.Request{
OrganizationID: l.Org.ID,
Compiler: lang.ASTCompiler{AST: pkg},
}
if r, err := l.FluxService().Query(ctx, req); err != nil {
t.Fatal(err)
} else {
for r.More() {
v := r.Next()
if err := v.Tables().Do(func(tbl flux.Table) error {
return nil
}); err != nil {
t.Error(err)
}
}
}
// quirk: our execution engine doesn't guarantee the order of execution for disconnected DAGs,
// so our function-with-side-effects call to `to` may run _after_ the test instead of before.
// running twice makes sure that `to` happens at least once before we run the test.
// this time we use a call to `run` so that the assertion error is triggered
runCalls := stdlib.TestingRunCalls(pkg)
pkg.Files[len(pkg.Files)-1] = runCalls
r, err := l.FluxService().Query(ctx, req)
if err != nil {
t.Fatal(err)
}
for r.More() {
v := r.Next()
if err := v.Tables().Do(func(tbl flux.Table) error {
return nil
}); err != nil {
t.Error(err)
}
}
if err := r.Err(); err != nil {
t.Error(err)
// Replace the testing.run calls with testing.inspect calls.
pkg.Files[len(pkg.Files)-1] = inspectCalls
r, err := l.FluxService().Query(ctx, req)
if err != nil {
t.Fatal(err)
}
var out bytes.Buffer
defer func() {
if t.Failed() {
scanner := bufio.NewScanner(&out)
for scanner.Scan() {
t.Log(scanner.Text())
}
}
}()
for r.More() {
v := r.Next()
err := execute.FormatResult(&out, v)
if err != nil {
t.Error(err)
}
}
if err := r.Err(); err != nil {
t.Error(err)
}
}
}
// Launcher is a test wrapper for main.Launcher.
type Launcher struct {
*launcher.Launcher
// Root temporary directory for all data.
Path string
// Initialized after calling the Setup() helper.
User *platform.User
Org *platform.Organization
Bucket *platform.Bucket
Auth *platform.Authorization
// Standard in/out/err buffers.
Stdin bytes.Buffer
Stdout bytes.Buffer
Stderr bytes.Buffer
}
// NewLauncher returns a new instance of Launcher.
func NewLauncher() *Launcher {
l := &Launcher{Launcher: launcher.NewLauncher()}
l.Launcher.Stdin = &l.Stdin
l.Launcher.Stdout = &l.Stdout
l.Launcher.Stderr = &l.Stderr
if testing.Verbose() {
l.Launcher.Stdout = io.MultiWriter(l.Launcher.Stdout, os.Stdout)
l.Launcher.Stderr = io.MultiWriter(l.Launcher.Stderr, os.Stderr)
}
path, err := ioutil.TempDir("", "")
if err != nil {
panic(err)
}
l.Path = path
return l
}
// RunMainOrFail initializes and starts the server.
func RunMainOrFail(tb testing.TB, ctx context.Context, args ...string) *Launcher {
tb.Helper()
l := NewLauncher()
if err := l.Run(ctx, args...); err != nil {
tb.Fatal(err)
}
return l
}
// Run executes the program with additional arguments to set paths and ports.
func (l *Launcher) Run(ctx context.Context, args ...string) error {
args = append(args, "--bolt-path", filepath.Join(l.Path, "influxd.bolt"))
args = append(args, "--protos-path", filepath.Join(l.Path, "protos"))
args = append(args, "--engine-path", filepath.Join(l.Path, "engine"))
args = append(args, "--http-bind-address", "127.0.0.1:0")
args = append(args, "--log-level", "debug")
return l.Launcher.Run(ctx, args...)
}
// Shutdown stops the program and cleans up temporary paths.
func (l *Launcher) Shutdown(ctx context.Context) error {
l.Cancel()
l.Launcher.Shutdown(ctx)
return os.RemoveAll(l.Path)
}
// ShutdownOrFail stops the program and cleans up temporary paths. Fail on error.
func (l *Launcher) ShutdownOrFail(tb testing.TB, ctx context.Context) {
tb.Helper()
if err := l.Shutdown(ctx); err != nil {
tb.Fatal(err)
}
}
// SetupOrFail creates a new user, bucket, org, and auth token. Fail on error.
func (l *Launcher) SetupOrFail(tb testing.TB) {
svc := &http.SetupService{Addr: l.URL()}
results, err := svc.Generate(ctx, &platform.OnboardingRequest{
User: "USER",
Password: "PASSWORD",
Org: "ORG",
Bucket: "BUCKET",
})
if err != nil {
tb.Fatal(err)
}
l.User = results.User
l.Org = results.Org
l.Bucket = results.Bucket
l.Auth = results.Auth
}
func (l *Launcher) FluxService() *http.FluxQueryService {
return &http.FluxQueryService{Addr: l.URL(), Token: l.Auth.Token}
}
func (l *Launcher) BucketService() *http.BucketService {
return &http.BucketService{
Addr: l.URL(),
Token: l.Auth.Token,
OpPrefix: bolt.OpPrefix,
}
}
// MustNewHTTPRequest returns a new nethttp.Request with base URL and auth attached. Fail on error.
func (l *Launcher) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Request {
req, err := nethttp.NewRequest(method, l.URL()+rawurl, strings.NewReader(body))
if err != nil {
panic(err)
}
req.Header.Set("Authorization", "Token "+l.Auth.Token)
return req
}
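A hypothetical example of how the Launcher helpers above compose in a test; the request path is illustrative only:

func TestLauncherSmoke(t *testing.T) {
	l := RunMainOrFail(t, ctx)
	l.SetupOrFail(t)
	defer l.ShutdownOrFail(t, ctx)

	// Issue an authenticated request against the embedded server.
	req := l.MustNewHTTPRequest("GET", "/api/v2/buckets", "")
	resp, err := nethttp.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()
	t.Log("status:", resp.Status)
}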

View File

@ -0,0 +1 @@
package testing

View File

@ -20,6 +20,7 @@ type storageTable interface {
flux.Table
Close()
Cancel()
Statistics() cursors.CursorStats
}
type storeReader struct {
@ -30,7 +31,7 @@ func NewReader(s Store) influxdb.Reader {
return &storeReader{s: s}
}
func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time) (flux.TableIterator, error) {
func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time) (influxdb.TableIterator, error) {
var predicate *datatypes.Predicate
if rs.Predicate != nil {
p, err := toStoragePredicate(rs.Predicate)
@ -57,10 +58,10 @@ type tableIterator struct {
s Store
readSpec influxdb.ReadSpec
predicate *datatypes.Predicate
stats flux.Statistics
stats cursors.CursorStats
}
func (bi *tableIterator) Statistics() flux.Statistics { return bi.stats }
func (bi *tableIterator) Statistics() cursors.CursorStats { return bi.stats }
func (bi *tableIterator) Do(f func(flux.Table) error) error {
src, err := bi.s.GetSource(bi.readSpec)
@ -192,7 +193,9 @@ READ:
}
table.Close()
bi.stats = bi.stats.Add(table.Statistics())
stats := table.Statistics()
bi.stats.ScannedValues += stats.ScannedValues
bi.stats.ScannedBytes += stats.ScannedBytes
table = nil
}
return rs.Err()
@ -316,6 +319,9 @@ READ:
}
table.Close()
stats := table.Statistics()
bi.stats.ScannedValues += stats.ScannedValues
bi.stats.ScannedBytes += stats.ScannedBytes
table = nil
gc = rs.Next()

View File

@ -7,12 +7,12 @@
package reads
import (
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/memory"
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
@ -57,15 +57,15 @@ func (t *floatTable) Close() {
t.mu.Unlock()
}
func (t *floatTable) Statistics() flux.Statistics {
func (t *floatTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -233,12 +233,12 @@ func (t *floatGroupTable) advanceCursor() bool {
return false
}
func (t *floatGroupTable) Statistics() flux.Statistics {
func (t *floatGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -283,15 +283,15 @@ func (t *integerTable) Close() {
t.mu.Unlock()
}
func (t *integerTable) Statistics() flux.Statistics {
func (t *integerTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -459,12 +459,12 @@ func (t *integerGroupTable) advanceCursor() bool {
return false
}
func (t *integerGroupTable) Statistics() flux.Statistics {
func (t *integerGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -509,15 +509,15 @@ func (t *unsignedTable) Close() {
t.mu.Unlock()
}
func (t *unsignedTable) Statistics() flux.Statistics {
func (t *unsignedTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -685,12 +685,12 @@ func (t *unsignedGroupTable) advanceCursor() bool {
return false
}
func (t *unsignedGroupTable) Statistics() flux.Statistics {
func (t *unsignedGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -735,15 +735,15 @@ func (t *stringTable) Close() {
t.mu.Unlock()
}
func (t *stringTable) Statistics() flux.Statistics {
func (t *stringTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -911,12 +911,12 @@ func (t *stringGroupTable) advanceCursor() bool {
return false
}
func (t *stringGroupTable) Statistics() flux.Statistics {
func (t *stringGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -961,15 +961,15 @@ func (t *booleanTable) Close() {
t.mu.Unlock()
}
func (t *booleanTable) Statistics() flux.Statistics {
func (t *booleanTable) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -1137,12 +1137,12 @@ func (t *booleanGroupTable) advanceCursor() bool {
return false
}
func (t *booleanGroupTable) Statistics() flux.Statistics {
func (t *booleanGroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}

View File

@ -1,12 +1,12 @@
package reads
import (
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/memory"
"sync"
"github.com/influxdata/flux"
"github.com/influxdata/flux/arrow"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/memory"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/tsdb/cursors"
"github.com/pkg/errors"
@ -51,15 +51,15 @@ func (t *{{.name}}Table) Close() {
t.mu.Unlock()
}
func (t *{{.name}}Table) Statistics() flux.Statistics {
func (t *{{.name}}Table) Statistics() cursors.CursorStats {
t.mu.Lock()
defer t.mu.Unlock()
cur := t.cur
if cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}
@ -227,12 +227,12 @@ func (t *{{.name}}GroupTable) advanceCursor() bool {
return false
}
func (t *{{.name}}GroupTable) Statistics() flux.Statistics {
func (t *{{.name}}GroupTable) Statistics() cursors.CursorStats {
if t.cur == nil {
return flux.Statistics{}
return cursors.CursorStats{}
}
cs := t.cur.Stats()
return flux.Statistics{
return cursors.CursorStats{
ScannedValues: cs.ScannedValues,
ScannedBytes: cs.ScannedBytes,
}

View File

@ -211,7 +211,7 @@ func newTableNoPoints(
func (t *tableNoPoints) Close() {}
func (t *tableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} }
func (t *tableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} }
func (t *tableNoPoints) Do(f func(flux.ColReader) error) error {
if t.isCancelled() {
@ -251,7 +251,7 @@ func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error {
return t.err
}
func (t *groupTableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} }
func (t *groupTableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} }
func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 {
return arrow.NewFloat(vs, &memory.Allocator{})

2
ui/package-lock.json generated
View File

@ -11029,7 +11029,7 @@
"dependencies": {
"minimist": {
"version": "0.0.10",
"resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz",
"resolved": "http://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz",
"integrity": "sha1-3j+YVD2/lggr5IrRoMfNqDYwHc8=",
"dev": true
},