diff --git a/go.mod b/go.mod index cd6c726afb..53d8fee1f5 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 github.com/docker/docker v1.13.1 // indirect github.com/duosecurity/duo_api_golang v0.0.0-20190107154727-539434bf0d45 // indirect - github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190208081306-44819d1a54a8 + github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190219201458-ead62885d7c8 github.com/elazarl/go-bindata-assetfs v1.0.0 github.com/fatih/structs v1.1.0 // indirect github.com/getkin/kin-openapi v0.1.1-0.20190103155524-1fa206970bc1 @@ -62,7 +62,7 @@ require ( github.com/hashicorp/vault v0.11.5 github.com/hashicorp/vault-plugin-secrets-kv v0.0.0-20181106190520-2236f141171e // indirect github.com/hashicorp/yamux v0.0.0-20181012175058-2f1d1f20f75d // indirect - github.com/influxdata/flux v0.20.0 + github.com/influxdata/flux v0.21.0 github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 github.com/jefferai/jsonx v0.0.0-20160721235117-9cc31c3135ee // indirect @@ -127,7 +127,7 @@ require ( google.golang.org/grpc v1.17.0 gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d // indirect gopkg.in/editorconfig/editorconfig-core-go.v1 v1.3.0 // indirect - gopkg.in/ini.v1 v1.41.0 // indirect + gopkg.in/ini.v1 v1.42.0 // indirect gopkg.in/ldap.v2 v2.5.1 // indirect gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5 diff --git a/go.sum b/go.sum index 993c5ecdca..acd4b64c3b 100644 --- a/go.sum +++ b/go.sum @@ -101,8 +101,8 @@ github.com/duosecurity/duo_api_golang v0.0.0-20190107154727-539434bf0d45 h1:AbIf github.com/duosecurity/duo_api_golang v0.0.0-20190107154727-539434bf0d45/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo= github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= -github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190208081306-44819d1a54a8 h1:s04AS/nXlTOt+hrPd50Wm0uC3/26lBuB17bNvAW3zqI= -github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190208081306-44819d1a54a8/go.mod h1:S5wy26xEAot3+yfuCE3/NlL9n/O/TDX/5xt1rGWAOXY= +github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190219201458-ead62885d7c8 h1:wRymegulm4k5oGb5rJFqZ79CFsoqKsmnJaY4hfgyhB4= +github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190219201458-ead62885d7c8/go.mod h1:S5wy26xEAot3+yfuCE3/NlL9n/O/TDX/5xt1rGWAOXY= github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/emirpasic/gods v1.9.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= @@ -224,8 +224,8 @@ github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/influxdata/flux v0.20.0 h1:v+R/pQGamzx5XVB03dCuVnHPnoreLTNLDrrNJkS5xEA= -github.com/influxdata/flux v0.20.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY= +github.com/influxdata/flux v0.21.0 h1:9Oz3lZg8BrFpJLv2+mqWsfnofPGZc56Upa+w8JxAWzg= +github.com/influxdata/flux v0.21.0/go.mod h1:0f5Yrm4VPSd/Ne6jIVOVtPo0MFe6jpLCr6vdaZYp7wY= github.com/influxdata/goreleaser v0.97.0-influx/go.mod h1:MnjA0e0Uq6ISqjG1WxxMAl+3VS1QYjILSWVnMYDxasE= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= @@ -488,8 +488,8 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= -gopkg.in/ini.v1 v1.41.0 h1:Ka3ViY6gNYSKiVy71zXBEqKplnV35ImDLVG+8uoIklE= -gopkg.in/ini.v1 v1.41.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= +gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ldap.v2 v2.5.1 h1:wiu0okdNfjlBzg6UWvd1Hn8Y+Ux17/u/4nlk4CQr6tU= gopkg.in/ldap.v2 v2.5.1/go.mod h1:oI0cpe/D7HRtBQl8aTg+ZmzFUAvu4lsv3eLXMLGFxWk= gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce h1:xcEWjVhvbDy+nHP67nPDDpbYrY+ILlfndk4bRioVHaU= diff --git a/http/query.go b/http/query.go index 6201a8c736..c00a3731b7 100644 --- a/http/query.go +++ b/http/query.go @@ -16,25 +16,23 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/ast" "github.com/influxdata/flux/csv" - "github.com/influxdata/flux/interpreter" "github.com/influxdata/flux/lang" "github.com/influxdata/flux/parser" - "github.com/influxdata/flux/semantic" - "github.com/influxdata/flux/values" - platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/query" "github.com/influxdata/influxql" ) // QueryRequest is a flux query request. type QueryRequest struct { + Extern *ast.File `json:"extern,omitempty"` Spec *flux.Spec `json:"spec,omitempty"` AST *ast.Package `json:"ast,omitempty"` Query string `json:"query"` Type string `json:"type"` Dialect QueryDialect `json:"dialect"` - Org *platform.Organization `json:"-"` + Org *influxdb.Organization `json:"-"` } // QueryDialect is the formatting options for the query response. @@ -70,6 +68,13 @@ func (r QueryRequest) Validate() error { return errors.New(`request body requires either query, spec, or AST`) } + if r.Spec != nil && r.Extern != nil { + return &influxdb.Error{ + Code: influxdb.EInvalid, + Msg: "request body cannot specify both a spec and external declarations", + } + } + if r.Type != "flux" { return fmt.Errorf(`unknown query type: %s`, r.Type) } @@ -206,47 +211,6 @@ func columnFromCharacter(q string, char int) int { var influxqlParseErrorRE = regexp.MustCompile(`^(.+) at line (\d+), char (\d+)$`) -func nowFunc(now time.Time) values.Function { - timeVal := values.NewTime(values.ConvertTime(now)) - ftype := semantic.NewFunctionPolyType(semantic.FunctionPolySignature{ - Return: semantic.Time, - }) - call := func(args values.Object) (values.Value, error) { - return timeVal, nil - } - sideEffect := false - return values.NewFunction("now", ftype, call, sideEffect) -} - -func toSpec(p *ast.Package, now func() time.Time) (*flux.Spec, error) { - semProg, err := semantic.New(p) - if err != nil { - return nil, err - } - - scope := flux.Prelude() - scope.Set("now", nowFunc(now())) - - itrp := interpreter.NewInterpreter() - - sideEffects, err := itrp.Eval(semProg, scope, flux.StdLib()) - if err != nil { - return nil, err - } - - nowOpt, ok := scope.Lookup("now") - if !ok { - return nil, fmt.Errorf("now option not set") - } - - nowTime, err := nowOpt.Function().Call(nil) - if err != nil { - return nil, err - } - - return flux.ToSpec(sideEffects, nowTime.Time().Time()) -} - // ProxyRequest returns a request to proxy from the flux. func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) { return r.proxyRequest(time.Now) @@ -259,18 +223,27 @@ func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, e // Query is preferred over spec var compiler flux.Compiler if r.Query != "" { - compiler = lang.FluxCompiler{ - Query: r.Query, - } - } else if r.AST != nil { - var err error - r.Spec, err = toSpec(r.AST, now) + pkg, err := flux.Parse(r.Query) if err != nil { return nil, err } - compiler = lang.SpecCompiler{ - Spec: r.Spec, + c := lang.ASTCompiler{ + AST: pkg, + Now: now, } + if r.Extern != nil { + c.PrependFile(r.Extern) + } + compiler = c + } else if r.AST != nil { + c := lang.ASTCompiler{ + AST: r.AST, + Now: now, + } + if r.Extern != nil { + c.PrependFile(r.Extern) + } + compiler = c } else if r.Spec != nil { compiler = lang.SpecCompiler{ Spec: r.Spec, @@ -312,6 +285,9 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error case lang.SpecCompiler: qr.Type = "flux" qr.Spec = c.Spec + case lang.ASTCompiler: + qr.Type = "flux" + qr.AST = c.AST default: return nil, fmt.Errorf("unsupported compiler %T", c) } @@ -330,7 +306,7 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error return qr, nil } -func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*QueryRequest, error) { +func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.OrganizationService) (*QueryRequest, error) { var req QueryRequest var contentType = "application/json" @@ -365,7 +341,7 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ return &req, err } -func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth platform.Authorizer, svc platform.OrganizationService) (*query.ProxyRequest, error) { +func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth influxdb.Authorizer, svc influxdb.OrganizationService) (*query.ProxyRequest, error) { req, err := decodeQueryRequest(ctx, r, svc) if err != nil { return nil, err @@ -376,10 +352,10 @@ func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth platform return nil, err } - a, ok := auth.(*platform.Authorization) + a, ok := auth.(*influxdb.Authorization) if !ok { - // TODO(desa): this should go away once we're using platform.Authorizers everywhere. - return pr, platform.ErrAuthorizerNotSupported + // TODO(desa): this should go away once we're using influxdb.Authorizers everywhere. + return pr, influxdb.ErrAuthorizerNotSupported } pr.Request.Authorization = a diff --git a/http/query_service.go b/http/query_service.go index f1eddb8d2d..0b9f81ce30 100644 --- a/http/query_service.go +++ b/http/query_service.go @@ -71,10 +71,7 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { defer results.Release() // Setup headers - stats, hasStats := results.(flux.Statisticser) - if hasStats { - w.Header().Set("Trailer", queryStatisticsTrailer) - } + w.Header().Set("Trailer", queryStatisticsTrailer) // NOTE: We do not write out the headers here. // It is possible that if the encoding step fails @@ -102,15 +99,13 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { } } - if hasStats { - data, err := json.Marshal(stats.Statistics()) - if err != nil { - h.Logger.Info("Failed to encode statistics", zap.Error(err)) - return - } - // Write statisitcs trailer - w.Header().Set(queryStatisticsTrailer, string(data)) + data, err := json.Marshal(results.Statistics()) + if err != nil { + h.Logger.Info("Failed to encode statistics", zap.Error(err)) + return } + // Write statisitcs trailer + w.Header().Set(queryStatisticsTrailer, string(data)) } // PrometheusCollectors satisifies the prom.PrometheusCollector interface. diff --git a/http/query_test.go b/http/query_test.go index 25a6c38179..725b47a9fd 100644 --- a/http/query_test.go +++ b/http/query_test.go @@ -22,6 +22,27 @@ import ( _ "github.com/influxdata/influxdb/query/builtin" ) +var compareASTCompiler = func(x, y lang.ASTCompiler) bool { + if x.Now == nil && y.Now != nil { + return false + } + if x.Now != nil && y.Now == nil { + return false + } + if x.Now != nil && y.Now != nil && !x.Now().Equal(y.Now()) { + return false + } + return cmp.Equal(x.AST, y.AST, cmpopts.IgnoreTypes(ast.BaseNode{})) +} + +var cmpOptions = cmp.Options{ + cmpopts.IgnoreTypes(ast.BaseNode{}), + cmpopts.IgnoreUnexported(query.ProxyRequest{}), + cmpopts.IgnoreUnexported(query.Request{}), + cmpopts.IgnoreUnexported(flux.Spec{}), + cmpopts.EquateEmpty(), +} + func TestQueryRequest_WithDefaults(t *testing.T) { type fields struct { Spec *flux.Spec @@ -67,6 +88,7 @@ func TestQueryRequest_WithDefaults(t *testing.T) { func TestQueryRequest_Validate(t *testing.T) { type fields struct { + Extern *ast.File Spec *flux.Spec AST *ast.Package Query string @@ -86,6 +108,19 @@ func TestQueryRequest_Validate(t *testing.T) { }, wantErr: true, }, + { + name: "query cannot have both extern and spec", + fields: fields{ + Extern: &ast.File{}, + Spec: &flux.Spec{}, + Type: "flux", + Dialect: QueryDialect{ + Delimiter: ",", + DateTimeFormat: "RFC3339", + }, + }, + wantErr: true, + }, { name: "requires flux type", fields: fields{ @@ -166,6 +201,7 @@ func TestQueryRequest_Validate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := QueryRequest{ + Extern: tt.fields.Extern, Spec: tt.fields.Spec, AST: tt.fields.AST, Query: tt.fields.Query, @@ -180,59 +216,9 @@ func TestQueryRequest_Validate(t *testing.T) { } } -func Test_toSpec(t *testing.T) { - type args struct { - p *ast.Package - now func() time.Time - } - tests := []struct { - name string - args args - want *flux.Spec - wantErr bool - }{ - { - name: "ast converts to spec", - args: args{ - p: &ast.Package{}, - now: func() time.Time { return time.Unix(0, 0) }, - }, - want: &flux.Spec{ - Now: time.Unix(0, 0).UTC(), - }, - }, - { - name: "bad semantics error", - args: args{ - p: &ast.Package{ - Files: []*ast.File{{ - Body: []ast.Statement{ - &ast.ReturnStatement{}, - }, - }}, - }, - now: func() time.Time { return time.Unix(0, 0) }, - }, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - - got, err := toSpec(tt.args.p, tt.args.now) - if (err != nil) != tt.wantErr { - t.Errorf("toSpec() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("toSpec() = %v, want %v", got, tt.want) - } - }) - } -} - func TestQueryRequest_proxyRequest(t *testing.T) { type fields struct { + Extern *ast.File Spec *flux.Spec AST *ast.Package Query string @@ -265,10 +251,23 @@ func TestQueryRequest_proxyRequest(t *testing.T) { }, org: &platform.Organization{}, }, + now: func() time.Time { return time.Unix(1, 1) }, want: &query.ProxyRequest{ Request: query.Request{ - Compiler: lang.FluxCompiler{ - Query: "howdy", + Compiler: lang.ASTCompiler{ + AST: &ast.Package{ + Package: "main", + Files: []*ast.File{ + { + Body: []ast.Statement{ + &ast.ExpressionStatement{ + Expression: &ast.Identifier{Name: "howdy"}, + }, + }, + }, + }, + }, + Now: func() time.Time { return time.Unix(1, 1) }, }, }, Dialect: &csv.Dialect{ @@ -290,15 +289,64 @@ func TestQueryRequest_proxyRequest(t *testing.T) { }, org: &platform.Organization{}, }, - now: func() time.Time { return time.Unix(0, 0).UTC() }, + now: func() time.Time { return time.Unix(1, 1) }, want: &query.ProxyRequest{ Request: query.Request{ - Compiler: lang.SpecCompiler{ - Spec: &flux.Spec{ - Now: time.Unix(0, 0).UTC(), + Compiler: lang.ASTCompiler{ + AST: &ast.Package{}, + Now: func() time.Time { return time.Unix(1, 1) }, + }, + }, + Dialect: &csv.Dialect{ + ResultEncoderConfig: csv.ResultEncoderConfig{ + NoHeader: false, + Delimiter: ',', + }, + }, + }, + }, + { + name: "valid AST with extern", + fields: fields{ + Extern: &ast.File{ + Body: []ast.Statement{ + &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: "x"}, + Init: &ast.IntegerLiteral{Value: 0}, + }, }, }, }, + AST: &ast.Package{}, + Type: "flux", + Dialect: QueryDialect{ + Delimiter: ",", + DateTimeFormat: "RFC3339", + }, + org: &platform.Organization{}, + }, + now: func() time.Time { return time.Unix(1, 1) }, + want: &query.ProxyRequest{ + Request: query.Request{ + Compiler: lang.ASTCompiler{ + AST: &ast.Package{ + Files: []*ast.File{ + { + Body: []ast.Statement{ + &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: "x"}, + Init: &ast.IntegerLiteral{Value: 0}, + }, + }, + }, + }, + }, + }, + Now: func() time.Time { return time.Unix(1, 1) }, + }, + }, Dialect: &csv.Dialect{ ResultEncoderConfig: csv.ResultEncoderConfig{ NoHeader: false, @@ -337,9 +385,11 @@ func TestQueryRequest_proxyRequest(t *testing.T) { }, }, } + cmpOptions := append(cmpOptions, cmp.Comparer(compareASTCompiler)) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { r := QueryRequest{ + Extern: tt.fields.Extern, Spec: tt.fields.Spec, AST: tt.fields.AST, Query: tt.fields.Query, @@ -352,8 +402,8 @@ func TestQueryRequest_proxyRequest(t *testing.T) { t.Errorf("QueryRequest.ProxyRequest() error = %v, wantErr %v", err, tt.wantErr) return } - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("QueryRequest.ProxyRequest() = %#v, want %#v", got, tt.want) + if !cmp.Equal(got, tt.want, cmpOptions...) { + t.Errorf("QueryRequest.ProxyRequest() -want/+got\n%s", cmp.Diff(tt.want, got, cmpOptions...)) } }) } @@ -482,8 +532,104 @@ func Test_decodeProxyQueryRequest(t *testing.T) { want: &query.ProxyRequest{ Request: query.Request{ OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(), - Compiler: lang.FluxCompiler{ - Query: "from()", + Compiler: lang.ASTCompiler{ + AST: &ast.Package{ + Package: "main", + Files: []*ast.File{ + { + Body: []ast.Statement{ + &ast.ExpressionStatement{ + Expression: &ast.CallExpression{ + Callee: &ast.Identifier{Name: "from"}, + }, + }, + }, + }, + }, + }, + }, + }, + Dialect: &csv.Dialect{ + ResultEncoderConfig: csv.ResultEncoderConfig{ + NoHeader: false, + Delimiter: ',', + }, + }, + }, + }, + { + name: "valid query including extern definition", + args: args{ + r: httptest.NewRequest("POST", "/", bytes.NewBufferString(` +{ + "extern": { + "type": "File", + "body": [ + { + "type": "OptionStatement", + "assignment": { + "type": "VariableAssignment", + "id": { + "type": "Identifier", + "name": "x" + }, + "init": { + "type": "IntegerLiteral", + "value": "0" + } + } + } + ] + }, + "query": "from(bucket: \"mybucket\")" +} +`)), + svc: &mock.OrganizationService{ + FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) { + return &platform.Organization{ + ID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(), + }, nil + }, + }, + }, + want: &query.ProxyRequest{ + Request: query.Request{ + OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(), + Compiler: lang.ASTCompiler{ + AST: &ast.Package{ + Package: "main", + Files: []*ast.File{ + { + Body: []ast.Statement{ + &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: "x"}, + Init: &ast.IntegerLiteral{Value: 0}, + }, + }, + }, + }, + { + Body: []ast.Statement{ + &ast.ExpressionStatement{ + Expression: &ast.CallExpression{ + Callee: &ast.Identifier{Name: "from"}, + Arguments: []ast.Expression{ + &ast.ObjectExpression{ + Properties: []*ast.Property{ + { + Key: &ast.Identifier{Name: "bucket"}, + Value: &ast.StringLiteral{Value: "mybucket"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, }, }, Dialect: &csv.Dialect{ @@ -513,8 +659,31 @@ func Test_decodeProxyQueryRequest(t *testing.T) { want: &query.ProxyRequest{ Request: query.Request{ OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeefdeadbeef"); return *s }(), - Compiler: lang.FluxCompiler{ - Query: "from(bucket: \"mybucket\")", + Compiler: lang.ASTCompiler{ + AST: &ast.Package{ + Package: "main", + Files: []*ast.File{ + { + Body: []ast.Statement{ + &ast.ExpressionStatement{ + Expression: &ast.CallExpression{ + Callee: &ast.Identifier{Name: "from"}, + Arguments: []ast.Expression{ + &ast.ObjectExpression{ + Properties: []*ast.Property{ + { + Key: &ast.Identifier{Name: "bucket"}, + Value: &ast.StringLiteral{Value: "mybucket"}, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, }, }, Dialect: &csv.Dialect{ @@ -526,11 +695,7 @@ func Test_decodeProxyQueryRequest(t *testing.T) { }, }, } - var cmpOptions = cmp.Options{ - cmpopts.IgnoreUnexported(query.ProxyRequest{}), - cmpopts.IgnoreUnexported(query.Request{}), - cmpopts.EquateEmpty(), - } + cmpOptions := append(cmpOptions, cmpopts.IgnoreFields(lang.ASTCompiler{}, "Now")) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { got, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc) @@ -538,8 +703,8 @@ func Test_decodeProxyQueryRequest(t *testing.T) { t.Errorf("decodeProxyQueryRequest() error = %v, wantErr %v", err, tt.wantErr) return } - if diff := cmp.Diff(got, tt.want, cmpOptions...); diff != "" { - t.Errorf("decodeProxyQueryRequest() = got/want %v", diff) + if !cmp.Equal(tt.want, got, cmpOptions...) { + t.Errorf("decodeProxyQueryRequest() -want/+got\n%s", cmp.Diff(tt.want, got, cmpOptions...)) } }) } diff --git a/http/swagger.yml b/http/swagger.yml index bf1a84e3f6..258748eb71 100644 --- a/http/swagger.yml +++ b/http/swagger.yml @@ -4705,6 +4705,8 @@ components: required: - query properties: + extern: + $ref: "#/components/schemas/File" query: description: query script to execute. type: string @@ -4728,6 +4730,413 @@ components: type: string dialect: $ref: "#/components/schemas/Dialect" + Package: + description: represents a complete package source tree + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + path: + description: package import path + type: string + package: + description: package name + type: string + files: + description: package files + type: array + items: + $ref: "#/components/schemas/File" + File: + description: represents a source from a single file + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + name: + description: name of the file + type: string + package: + $ref: "#/components/schemas/PackageClause" + imports: + description: a list of package imports + type: array + items: + $ref: "#/components/schemas/ImportDeclaration" + body: + description: list of Flux statements + type: array + items: + $ref: "#/components/schemas/Statement" + PackageClause: + description: defines a package identifier + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + name: + $ref: "#/components/schemas/Identifier" + ImportDeclaration: + description: declares a package import + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + as: + $ref: "#/components/schemas/Identifier" + path: + $ref: "#/components/schemas/StringLiteral" + Node: + oneOf: + - $ref: "#/components/schemas/Expression" + - $ref: "#/components/schemas/Block" + Block: + description: a set of statements + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + body: + description: block body + type: array + items: + $ref: "#/components/schemas/Statement" + Statement: + oneOf: + - $ref: "#/components/schemas/BadStatement" + - $ref: "#/components/schemas/VariableAssignment" + - $ref: "#/components/schemas/MemberAssignment" + - $ref: "#/components/schemas/ExpressionStatement" + - $ref: "#/components/schemas/ReturnStatement" + - $ref: "#/components/schemas/OptionStatement" + - $ref: "#/components/schemas/BuiltinStatement" + - $ref: "#/components/schemas/TestStatement" + BadStatement: + description: a placeholder for statements for which no correct statement nodes can be created + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + text: + description: raw source text + type: string + VariableAssignment: + description: represents the declaration of a variable + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + id: + $ref: "#/components/schemas/Identifier" + init: + $ref: "#/components/schemas/Expression" + MemberAssignment: + description: object property assignment + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + member: + $ref: "#/components/schemas/MemberExpression" + init: + $ref: "#/components/schemas/Expression" + ExpressionStatement: + description: may consist of an expression that does not return a value and is executed solely for its side-effects + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + expression: + $ref: "#/components/schemas/Expression" + ReturnStatement: + description: defines an expression to return + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + argument: + $ref: "#/components/schemas/Expression" + OptionStatement: + description: a single variable declaration + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + assignment: + oneOf: + - $ref: "#/components/schemas/VariableAssignment" + - $ref: "#/components/schemas/MemberAssignment" + BuiltinStatement: + description: declares a builtin identifier and its type + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + id: + $ref: "#/components/schemas/Identifier" + TestStatement: + description: declares a Flux test case + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + assignment: + $ref: "#/components/schemas/VariableAssignment" + Expression: + oneOf: + - $ref: "#/components/schemas/ArrayExpression" + - $ref: "#/components/schemas/FunctionExpression" + - $ref: "#/components/schemas/BinaryExpression" + - $ref: "#/components/schemas/CallExpression" + - $ref: "#/components/schemas/ConditionalExpression" + - $ref: "#/components/schemas/LogicalExpression" + - $ref: "#/components/schemas/MemberExpression" + - $ref: "#/components/schemas/IndexExpression" + - $ref: "#/components/schemas/ObjectExpression" + - $ref: "#/components/schemas/PipeExpression" + - $ref: "#/components/schemas/UnaryExpression" + - $ref: "#/components/schemas/BooleanLiteral" + - $ref: "#/components/schemas/DateTimeLiteral" + - $ref: "#/components/schemas/DurationLiteral" + - $ref: "#/components/schemas/FloatLiteral" + - $ref: "#/components/schemas/IntegerLiteral" + - $ref: "#/components/schemas/PipeLiteral" + - $ref: "#/components/schemas/RegexpLiteral" + - $ref: "#/components/schemas/StringLiteral" + - $ref: "#/components/schemas/UnsignedIntegerLiteral" + - $ref: "#/components/schemas/Identifier" + ArrayExpression: + description: used to create and directly specify the elements of an array object + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + elements: + description: elements of the array + type: array + items: + $ref: "#/components/schemas/Expression" + FunctionExpression: + description: function expression + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + params: + description: function parameters + type: array + items: + $ref: "#/components/schemas/Property" + body: + $ref: "#/components/schemas/Node" + BinaryExpression: + description: uses binary operators to act on two operands in an expression + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + operator: + type: string + left: + $ref: "#/components/schemas/Expression" + right: + $ref: "#/components/schemas/Expression" + CallExpression: + description: represents a function call + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + callee: + $ref: "#/components/schemas/Expression" + arguments: + description: function arguments + type: array + items: + $ref: "#/components/schemas/Expression" + ConditionalExpression: + description: selects one of two expressions, `Alternate` or `Consequent`, depending on a third boolean expression, `Test` + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + test: + $ref: "#/components/schemas/Expression" + alternate: + $ref: "#/components/schemas/Expression" + consequent: + $ref: "#/components/schemas/Expression" + LogicalExpression: + description: represents the rule conditions that collectively evaluate to either true or false + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + operator: + type: string + left: + $ref: "#/components/schemas/Expression" + right: + $ref: "#/components/schemas/Expression" + MemberExpression: + description: represents accessing a property of an object + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + object: + $ref: "#/components/schemas/Expression" + property: + $ref: "#/components/schemas/PropertyKey" + IndexExpression: + description: represents indexing into an array + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + array: + $ref: "#/components/schemas/Expression" + index: + $ref: "#/components/schemas/Expression" + ObjectExpression: + description: allows the declaration of an anonymous object within a declaration + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + properties: + description: object properties + type: array + items: + $ref: "#/components/schemas/Property" + PipeExpression: + description: call expression with pipe argument + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + argument: + $ref: "#/components/schemas/Expression" + call: + $ref: "#/components/schemas/CallExpression" + UnaryExpression: + description: uses operators to act on a single operand in an expression + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + operator: + type: string + argument: + $ref: "#/components/schemas/Expression" + BooleanLiteral: + description: represents boolean values + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: boolean + DateTimeLiteral: + description: represents an instant in time with nanosecond precision using the syntax of golang's RFC3339 Nanosecond variant + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: string + DurationLiteral: + description: represents the elapsed time between two instants as an int64 nanosecond count with syntax of golang's time.Duration + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + values: + description: duration values + type: array + items: + $ref: "#/components/schemas/Duration" + FloatLiteral: + description: represents floating point numbers according to the double representations defined by the IEEE-754-1985 + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: number + IntegerLiteral: + description: represents integer numbers + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: string + PipeLiteral: + description: represents a specialized literal value, indicating the left hand value of a pipe expression + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + RegexpLiteral: + description: expressions begin and end with `/` and are regular expressions with syntax accepted by RE2 + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: string + StringLiteral: + description: expressions begin and end with double quote marks + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: string + UnsignedIntegerLiteral: + description: represents integer numbers + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + value: + type: string + Duration: + description: a pair consisting of length of time and the unit of time measured. It is the atomic unit from which all duration literals are composed. + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + magnitude: + type: integer + unit: + type: string + Property: + description: the value associated with a key + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + key: + $ref: "#/components/schemas/PropertyKey" + value: + $ref: "#/components/schemas/Expression" + PropertyKey: + oneOf: + - $ref: "#/components/schemas/Identifier" + - $ref: "#/components/schemas/StringLiteral" + Identifier: + description: a valid Flux identifier + type: object + properties: + type: + $ref: "#/components/schemas/NodeType" + name: + type: string + NodeType: + description: type of AST node + type: string QuerySpecification: description: consists of a set of operations and a set of edges between those operations to instruct the query engine to operate. type: object @@ -7104,8 +7513,7 @@ components: type: object properties: ast: - description: the AST of the supplied Flux query - type: object + $ref: "#/components/schemas/Package" WritePrecision: type: string enum: diff --git a/query/bridges.go b/query/bridges.go index c043fb7f07..91da7f7ef4 100644 --- a/query/bridges.go +++ b/query/bridges.go @@ -36,11 +36,8 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr defer results.Release() // Setup headers - stats, hasStats := results.(flux.Statisticser) - if hasStats { - if w, ok := w.(http.ResponseWriter); ok { - w.Header().Set("Trailer", "Influx-Query-Statistics") - } + if w, ok := w.(http.ResponseWriter); ok { + w.Header().Set("Trailer", "Influx-Query-Statistics") } encoder := req.Dialect.Encoder() @@ -50,10 +47,8 @@ func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *Pr } if w, ok := w.(http.ResponseWriter); ok { - if hasStats { - data, _ := json.Marshal(stats.Statistics()) - w.Header().Set("Influx-Query-Statistics", string(data)) - } + data, _ := json.Marshal(results.Statistics()) + w.Header().Set("Influx-Query-Statistics", string(data)) } return n, nil diff --git a/query/logging.go b/query/logging.go index c82e60a459..f9f1c29b65 100644 --- a/query/logging.go +++ b/query/logging.go @@ -42,12 +42,10 @@ func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *Prox } // Check if this result iterator reports stats. We call this defer before cancel because // the query needs to be finished before it will have valid statistics. - if s, ok := results.(flux.Statisticser); ok { - defer func() { - stats = s.Statistics() - }() - } - defer results.Release() + defer func() { + results.Release() + stats = results.Statistics() + }() encoder := req.Dialect.Encoder() n, err = encoder.Encode(w, results) diff --git a/query/stdlib/influxdata/influxdb/storage.go b/query/stdlib/influxdata/influxdb/storage.go index 233fc46718..a0a8450b6a 100644 --- a/query/stdlib/influxdata/influxdb/storage.go +++ b/query/stdlib/influxdata/influxdb/storage.go @@ -10,6 +10,7 @@ import ( "github.com/influxdata/flux/execute" "github.com/influxdata/flux/semantic" platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/tsdb/cursors" "github.com/pkg/errors" ) @@ -76,7 +77,7 @@ type source struct { currentTime execute.Time overflow bool - stats flux.Statistics + stats cursors.CursorStats } func NewSource(id execute.DatasetID, r Reader, readSpec ReadSpec, bounds execute.Bounds, w execute.Window, currentTime execute.Time) execute.Source { @@ -101,6 +102,13 @@ func (s *source) Run(ctx context.Context) { } } +func (s *source) Metadata() flux.Metadata { + return flux.Metadata{ + "influxdb/scanned-bytes": []interface{}{s.stats.ScannedBytes}, + "influxdb/scanned-values": []interface{}{s.stats.ScannedValues}, + } +} + func (s *source) run(ctx context.Context) error { //TODO(nathanielc): Pass through context to actual network I/O. for tables, mark, ok := s.next(ctx); ok; tables, mark, ok = s.next(ctx) { @@ -120,7 +128,12 @@ func (s *source) run(ctx context.Context) error { if err != nil { return err } - s.stats = s.stats.Add(tables.Statistics()) + + // Track the number of bytes and values scanned. + stats := tables.Statistics() + s.stats.ScannedValues += stats.ScannedValues + s.stats.ScannedBytes += stats.ScannedBytes + for _, t := range s.ts { if err := t.UpdateWatermark(s.id, mark); err != nil { return err @@ -130,7 +143,7 @@ func (s *source) run(ctx context.Context) error { return nil } -func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bool) { +func (s *source) next(ctx context.Context) (TableIterator, execute.Time, bool) { if s.overflow { return nil, 0, false } @@ -163,10 +176,6 @@ func (s *source) next(ctx context.Context) (flux.TableIterator, execute.Time, bo return bi, stop, true } -func (s *source) Statistics() flux.Statistics { - return s.stats -} - type GroupMode int const ( @@ -228,6 +237,12 @@ type ReadSpec struct { } type Reader interface { - Read(ctx context.Context, rs ReadSpec, start, stop execute.Time) (flux.TableIterator, error) + Read(ctx context.Context, rs ReadSpec, start, stop execute.Time) (TableIterator, error) Close() } + +// TableIterator is a table iterator that also keeps track of cursor statistics from the storage engine. +type TableIterator interface { + flux.TableIterator + Statistics() cursors.CursorStats +} diff --git a/query/stdlib/influxdata/influxdb/to.go b/query/stdlib/influxdata/influxdb/to.go index 3df1f8b63d..ea68e18994 100644 --- a/query/stdlib/influxdata/influxdb/to.go +++ b/query/stdlib/influxdata/influxdb/to.go @@ -545,6 +545,10 @@ func writeTable(t *ToTransformation, tbl flux.Table) error { } fieldValues.Range(func(k string, v values.Value) { + if v.IsNull() { + fields[k] = nil + return + } switch v.Type() { case semantic.Float: fields[k] = v.Float() diff --git a/query/stdlib/packages.go b/query/stdlib/packages.go index 69d58389c7..d998c59165 100644 --- a/query/stdlib/packages.go +++ b/query/stdlib/packages.go @@ -4,4 +4,5 @@ package stdlib import ( _ "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb" _ "github.com/influxdata/influxdb/query/stdlib/influxdata/influxdb/v1" + _ "github.com/influxdata/influxdb/query/stdlib/testing" ) diff --git a/query/stdlib/testing/end_to_end_test.go b/query/stdlib/testing/end_to_end_test.go new file mode 100644 index 0000000000..c25486df74 --- /dev/null +++ b/query/stdlib/testing/end_to_end_test.go @@ -0,0 +1,419 @@ +package testing_test + +import ( + "bufio" + "bytes" + "context" + "io" + "io/ioutil" + nethttp "net/http" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/ast" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/lang" + "github.com/influxdata/flux/parser" + "github.com/influxdata/flux/stdlib" + + platform "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/bolt" + "github.com/influxdata/influxdb/cmd/influxd/launcher" + "github.com/influxdata/influxdb/http" + "github.com/influxdata/influxdb/query" + + _ "github.com/influxdata/flux/stdlib" // Import the built-in functions + _ "github.com/influxdata/influxdb/query/stdlib" // Import the stdlib +) + +// Default context. +var ctx = context.Background() + +func init() { + flux.FinalizeBuiltIns() +} + +var skipTests = map[string]string{ + // TODO(adam) determine the reason for these test failures. + "cov": "Reason TBD", + "covariance": "Reason TBD", + "cumulative_sum": "Reason TBD", + "cumulative_sum_default": "Reason TBD", + "cumulative_sum_noop": "Reason TBD", + "difference_panic": "Reason TBD", + "drop_non_existent": "Reason TBD", + "filter_by_regex_function": "Reason TBD", + "first": "Reason TBD", + "group_by_irregular": "Reason TBD", + "highestAverage": "Reason TBD", + "highestMax": "Reason TBD", + "histogram": "Reason TBD", + "histogram_normalize": "Reason TBD", + "histogram_quantile": "Reason TBD", + "join": "Reason TBD", + "join_across_measurements": "Reason TBD", + "keep_non_existent": "Reason TBD", + "key_values": "Reason TBD", + "key_values_host_name": "Reason TBD", + "last": "Reason TBD", + "lowestAverage": "Reason TBD", + "max": "Reason TBD", + "meta_query_fields": "Reason TBD", + "meta_query_keys": "Reason TBD", + "meta_query_measurements": "Reason TBD", + "min": "Reason TBD", + "multiple_range": "Reason TBD", + "sample": "Reason TBD", + "selector_preserve_time": "Reason TBD", + "shift": "Reason TBD", + "shift_negative_duration": "Reason TBD", + "show_all_tag_keys": "Reason TBD", + "sort": "Reason TBD", + "task_per_line": "Reason TBD", + "top": "Reason TBD", + "union": "Reason TBD", + "union_heterogeneous": "Reason TBD", + "unique": "Reason TBD", + "distinct": "Reason TBD", + + // it appears these occur when writing the input data. `to` may not be null safe. + "fill_bool": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64", + "fill_float": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64", + "fill_int": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64", + "fill_string": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64", + "fill_time": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64", + "fill_uint": "failed to read meta data: panic: interface conversion: interface {} is nil, not uint64", + "window_null": "failed to read meta data: panic: interface conversion: interface {} is nil, not float64", + + // these may just be missing calls to range() in the tests. easy to fix in a new PR. + "group_nulls": "unbounded test", + "integral": "unbounded test", + "integral_columns": "unbounded test", + "map": "unbounded test", + + // the following tests have a difference between the CSV-decoded input table, and the storage-retrieved version of that table + "columns": "group key mismatch", + "count": "column order mismatch", + "mean": "column order mismatch", + "percentile_aggregate": "column order mismatch", + "percentile_tdigest": "column order mismatch", + "set": "column order mismatch", + "set_new_column": "column order mismatch", + "skew": "column order mismatch", + "spread": "column order mismatch", + "stddev": "column order mismatch", + "sum": "column order mismatch", + "simple_max": "_stop missing from expected output", + "derivative": "time bounds mismatch (engine uses now() instead of bounds on input table)", + "percentile": "time bounds mismatch (engine uses now() instead of bounds on input table)", + "difference_columns": "data write/read path loses columns x and y", + "keys": "group key mismatch", + "pivot_task_test": "possible group key or column order mismatch", + + // failed to read meta data errors: the CSV encoding is incomplete probably due to data schema errors. needs more detailed investigation to find root cause of error + "filter_by_regex": "failed to read metadata", + "filter_by_tags": "failed to read metadata", + "group": "failed to read metadata", + "group_ungroup": "failed to read metadata", + "pivot_mean": "failed to read metadata", + "select_measurement": "failed to read metadata", + "select_measurement_field": "failed to read metadata", + "histogram_quantile_minvalue": "failed to read meta data: no column with label _measurement exists", + "increase": "failed to read meta data: table has no _value column", + + "string_max": "error: invalid use of function: *functions.MaxSelector has no implementation for type string (https://github.com/influxdata/platform/issues/224)", + "null_as_value": "null not supported as value in influxql (https://github.com/influxdata/platform/issues/353)", + "string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)", + "to": "to functions are not supported in the testing framework (https://github.com/influxdata/flux/issues/77)", + "covariance_missing_column_1": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)", + "covariance_missing_column_2": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)", + "drop_before_rename": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)", + "drop_referenced": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)", + "yield": "yield requires special test case (https://github.com/influxdata/flux/issues/535)", +} + +func TestFluxEndToEnd(t *testing.T) { + runEndToEnd(t, stdlib.FluxTestPackages) +} +func BenchmarkFluxEndToEnd(b *testing.B) { + benchEndToEnd(b, stdlib.FluxTestPackages) +} + +func runEndToEnd(t *testing.T, pkgs []*ast.Package) { + l := RunMainOrFail(t, ctx) + l.SetupOrFail(t) + defer l.ShutdownOrFail(t, ctx) + for _, pkg := range pkgs { + pkg := pkg.Copy().(*ast.Package) + name := pkg.Files[0].Name + t.Run(name, func(t *testing.T) { + if reason, ok := skipTests[strings.TrimSuffix(name, ".flux")]; ok { + t.Skip(reason) + } + testFlux(t, l, pkg) + }) + } +} + +func benchEndToEnd(b *testing.B, pkgs []*ast.Package) { + l := RunMainOrFail(b, ctx) + l.SetupOrFail(b) + defer l.ShutdownOrFail(b, ctx) + for _, pkg := range pkgs { + pkg := pkg.Copy().(*ast.Package) + name := pkg.Files[0].Name + b.Run(name, func(b *testing.B) { + if reason, ok := skipTests[strings.TrimSuffix(name, ".flux")]; ok { + b.Skip(reason) + } + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + testFlux(b, l, pkg) + } + }) + } +} + +var optionsSource = ` +import "testing" +import c "csv" + +// Options bucket and org are defined dynamically per test + +option testing.loadStorage = (csv) => { + c.from(csv: csv) |> to(bucket: bucket, org: org) + return from(bucket: bucket) +} +` +var optionsAST *ast.File + +func init() { + pkg := parser.ParseSource(optionsSource) + if ast.Check(pkg) > 0 { + panic(ast.GetError(pkg)) + } + optionsAST = pkg.Files[0] +} + +func testFlux(t testing.TB, l *Launcher, pkg *ast.Package) { + + // Query server to ensure write persists. + + b := &platform.Bucket{ + Organization: "ORG", + Name: t.Name(), + RetentionPeriod: 0, + } + + s := l.BucketService() + if err := s.CreateBucket(context.Background(), b); err != nil { + t.Fatal(err) + } + + // Define bucket and org options + bucketOpt := &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: "bucket"}, + Init: &ast.StringLiteral{Value: b.Name}, + }, + } + orgOpt := &ast.OptionStatement{ + Assignment: &ast.VariableAssignment{ + ID: &ast.Identifier{Name: "org"}, + Init: &ast.StringLiteral{Value: b.Organization}, + }, + } + options := optionsAST.Copy().(*ast.File) + options.Body = append([]ast.Statement{bucketOpt, orgOpt}, options.Body...) + + // Add options to pkg + pkg.Files = append(pkg.Files, options) + + // Add testing.inspect call to ensure the data is loaded + inspectCalls := stdlib.TestingInspectCalls(pkg) + pkg.Files = append(pkg.Files, inspectCalls) + + req := &query.Request{ + OrganizationID: l.Org.ID, + Compiler: lang.ASTCompiler{AST: pkg}, + } + if r, err := l.FluxService().Query(ctx, req); err != nil { + t.Fatal(err) + } else { + for r.More() { + v := r.Next() + if err := v.Tables().Do(func(tbl flux.Table) error { + return nil + }); err != nil { + t.Error(err) + } + } + } + + // quirk: our execution engine doesn't guarantee the order of execution for disconnected DAGS + // so that our function-with-side effects call to `to` may run _after_ the test instead of before. + // running twice makes sure that `to` happens at least once before we run the test. + // this time we use a call to `run` so that the assertion error is triggered + runCalls := stdlib.TestingRunCalls(pkg) + pkg.Files[len(pkg.Files)-1] = runCalls + r, err := l.FluxService().Query(ctx, req) + if err != nil { + t.Fatal(err) + } + + for r.More() { + v := r.Next() + if err := v.Tables().Do(func(tbl flux.Table) error { + return nil + }); err != nil { + t.Error(err) + } + } + if err := r.Err(); err != nil { + t.Error(err) + // Replace the testing.run calls with testing.inspect calls. + pkg.Files[len(pkg.Files)-1] = inspectCalls + r, err := l.FluxService().Query(ctx, req) + if err != nil { + t.Fatal(err) + } + var out bytes.Buffer + defer func() { + if t.Failed() { + scanner := bufio.NewScanner(&out) + for scanner.Scan() { + t.Log(scanner.Text()) + } + } + }() + for r.More() { + v := r.Next() + err := execute.FormatResult(&out, v) + if err != nil { + t.Error(err) + } + } + if err := r.Err(); err != nil { + t.Error(err) + } + } +} + +// Launcher is a test wrapper for main.Launcher. +type Launcher struct { + *launcher.Launcher + + // Root temporary directory for all data. + Path string + + // Initialized after calling the Setup() helper. + User *platform.User + Org *platform.Organization + Bucket *platform.Bucket + Auth *platform.Authorization + + // Standard in/out/err buffers. + Stdin bytes.Buffer + Stdout bytes.Buffer + Stderr bytes.Buffer +} + +// NewLauncher returns a new instance of Launcher. +func NewLauncher() *Launcher { + l := &Launcher{Launcher: launcher.NewLauncher()} + l.Launcher.Stdin = &l.Stdin + l.Launcher.Stdout = &l.Stdout + l.Launcher.Stderr = &l.Stderr + if testing.Verbose() { + l.Launcher.Stdout = io.MultiWriter(l.Launcher.Stdout, os.Stdout) + l.Launcher.Stderr = io.MultiWriter(l.Launcher.Stderr, os.Stderr) + } + + path, err := ioutil.TempDir("", "") + if err != nil { + panic(err) + } + l.Path = path + return l +} + +// RunMainOrFail initializes and starts the server. +func RunMainOrFail(tb testing.TB, ctx context.Context, args ...string) *Launcher { + tb.Helper() + l := NewLauncher() + if err := l.Run(ctx, args...); err != nil { + tb.Fatal(err) + } + return l +} + +// Run executes the program with additional arguments to set paths and ports. +func (l *Launcher) Run(ctx context.Context, args ...string) error { + args = append(args, "--bolt-path", filepath.Join(l.Path, "influxd.bolt")) + args = append(args, "--protos-path", filepath.Join(l.Path, "protos")) + args = append(args, "--engine-path", filepath.Join(l.Path, "engine")) + args = append(args, "--http-bind-address", "127.0.0.1:0") + args = append(args, "--log-level", "debug") + return l.Launcher.Run(ctx, args...) +} + +// Shutdown stops the program and cleans up temporary paths. +func (l *Launcher) Shutdown(ctx context.Context) error { + l.Cancel() + l.Launcher.Shutdown(ctx) + return os.RemoveAll(l.Path) +} + +// ShutdownOrFail stops the program and cleans up temporary paths. Fail on error. +func (l *Launcher) ShutdownOrFail(tb testing.TB, ctx context.Context) { + tb.Helper() + if err := l.Shutdown(ctx); err != nil { + tb.Fatal(err) + } +} + +// SetupOrFail creates a new user, bucket, org, and auth token. Fail on error. +func (l *Launcher) SetupOrFail(tb testing.TB) { + svc := &http.SetupService{Addr: l.URL()} + results, err := svc.Generate(ctx, &platform.OnboardingRequest{ + User: "USER", + Password: "PASSWORD", + Org: "ORG", + Bucket: "BUCKET", + }) + if err != nil { + tb.Fatal(err) + } + + l.User = results.User + l.Org = results.Org + l.Bucket = results.Bucket + l.Auth = results.Auth +} + +func (l *Launcher) FluxService() *http.FluxQueryService { + return &http.FluxQueryService{Addr: l.URL(), Token: l.Auth.Token} +} + +func (l *Launcher) BucketService() *http.BucketService { + return &http.BucketService{ + Addr: l.URL(), + Token: l.Auth.Token, + OpPrefix: bolt.OpPrefix, + } +} + +// MustNewHTTPRequest returns a new nethttp.Request with base URL and auth attached. Fail on error. +func (l *Launcher) MustNewHTTPRequest(method, rawurl, body string) *nethttp.Request { + req, err := nethttp.NewRequest(method, l.URL()+rawurl, strings.NewReader(body)) + if err != nil { + panic(err) + } + + req.Header.Set("Authorization", "Token "+l.Auth.Token) + return req +} diff --git a/query/stdlib/testing/testing.go b/query/stdlib/testing/testing.go new file mode 100644 index 0000000000..7603f836a0 --- /dev/null +++ b/query/stdlib/testing/testing.go @@ -0,0 +1 @@ +package testing diff --git a/storage/reads/reader.go b/storage/reads/reader.go index dcf6f26159..be34c211df 100644 --- a/storage/reads/reader.go +++ b/storage/reads/reader.go @@ -20,6 +20,7 @@ type storageTable interface { flux.Table Close() Cancel() + Statistics() cursors.CursorStats } type storeReader struct { @@ -30,7 +31,7 @@ func NewReader(s Store) influxdb.Reader { return &storeReader{s: s} } -func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time) (flux.TableIterator, error) { +func (r *storeReader) Read(ctx context.Context, rs influxdb.ReadSpec, start, stop execute.Time) (influxdb.TableIterator, error) { var predicate *datatypes.Predicate if rs.Predicate != nil { p, err := toStoragePredicate(rs.Predicate) @@ -57,10 +58,10 @@ type tableIterator struct { s Store readSpec influxdb.ReadSpec predicate *datatypes.Predicate - stats flux.Statistics + stats cursors.CursorStats } -func (bi *tableIterator) Statistics() flux.Statistics { return bi.stats } +func (bi *tableIterator) Statistics() cursors.CursorStats { return bi.stats } func (bi *tableIterator) Do(f func(flux.Table) error) error { src, err := bi.s.GetSource(bi.readSpec) @@ -192,7 +193,9 @@ READ: } table.Close() - bi.stats = bi.stats.Add(table.Statistics()) + stats := table.Statistics() + bi.stats.ScannedValues += stats.ScannedValues + bi.stats.ScannedBytes += stats.ScannedBytes table = nil } return rs.Err() @@ -316,6 +319,9 @@ READ: } table.Close() + stats := table.Statistics() + bi.stats.ScannedValues += stats.ScannedValues + bi.stats.ScannedBytes += stats.ScannedBytes table = nil gc = rs.Next() diff --git a/storage/reads/table.gen.go b/storage/reads/table.gen.go index 957b8d8fe5..6d7960fa8b 100644 --- a/storage/reads/table.gen.go +++ b/storage/reads/table.gen.go @@ -7,12 +7,12 @@ package reads import ( - "github.com/influxdata/flux/arrow" - "github.com/influxdata/flux/memory" "sync" "github.com/influxdata/flux" + "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/memory" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb/cursors" "github.com/pkg/errors" @@ -57,15 +57,15 @@ func (t *floatTable) Close() { t.mu.Unlock() } -func (t *floatTable) Statistics() flux.Statistics { +func (t *floatTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -233,12 +233,12 @@ func (t *floatGroupTable) advanceCursor() bool { return false } -func (t *floatGroupTable) Statistics() flux.Statistics { +func (t *floatGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := t.cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -283,15 +283,15 @@ func (t *integerTable) Close() { t.mu.Unlock() } -func (t *integerTable) Statistics() flux.Statistics { +func (t *integerTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -459,12 +459,12 @@ func (t *integerGroupTable) advanceCursor() bool { return false } -func (t *integerGroupTable) Statistics() flux.Statistics { +func (t *integerGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := t.cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -509,15 +509,15 @@ func (t *unsignedTable) Close() { t.mu.Unlock() } -func (t *unsignedTable) Statistics() flux.Statistics { +func (t *unsignedTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -685,12 +685,12 @@ func (t *unsignedGroupTable) advanceCursor() bool { return false } -func (t *unsignedGroupTable) Statistics() flux.Statistics { +func (t *unsignedGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := t.cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -735,15 +735,15 @@ func (t *stringTable) Close() { t.mu.Unlock() } -func (t *stringTable) Statistics() flux.Statistics { +func (t *stringTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -911,12 +911,12 @@ func (t *stringGroupTable) advanceCursor() bool { return false } -func (t *stringGroupTable) Statistics() flux.Statistics { +func (t *stringGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := t.cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -961,15 +961,15 @@ func (t *booleanTable) Close() { t.mu.Unlock() } -func (t *booleanTable) Statistics() flux.Statistics { +func (t *booleanTable) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -1137,12 +1137,12 @@ func (t *booleanGroupTable) advanceCursor() bool { return false } -func (t *booleanGroupTable) Statistics() flux.Statistics { +func (t *booleanGroupTable) Statistics() cursors.CursorStats { if t.cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := t.cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } diff --git a/storage/reads/table.gen.go.tmpl b/storage/reads/table.gen.go.tmpl index 47a2558844..3140f0ccd6 100644 --- a/storage/reads/table.gen.go.tmpl +++ b/storage/reads/table.gen.go.tmpl @@ -1,12 +1,12 @@ package reads import ( - "github.com/influxdata/flux/arrow" - "github.com/influxdata/flux/memory" "sync" "github.com/influxdata/flux" + "github.com/influxdata/flux/arrow" "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/memory" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/tsdb/cursors" "github.com/pkg/errors" @@ -51,15 +51,15 @@ func (t *{{.name}}Table) Close() { t.mu.Unlock() } -func (t *{{.name}}Table) Statistics() flux.Statistics { +func (t *{{.name}}Table) Statistics() cursors.CursorStats { t.mu.Lock() defer t.mu.Unlock() cur := t.cur if cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } @@ -227,12 +227,12 @@ func (t *{{.name}}GroupTable) advanceCursor() bool { return false } -func (t *{{.name}}GroupTable) Statistics() flux.Statistics { +func (t *{{.name}}GroupTable) Statistics() cursors.CursorStats { if t.cur == nil { - return flux.Statistics{} + return cursors.CursorStats{} } cs := t.cur.Stats() - return flux.Statistics{ + return cursors.CursorStats{ ScannedValues: cs.ScannedValues, ScannedBytes: cs.ScannedBytes, } diff --git a/storage/reads/table.go b/storage/reads/table.go index a4e4dcfe51..665f2c3a10 100644 --- a/storage/reads/table.go +++ b/storage/reads/table.go @@ -211,7 +211,7 @@ func newTableNoPoints( func (t *tableNoPoints) Close() {} -func (t *tableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} } +func (t *tableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} } func (t *tableNoPoints) Do(f func(flux.ColReader) error) error { if t.isCancelled() { @@ -251,7 +251,7 @@ func (t *groupTableNoPoints) Do(f func(flux.ColReader) error) error { return t.err } -func (t *groupTableNoPoints) Statistics() flux.Statistics { return flux.Statistics{} } +func (t *groupTableNoPoints) Statistics() cursors.CursorStats { return cursors.CursorStats{} } func (t *floatTable) toArrowBuffer(vs []float64) *array.Float64 { return arrow.NewFloat(vs, &memory.Allocator{}) diff --git a/ui/package-lock.json b/ui/package-lock.json index 6d4b1ddfe6..19fe892851 100644 --- a/ui/package-lock.json +++ b/ui/package-lock.json @@ -11029,7 +11029,7 @@ "dependencies": { "minimist": { "version": "0.0.10", - "resolved": "https://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz", + "resolved": "http://registry.npmjs.org/minimist/-/minimist-0.0.10.tgz", "integrity": "sha1-3j+YVD2/lggr5IrRoMfNqDYwHc8=", "dev": true },