diff --git a/Gopkg.lock b/Gopkg.lock index 6cfa6a6ee8..e87c9b53ad 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -463,7 +463,7 @@ [[projects]] branch = "master" - digest = "1:eb7bc670810e9d06d8b42574de77248f0f28ae4cea2ee84d5b4fc70c1dd16a4c" + digest = "1:531dfd2544e63009087e1f01e0ec878e84d80c1890dd4975a55e30946afd4caa" name = "github.com/influxdata/flux" packages = [ ".", @@ -478,6 +478,7 @@ "functions/storage", "interpreter", "iocounter", + "lang", "options", "parser", "plan", @@ -488,7 +489,7 @@ "values", ] pruneopts = "UT" - revision = "1de8e9acfe7dc7a40c19ac63f15bddc3fdfc49be" + revision = "54c893986878754089554512132439b943dc6b47" [[projects]] branch = "platform" @@ -1244,6 +1245,7 @@ "github.com/influxdata/flux/functions", "github.com/influxdata/flux/functions/storage", "github.com/influxdata/flux/iocounter", + "github.com/influxdata/flux/lang", "github.com/influxdata/flux/options", "github.com/influxdata/flux/parser", "github.com/influxdata/flux/plan", diff --git a/cmd/fluxd/main.go b/cmd/fluxd/main.go index 68e1f5ab9c..b00c0cae1a 100644 --- a/cmd/fluxd/main.go +++ b/cmd/fluxd/main.go @@ -8,7 +8,6 @@ import ( "runtime" "strings" - "github.com/influxdata/flux" "github.com/influxdata/flux/control" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/functions" @@ -17,7 +16,9 @@ import ( "github.com/influxdata/platform" "github.com/influxdata/platform/http" "github.com/influxdata/platform/kit/prom" + "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" + pcontrol "github.com/influxdata/platform/query/control" "github.com/influxdata/platform/query/functions/storage/pb" "github.com/influxdata/platform/snowflake" pzap "github.com/influxdata/platform/zap" @@ -103,7 +104,7 @@ func fluxF(cmd *cobra.Command, args []string) { logger.Error("error injecting dependencies", zap.Error(err)) os.Exit(1) } - c := control.New(config) + c := pcontrol.New(config) reg.MustRegister(c.PrometheusCollectors()...) orgName, err := getStrList("ORGANIZATION_NAME") @@ -114,8 +115,8 @@ func fluxF(cmd *cobra.Command, args []string) { queryHandler := http.NewExternalQueryHandler() - queryHandler.ProxyQueryService = flux.ProxyQueryServiceBridge{ - QueryService: flux.QueryServiceBridge{ + queryHandler.ProxyQueryService = query.ProxyQueryServiceBridge{ + QueryService: query.QueryServiceBridge{ AsyncQueryService: c, }, } @@ -163,7 +164,7 @@ func injectDeps(deps execute.Dependencies) error { return functions.InjectFromDependencies(deps, storage.Dependencies{ Reader: sr, BucketLookup: bucketLookup{}, - OrganizationLookup: flux.FromOrganizationService(&orgSvc), + OrganizationLookup: query.FromOrganizationService(&orgSvc), }) } diff --git a/cmd/influx/query.go b/cmd/influx/query.go index 9270b1e806..4b948ccf31 100644 --- a/cmd/influx/query.go +++ b/cmd/influx/query.go @@ -5,13 +5,13 @@ import ( "os" "strings" - "github.com/influxdata/flux" "github.com/influxdata/flux/execute" "github.com/influxdata/flux/functions" "github.com/influxdata/flux/functions/storage" "github.com/influxdata/flux/repl" "github.com/influxdata/platform" "github.com/influxdata/platform/http" + "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" "github.com/influxdata/platform/query/functions/storage/pb" "github.com/spf13/cobra" @@ -66,12 +66,6 @@ func fluxQueryF(cmd *cobra.Command, args []string) { os.Exit(1) } - org, err := orgID(queryFlags.OrgID) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - buckets, err := bucketService(flags.host, flags.token) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -84,7 +78,7 @@ func fluxQueryF(cmd *cobra.Command, args []string) { os.Exit(1) } - r, err := getFluxREPL(hosts, buckets, orgs, org, queryFlags.Verbose) + r, err := getFluxREPL(hosts, buckets, orgs, queryFlags.Verbose) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -99,8 +93,8 @@ func fluxQueryF(cmd *cobra.Command, args []string) { func injectDeps(deps execute.Dependencies, hosts storage.Reader, buckets platform.BucketService, orgs platform.OrganizationService) error { return functions.InjectFromDependencies(deps, storage.Dependencies{ Reader: hosts, - BucketLookup: flux.FromBucketService(buckets), - OrganizationLookup: flux.FromOrganizationService(orgs), + BucketLookup: query.FromBucketService(buckets), + OrganizationLookup: query.FromOrganizationService(orgs), }) } diff --git a/cmd/influx/repl.go b/cmd/influx/repl.go index 90e2e0aa76..6dcb3d9128 100644 --- a/cmd/influx/repl.go +++ b/cmd/influx/repl.go @@ -57,12 +57,6 @@ func replF(cmd *cobra.Command, args []string) { os.Exit(1) } - org, err := orgID(replFlags.OrgID) - if err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - buckets, err := bucketService(flags.host, flags.token) if err != nil { fmt.Fprintln(os.Stderr, err) @@ -75,7 +69,7 @@ func replF(cmd *cobra.Command, args []string) { os.Exit(1) } - r, err := getFluxREPL(hosts, buckets, orgs, org, replFlags.Verbose) + r, err := getFluxREPL(hosts, buckets, orgs, replFlags.Verbose) if err != nil { fmt.Fprintln(os.Stderr, err) os.Exit(1) @@ -84,7 +78,7 @@ func replF(cmd *cobra.Command, args []string) { r.Run() } -func getFluxREPL(storageHosts storage.Reader, buckets platform.BucketService, orgs platform.OrganizationService, org platform.ID, verbose bool) (*repl.REPL, error) { +func getFluxREPL(storageHosts storage.Reader, buckets platform.BucketService, orgs platform.OrganizationService, verbose bool) (*repl.REPL, error) { conf := control.Config{ ExecutorDependencies: make(execute.Dependencies), ConcurrencyQuota: runtime.NumCPU() * 2, @@ -97,5 +91,5 @@ func getFluxREPL(storageHosts storage.Reader, buckets platform.BucketService, or } c := control.New(conf) - return repl.New(c, org), nil + return repl.New(c), nil } diff --git a/cmd/influxd/main.go b/cmd/influxd/main.go index f6497d8326..ef9d358acb 100644 --- a/cmd/influxd/main.go +++ b/cmd/influxd/main.go @@ -14,8 +14,6 @@ import ( "syscall" "time" - "github.com/influxdata/flux" - "go.uber.org/zap" "github.com/influxdata/flux/control" @@ -27,7 +25,9 @@ import ( "github.com/influxdata/platform/http" "github.com/influxdata/platform/kit/prom" "github.com/influxdata/platform/nats" + "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" + pcontrol "github.com/influxdata/platform/query/control" "github.com/influxdata/platform/source" "github.com/influxdata/platform/task" taskbackend "github.com/influxdata/platform/task/backend" @@ -170,7 +170,7 @@ func platformF(cmd *cobra.Command, args []string) { sourceSvc = c } - var queryService flux.QueryService + var queryService query.QueryService { // TODO(lh): this is temporary until query endpoint is added here. config := control.Config{ @@ -180,8 +180,8 @@ func platformF(cmd *cobra.Command, args []string) { Verbose: false, } - queryService = flux.QueryServiceBridge{ - AsyncQueryService: control.New(config), + queryService = query.QueryServiceBridge{ + AsyncQueryService: pcontrol.New(config), } } diff --git a/http/external_query_handler.go b/http/external_query_handler.go index 7f5564b163..91b5c9bc96 100644 --- a/http/external_query_handler.go +++ b/http/external_query_handler.go @@ -4,8 +4,8 @@ import ( "fmt" "net/http" - "github.com/influxdata/flux" "github.com/influxdata/platform" + "github.com/influxdata/platform/query" "github.com/julienschmidt/httprouter" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -18,7 +18,7 @@ type ExternalQueryHandler struct { Logger *zap.Logger - ProxyQueryService flux.ProxyQueryService + ProxyQueryService query.ProxyQueryService OrganizationService platform.OrganizationService } diff --git a/http/influxdb/source_proxy_query_service.go b/http/influxdb/source_proxy_query_service.go index 08c744e0d6..6f7621728b 100644 --- a/http/influxdb/source_proxy_query_service.go +++ b/http/influxdb/source_proxy_query_service.go @@ -10,8 +10,10 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/platform" platformhttp "github.com/influxdata/platform/http" + "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/influxql" ) @@ -23,18 +25,18 @@ type SourceProxyQueryService struct { platform.V1SourceFields } -func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { switch req.Request.Compiler.CompilerType() { case influxql.CompilerType: return s.influxQuery(ctx, w, req) - case flux.FluxCompilerType: + case lang.FluxCompilerType: return s.fluxQuery(ctx, w, req) } return 0, fmt.Errorf("compiler type not supported") } -func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { if len(s.FluxURL) == 0 { return 0, fmt.Errorf("fluxURL from source cannot be empty if the compiler type is flux") } @@ -47,12 +49,12 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re }{} switch c := req.Request.Compiler.(type) { - case flux.FluxCompiler: + case lang.FluxCompiler: request.Query = c.Query - request.Type = flux.FluxCompilerType - case flux.SpecCompiler: + request.Type = lang.FluxCompilerType + case lang.SpecCompiler: request.Spec = c.Spec - request.Type = flux.SpecCompilerType + request.Type = lang.SpecCompilerType default: return 0, fmt.Errorf("compiler type not supported: %s", c.CompilerType()) } @@ -102,7 +104,7 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re return io.Copy(w, resp.Body) } -func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { if len(s.URL) == 0 { return 0, fmt.Errorf("URL from source cannot be empty if the compiler type is influxql") } diff --git a/http/proxy_query_service.go b/http/proxy_query_service.go index aa3fb6904c..ebeb2a2771 100644 --- a/http/proxy_query_service.go +++ b/http/proxy_query_service.go @@ -9,6 +9,7 @@ import ( "net/http" "github.com/influxdata/flux" + "github.com/influxdata/platform/query" "github.com/julienschmidt/httprouter" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -23,7 +24,7 @@ type ProxyQueryHandler struct { Logger *zap.Logger - ProxyQueryService flux.ProxyQueryService + ProxyQueryService query.ProxyQueryService CompilerMappings flux.CompilerMappings DialectMappings flux.DialectMappings @@ -48,7 +49,7 @@ type HTTPDialect interface { func (h *ProxyQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - var req flux.ProxyRequest + var req query.ProxyRequest req.WithCompilerMappings(h.CompilerMappings) req.WithDialectMappings(h.DialectMappings) if err := json.NewDecoder(r.Body).Decode(&req); err != nil { @@ -89,7 +90,7 @@ type ProxyQueryService struct { InsecureSkipVerify bool } -func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { u, err := newURL(s.Addr, proxyQueryPath) if err != nil { return 0, err diff --git a/http/query.go b/http/query.go index 3aadb64a2d..0ddae83399 100644 --- a/http/query.go +++ b/http/query.go @@ -9,8 +9,10 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/platform" "github.com/influxdata/platform/kit/errors" + "github.com/influxdata/platform/query" ) // QueryRequest is a flux query request. @@ -91,18 +93,18 @@ func (r QueryRequest) Validate() error { } // ProxyRequest returns a request to proxy from the flux. -func (r QueryRequest) ProxyRequest() (*flux.ProxyRequest, error) { +func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) { if err := r.Validate(); err != nil { return nil, err } // Query is preferred over spec var compiler flux.Compiler if r.Query != "" { - compiler = flux.FluxCompiler{ + compiler = lang.FluxCompiler{ Query: r.Query, } } else if r.Spec != nil { - compiler = flux.SpecCompiler{ + compiler = lang.SpecCompiler{ Spec: r.Spec, } } @@ -116,8 +118,8 @@ func (r QueryRequest) ProxyRequest() (*flux.ProxyRequest, error) { // TODO(nathanielc): Use commentPrefix and dateTimeFormat // once they are supported. - return &flux.ProxyRequest{ - Request: flux.Request{ + return &query.ProxyRequest{ + Request: query.Request{ OrganizationID: r.org.ID, Compiler: compiler, }, @@ -147,7 +149,7 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ return &req, err } -func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*flux.ProxyRequest, error) { +func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*query.ProxyRequest, error) { req, err := decodeQueryRequest(ctx, r, svc) if err != nil { return nil, err diff --git a/http/query_handler.go b/http/query_handler.go index 2098962efe..08e3bdd42d 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -8,10 +8,10 @@ import ( "io" "net/http" - "github.com/influxdata/flux" "github.com/influxdata/platform" pcontext "github.com/influxdata/platform/context" "github.com/influxdata/platform/kit/errors" + "github.com/influxdata/platform/query" "github.com/julienschmidt/httprouter" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -29,7 +29,7 @@ type FluxHandler struct { AuthorizationService platform.AuthorizationService OrganizationService platform.OrganizationService - ProxyQueryService flux.ProxyQueryService + ProxyQueryService query.ProxyQueryService } // NewFluxHandler returns a new handler at /v2/query for flux queries. @@ -103,7 +103,7 @@ type FluxService struct { } // Query runs a flux query against a influx server and sends the results to the io.Writer. -func (s *FluxService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { u, err := newURL(s.URL, fluxPath) if err != nil { return 0, err diff --git a/http/query_service.go b/http/query_service.go index 3f7300297b..1d3b564033 100644 --- a/http/query_service.go +++ b/http/query_service.go @@ -8,6 +8,7 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/csv" + "github.com/influxdata/platform/query" "github.com/julienschmidt/httprouter" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" @@ -26,7 +27,7 @@ type QueryHandler struct { csvDialect csv.Dialect - QueryService flux.QueryService + QueryService query.QueryService CompilerMappings flux.CompilerMappings } @@ -54,7 +55,7 @@ func (h *QueryHandler) handlePing(w http.ResponseWriter, r *http.Request) { func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - var req flux.Request + var req query.Request req.WithCompilerMappings(h.CompilerMappings) if err := json.NewDecoder(r.Body).Decode(&req); err != nil { EncodeError(ctx, err, w) @@ -125,7 +126,7 @@ type QueryService struct { InsecureSkipVerify bool } -func (s *QueryService) Query(ctx context.Context, req *flux.Request) (flux.ResultIterator, error) { +func (s *QueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) { u, err := newURL(s.Addr, queryPath) if err != nil { return nil, err diff --git a/http/source_proxy_service.go b/http/source_proxy_service.go index e62ac4bafb..d84a6d743a 100644 --- a/http/source_proxy_service.go +++ b/http/source_proxy_service.go @@ -10,8 +10,9 @@ import ( "net/url" "strings" - "github.com/influxdata/flux" + "github.com/influxdata/flux/lang" "github.com/influxdata/platform" + "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/influxql" ) @@ -21,17 +22,17 @@ type SourceProxyQueryService struct { platform.SourceFields } -func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { switch req.Request.Compiler.CompilerType() { case influxql.CompilerType: return s.queryInfluxQL(ctx, w, req) - case flux.FluxCompilerType: + case lang.FluxCompilerType: return s.queryFlux(ctx, w, req) } return 0, fmt.Errorf("compiler type not supported") } -func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { u, err := newURL(s.URL, "/v1/query") if err != nil { return 0, err @@ -61,7 +62,7 @@ func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, re return io.Copy(w, resp.Body) } -func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { compiler, ok := req.Request.Compiler.(*influxql.Compiler) if !ok { diff --git a/http/source_service.go b/http/source_service.go index a96e92603d..2685377e44 100644 --- a/http/source_service.go +++ b/http/source_service.go @@ -12,8 +12,10 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/platform" kerrors "github.com/influxdata/platform/kit/errors" + "github.com/influxdata/platform/query" "github.com/influxdata/platform/query/influxql" "github.com/julienschmidt/httprouter" ) @@ -70,7 +72,7 @@ type SourceHandler struct { // TODO(desa): this was done so in order to remove an import cycle and to allow // for http mocking. NewBucketService func(s *platform.Source) (platform.BucketService, error) - NewQueryService func(s *platform.Source) (flux.ProxyQueryService, error) + NewQueryService func(s *platform.Source) (query.ProxyQueryService, error) } // NewSourceHandler returns a new instance of SourceHandler. @@ -81,7 +83,7 @@ func NewSourceHandler() *SourceHandler { NewBucketService: func(s *platform.Source) (platform.BucketService, error) { return nil, fmt.Errorf("bucket service not set") }, - NewQueryService: func(s *platform.Source) (flux.ProxyQueryService, error) { + NewQueryService: func(s *platform.Source) (query.ProxyQueryService, error) { return nil, fmt.Errorf("query service not set") }, } @@ -99,7 +101,7 @@ func NewSourceHandler() *SourceHandler { return h } -func decodeSourceQueryRequest(r *http.Request) (*flux.ProxyRequest, error) { +func decodeSourceQueryRequest(r *http.Request) (*query.ProxyRequest, error) { // starts here request := struct { Spec *flux.Spec `json:"spec"` @@ -118,18 +120,18 @@ func decodeSourceQueryRequest(r *http.Request) (*flux.ProxyRequest, error) { return nil, err } - req := &flux.ProxyRequest{} + req := &query.ProxyRequest{} req.Dialect = request.Dialect req.Request.OrganizationID = request.OrganizationID switch request.Type { - case flux.FluxCompilerType: - req.Request.Compiler = flux.FluxCompiler{ + case lang.FluxCompilerType: + req.Request.Compiler = lang.FluxCompiler{ Query: request.Query, } - case flux.SpecCompilerType: - req.Request.Compiler = flux.SpecCompiler{ + case lang.SpecCompilerType: + req.Request.Compiler = lang.SpecCompiler{ Spec: request.Spec, } case influxql.CompilerType: diff --git a/query/bridges.go b/query/bridges.go new file mode 100644 index 0000000000..dcb0ad1319 --- /dev/null +++ b/query/bridges.go @@ -0,0 +1,36 @@ +package query + +import ( + "context" + "io" + + "github.com/influxdata/flux" +) + +// QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface. +type QueryServiceBridge struct { + AsyncQueryService AsyncQueryService +} + +func (b QueryServiceBridge) Query(ctx context.Context, req *Request) (flux.ResultIterator, error) { + query, err := b.AsyncQueryService.Query(ctx, req) + if err != nil { + return nil, err + } + return flux.NewResultIteratorFromQuery(query), nil +} + +// ProxyQueryServiceBridge implements ProxyQueryService while consuming a QueryService interface. +type ProxyQueryServiceBridge struct { + QueryService QueryService +} + +func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error) { + results, err := b.QueryService.Query(ctx, &req.Request) + if err != nil { + return 0, err + } + defer results.Cancel() + encoder := req.Dialect.Encoder() + return encoder.Encode(w, results) +} diff --git a/query/control/controller.go b/query/control/controller.go new file mode 100644 index 0000000000..7b120e67b5 --- /dev/null +++ b/query/control/controller.go @@ -0,0 +1,39 @@ +package control + +import ( + "context" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/control" + "github.com/influxdata/platform/query" + "github.com/prometheus/client_golang/prometheus" +) + +// orgLabel is the metric label to use in the controller +const orgLabel = "org" + +// Controller implements AsyncQueryService by consuming a control.Controller. +type Controller struct { + c *control.Controller +} + +// NewController creates a new Controller specific to platform. +func New(config control.Config) *Controller { + config.MetricLabelKeys = append(config.MetricLabelKeys, orgLabel) + c := control.New(config) + return &Controller{c: c} +} + +// Query satisifies the AsyncQueryService while ensuring the request is propogated on the context. +func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error) { + // Set the request on the context so platform specific Flux operations can retrieve it later. + ctx = query.ContextWithRequest(ctx, req) + // Set the org label value for controller metrics + ctx = context.WithValue(ctx, orgLabel, req.OrganizationID.String()) + return c.c.Query(ctx, req.Compiler) +} + +// PrometheusCollectors satisifies the prom.PrometheusCollector interface. +func (c *Controller) PrometheusCollectors() []prometheus.Collector { + return c.c.PrometheusCollectors() +} diff --git a/query/dependency.go b/query/dependency.go new file mode 100644 index 0000000000..a6fdc54e75 --- /dev/null +++ b/query/dependency.go @@ -0,0 +1,56 @@ +package query + +import ( + "context" + + "github.com/influxdata/platform" +) + +// FromBucketService wraps an platform.BucketService in the BucketLookup interface. +func FromBucketService(srv platform.BucketService) *BucketLookup { + return &BucketLookup{ + BucketService: srv, + } +} + +// BucketLookup converts Flux bucket lookups into platform.BucketService calls. +type BucketLookup struct { + BucketService platform.BucketService +} + +// Lookup returns the bucket id and its existence given an org id and bucket name. +func (b *BucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool) { + oid := platform.ID(orgID) + filter := platform.BucketFilter{ + OrganizationID: &oid, + Name: &name, + } + bucket, err := b.BucketService.FindBucket(context.Background(), filter) + if err != nil { + return nil, false + } + return bucket.ID, true +} + +// FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface. +func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup { + return &OrganizationLookup{OrganizationService: srv} +} + +// OrganizationLookup converts organization name lookups into platform.OrganizationService calls. +type OrganizationLookup struct { + OrganizationService platform.OrganizationService +} + +// Lookup returns the organization ID and its existence given an organization name. +func (o *OrganizationLookup) Lookup(ctx context.Context, name string) (platform.ID, bool) { + org, err := o.OrganizationService.FindOrganization( + ctx, + platform.OrganizationFilter{Name: &name}, + ) + + if err != nil { + return nil, false + } + return org.ID, true +} diff --git a/query/functions/query_test.go b/query/functions/query_test.go index 5d99d95027..f64d75df42 100644 --- a/query/functions/query_test.go +++ b/query/functions/query_test.go @@ -9,11 +9,12 @@ import ( "strings" "testing" - "github.com/influxdata/flux" "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/lang" "github.com/influxdata/flux/querytest" "github.com/influxdata/platform" "github.com/influxdata/platform/mock" + "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" "github.com/influxdata/platform/query/influxql" @@ -52,7 +53,7 @@ var skipTests = map[string]string{ "string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)", } -var pqs = querytest.GetProxyQueryServiceBridge() +var querier = querytest.NewQuerier() func withEachFluxFile(t testing.TB, fn func(prefix, caseName string)) { dir, err := os.Getwd() @@ -84,13 +85,13 @@ func Test_QueryEndToEnd(t *testing.T) { if skip { t.Skip(reason) } - testFlux(t, pqs, prefix, ".flux") + testFlux(t, querier, prefix, ".flux") }) t.Run(influxqlName, func(t *testing.T) { if skip { t.Skip(reason) } - testInfluxQL(t, pqs, prefix, ".influxql") + testInfluxQL(t, querier, prefix, ".influxql") }) }) } @@ -111,7 +112,7 @@ func Benchmark_QueryEndToEnd(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - testFlux(b, pqs, prefix, ".flux") + testFlux(b, querier, prefix, ".flux") } }) b.Run(influxqlName, func(b *testing.B) { @@ -121,13 +122,13 @@ func Benchmark_QueryEndToEnd(b *testing.B) { b.ResetTimer() b.ReportAllocs() for i := 0; i < b.N; i++ { - testInfluxQL(b, pqs, prefix, ".influxql") + testInfluxQL(b, querier, prefix, ".influxql") } }) }) } -func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) { +func testFlux(t testing.TB, querier *querytest.Querier, prefix, queryExt string) { q, err := ioutil.ReadFile(prefix + queryExt) if err != nil { t.Fatal(err) @@ -139,11 +140,11 @@ func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) t.Fatal(err) } - compiler := flux.FluxCompiler{ + compiler := lang.FluxCompiler{ Query: string(q), } - req := &flux.ProxyRequest{ - Request: flux.Request{ + req := &query.ProxyRequest{ + Request: query.Request{ Compiler: querytest.FromCSVCompiler{ Compiler: compiler, InputFile: csvInFilename, @@ -152,10 +153,10 @@ func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) Dialect: csv.DefaultDialect(), } - QueryTestCheckSpec(t, pqs, req, string(csvOut)) + QueryTestCheckSpec(t, querier, req, string(csvOut)) } -func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) { +func testInfluxQL(t testing.TB, querier *querytest.Querier, prefix, queryExt string) { q, err := ioutil.ReadFile(prefix + queryExt) if err != nil { if !os.IsNotExist(err) { @@ -174,8 +175,8 @@ func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt str compiler.Cluster = "cluster" compiler.DB = "db0" compiler.Query = string(q) - req := &flux.ProxyRequest{ - Request: flux.Request{ + req := &query.ProxyRequest{ + Request: query.Request{ Compiler: querytest.FromCSVCompiler{ Compiler: compiler, InputFile: csvInFilename, @@ -183,7 +184,7 @@ func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt str }, Dialect: csv.DefaultDialect(), } - QueryTestCheckSpec(t, pqs, req, string(csvOut)) + QueryTestCheckSpec(t, querier, req, string(csvOut)) // Rerun test for InfluxQL JSON dialect req.Dialect = new(influxql.Dialect) @@ -195,14 +196,14 @@ func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt str } t.Skip("influxql expected json is missing") } - QueryTestCheckSpec(t, pqs, req, string(jsonOut)) + QueryTestCheckSpec(t, querier, req, string(jsonOut)) } -func QueryTestCheckSpec(t testing.TB, pqs flux.ProxyQueryService, req *flux.ProxyRequest, want string) { +func QueryTestCheckSpec(t testing.TB, querier *querytest.Querier, req *query.ProxyRequest, want string) { t.Helper() var buf bytes.Buffer - _, err := pqs.Query(context.Background(), &buf, req) + _, err := querier.Query(context.Background(), &buf, req.Request.Compiler, req.Dialect) if err != nil { t.Errorf("failed to run query: %v", err) return diff --git a/query/logger.go b/query/logger.go new file mode 100644 index 0000000000..0821082cc2 --- /dev/null +++ b/query/logger.go @@ -0,0 +1,51 @@ +package query + +import ( + "time" + + "github.com/influxdata/flux" + "github.com/influxdata/platform" +) + +// Logger persists metadata about executed queries. +type Logger interface { + Log(Log) error +} + +// Log captures a query and any relevant metadata for the query execution. +type Log struct { + // Time is the time the query was completed + Time time.Time + // OrganizationID is the ID of the organization that requested the query + OrganizationID platform.ID + // Error is any error encountered by the query + Error error + + // ProxyRequest is the query request + ProxyRequest *ProxyRequest + // ResponseSize is the size in bytes of the query response + ResponseSize int64 + // Statistics is a set of statistics about the query execution + Statistics flux.Statistics +} + +// Redact removes any sensitive information before logging +func (q *Log) Redact() { + if q.ProxyRequest != nil && q.ProxyRequest.Request.Authorization != nil { + // Make shallow copy of request + request := new(ProxyRequest) + *request = *q.ProxyRequest + + // Make shallow copy of authorization + auth := new(platform.Authorization) + *auth = *q.ProxyRequest.Request.Authorization + // Redact authorization token + auth.Token = "" + + // Apply redacted authorization + request.Request.Authorization = auth + + // Apply redacted request + q.ProxyRequest = request + } +} diff --git a/query/logging.go b/query/logging.go new file mode 100644 index 0000000000..c55ad075da --- /dev/null +++ b/query/logging.go @@ -0,0 +1,59 @@ +package query + +import ( + "context" + "fmt" + "io" + "time" + + "github.com/influxdata/flux" +) + +// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface. +type LoggingServiceBridge struct { + QueryService QueryService + QueryLogger Logger +} + +// Query executes and logs the query. +func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (n int64, err error) { + var stats flux.Statistics + defer func() { + r := recover() + if r != nil { + err = fmt.Errorf("panic: %v", r) + } + log := Log{ + OrganizationID: req.Request.OrganizationID, + ProxyRequest: req, + ResponseSize: n, + Time: time.Now(), + Statistics: stats, + } + if err != nil { + log.Error = err + } + s.QueryLogger.Log(log) + }() + + results, err := s.QueryService.Query(ctx, &req.Request) + if err != nil { + return 0, err + } + // Check if this result iterator reports stats. We call this defer before cancel because + // the query needs to be finished before it will have valid statistics. + if s, ok := results.(flux.Statisticser); ok { + defer func() { + stats = s.Statistics() + }() + } + defer results.Cancel() + + encoder := req.Dialect.Encoder() + n, err = encoder.Encode(w, results) + if err != nil { + return n, err + } + // The results iterator may have had an error independent of encoding errors. + return n, results.Err() +} diff --git a/query/mock/service.go b/query/mock/service.go new file mode 100644 index 0000000000..8f5b11b299 --- /dev/null +++ b/query/mock/service.go @@ -0,0 +1,39 @@ +package mock + +import ( + "context" + "io" + + "github.com/influxdata/flux" + "github.com/influxdata/platform/query" +) + +// ProxyQueryService mocks the idep QueryService for testing. +type ProxyQueryService struct { + QueryF func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) +} + +// Query writes the results of the query request. +func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { + return s.QueryF(ctx, w, req) +} + +// QueryService mocks the idep QueryService for testing. +type QueryService struct { + QueryF func(ctx context.Context, req *query.Request) (flux.ResultIterator, error) +} + +// Query writes the results of the query request. +func (s *QueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) { + return s.QueryF(ctx, req) +} + +// AsyncQueryService mocks the idep QueryService for testing. +type AsyncQueryService struct { + QueryF func(ctx context.Context, req *query.Request) (flux.Query, error) +} + +// Query writes the results of the query request. +func (s *AsyncQueryService) Query(ctx context.Context, req *query.Request) (flux.Query, error) { + return s.QueryF(ctx, req) +} diff --git a/query/preauthorizer.go b/query/preauthorizer.go index 335039873c..bbfd0c0b1d 100644 --- a/query/preauthorizer.go +++ b/query/preauthorizer.go @@ -29,7 +29,7 @@ type preAuthorizer struct { // given the Authorization. Returns nil on success, and an error with an appropriate message otherwise. func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth *platform.Authorization) error { - readBuckets, writeBuckets, err := spec.BucketsAccessed() + readBuckets, writeBuckets, err := BucketsAccessed(spec) if err != nil { return errors.Wrap(err, "Could not retrieve buckets for query.Spec") diff --git a/query/preauthorizer_test.go b/query/preauthorizer_test.go index dd26773f14..22a1671d5b 100644 --- a/query/preauthorizer_test.go +++ b/query/preauthorizer_test.go @@ -1,4 +1,4 @@ -package query +package query_test import ( "context" @@ -10,6 +10,7 @@ import ( "github.com/influxdata/platform" "github.com/influxdata/platform/kit/errors" "github.com/influxdata/platform/mock" + "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" ) @@ -40,7 +41,7 @@ func TestPreAuthorizer_PreAuthorize(t *testing.T) { // and no authorization auth := &platform.Authorization{Status: platform.Active} emptyBucketService := mock.NewBucketService() - preAuthorizer := NewPreAuthorizer(emptyBucketService) + preAuthorizer := query.NewPreAuthorizer(emptyBucketService) err = preAuthorizer.PreAuthorize(ctx, spec, auth) if diagnostic := cmp.Diff("Bucket service returned nil bucket", err.Error()); diagnostic != "" { @@ -55,7 +56,7 @@ func TestPreAuthorizer_PreAuthorize(t *testing.T) { ID: *id, }) - preAuthorizer = NewPreAuthorizer(bucketService) + preAuthorizer = query.NewPreAuthorizer(bucketService) err = preAuthorizer.PreAuthorize(ctx, spec, auth) if diagnostic := cmp.Diff(`No read permission for bucket: "my_bucket"`, err.Error()); diagnostic != "" { t.Errorf("Authorize message mismatch: -want/+got:\n%v", diagnostic) diff --git a/query/querytest/compile.go b/query/querytest/compile.go new file mode 100644 index 0000000000..d8f0a5d174 --- /dev/null +++ b/query/querytest/compile.go @@ -0,0 +1,85 @@ +package querytest + +import ( + "context" + "testing" + "time" + + "fmt" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/flux" + "github.com/influxdata/flux/functions" + "github.com/influxdata/flux/semantic/semantictest" + "github.com/influxdata/platform" + "github.com/influxdata/platform/query" +) + +type BucketAwareQueryTestCase struct { + Name string + Raw string + Want *flux.Spec + WantErr bool + WantReadBuckets *[]platform.BucketFilter + WantWriteBuckets *[]platform.BucketFilter +} + +var opts = append( + semantictest.CmpOptions, + cmp.AllowUnexported(flux.Spec{}), + cmp.AllowUnexported(functions.JoinOpSpec{}), + cmpopts.IgnoreUnexported(flux.Spec{}), + cmpopts.IgnoreUnexported(functions.JoinOpSpec{}), +) + +func BucketAwareQueryTestHelper(t *testing.T, tc BucketAwareQueryTestCase) { + t.Helper() + + now := time.Now().UTC() + got, err := flux.Compile(context.Background(), tc.Raw, now) + if (err != nil) != tc.WantErr { + t.Errorf("error compiling spec error: %v, wantErr %v", err, tc.WantErr) + return + } + if tc.WantErr { + return + } + if tc.Want != nil { + tc.Want.Now = now + if !cmp.Equal(tc.Want, got, opts...) { + t.Errorf("unexpected specs -want/+got %s", cmp.Diff(tc.Want, got, opts...)) + } + } + + var gotReadBuckets, gotWriteBuckets []platform.BucketFilter + if tc.WantReadBuckets != nil || tc.WantWriteBuckets != nil { + gotReadBuckets, gotWriteBuckets, err = query.BucketsAccessed(got) + } + + if tc.WantReadBuckets != nil { + if diagnostic := verifyBuckets(*tc.WantReadBuckets, gotReadBuckets); diagnostic != "" { + t.Errorf("Could not verify read buckets: %v", diagnostic) + } + } + + if tc.WantWriteBuckets != nil { + if diagnostic := verifyBuckets(*tc.WantWriteBuckets, gotWriteBuckets); diagnostic != "" { + t.Errorf("Could not verify write buckets: %v", diagnostic) + } + } +} + +func verifyBuckets(wantBuckets, gotBuckets []platform.BucketFilter) string { + if len(wantBuckets) != len(gotBuckets) { + return fmt.Sprintf("Expected %v buckets but got %v", len(wantBuckets), len(gotBuckets)) + } + + for i, wantBucket := range wantBuckets { + if diagnostic := cmp.Diff(wantBucket, gotBuckets[i]); diagnostic != "" { + return fmt.Sprintf("Bucket mismatch: -want/+got:\n%v", diagnostic) + } + } + + return "" +} diff --git a/query/request.go b/query/request.go new file mode 100644 index 0000000000..0e7341c264 --- /dev/null +++ b/query/request.go @@ -0,0 +1,153 @@ +package query + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/influxdata/flux" + "github.com/influxdata/platform" +) + +// Request respresents the query to run. +type Request struct { + // Scope + Authorization *platform.Authorization `json:"authorization,omitempty"` + OrganizationID platform.ID `json:"organization_id"` + + // Command + + // Compiler converts the query to a specification to run against the data. + Compiler flux.Compiler `json:"compiler"` + + // compilerMappings maps compiler types to creation methods + compilerMappings flux.CompilerMappings +} + +// WithCompilerMappings sets the query type mappings on the request. +func (r *Request) WithCompilerMappings(mappings flux.CompilerMappings) { + r.compilerMappings = mappings +} + +// UnmarshalJSON populates the request from the JSON data. +// WithCompilerMappings must have been called or an error will occur. +func (r *Request) UnmarshalJSON(data []byte) error { + type Alias Request + raw := struct { + *Alias + CompilerType flux.CompilerType `json:"compiler_type"` + Compiler json.RawMessage `json:"compiler"` + }{ + Alias: (*Alias)(r), + } + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + createCompiler, ok := r.compilerMappings[raw.CompilerType] + if !ok { + return fmt.Errorf("unsupported compiler type %q", raw.CompilerType) + } + + c := createCompiler() + if err := json.Unmarshal(raw.Compiler, c); err != nil { + return err + } + r.Compiler = c + + return nil +} + +func (r Request) MarshalJSON() ([]byte, error) { + type Alias Request + raw := struct { + Alias + CompilerType flux.CompilerType `json:"compiler_type"` + }{ + Alias: (Alias)(r), + CompilerType: r.Compiler.CompilerType(), + } + return json.Marshal(raw) +} + +type contextKey struct{} + +var activeContextKey = contextKey{} + +// ContextWithRequest returns a new context with a reference to the request. +func ContextWithRequest(ctx context.Context, req *Request) context.Context { + return context.WithValue(ctx, activeContextKey, req) +} + +//RequestFromContext retrieves a *Request from a context. +// If not request exists on the context nil is returned. +func RequestFromContext(ctx context.Context) *Request { + v := ctx.Value(activeContextKey) + if v == nil { + return nil + } + return v.(*Request) +} + +// ProxyRequest specifies a query request and the dialect for the results. +type ProxyRequest struct { + // Request is the basic query request + Request Request `json:"request"` + + // Dialect is the result encoder + Dialect flux.Dialect `json:"dialect"` + + // dialectMappings maps dialect types to creation methods + dialectMappings flux.DialectMappings +} + +// WithCompilerMappings sets the compiler type mappings on the request. +func (r *ProxyRequest) WithCompilerMappings(mappings flux.CompilerMappings) { + r.Request.WithCompilerMappings(mappings) +} + +// WithDialectMappings sets the dialect type mappings on the request. +func (r *ProxyRequest) WithDialectMappings(mappings flux.DialectMappings) { + r.dialectMappings = mappings +} + +// UnmarshalJSON populates the request from the JSON data. +// WithCompilerMappings and WithDialectMappings must have been called or an error will occur. +func (r *ProxyRequest) UnmarshalJSON(data []byte) error { + type Alias ProxyRequest + raw := struct { + *Alias + DialectType flux.DialectType `json:"dialect_type"` + Dialect json.RawMessage `json:"dialect"` + }{ + Alias: (*Alias)(r), + } + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + createDialect, ok := r.dialectMappings[raw.DialectType] + if !ok { + return fmt.Errorf("unsupported dialect type %q", raw.DialectType) + } + + d := createDialect() + if err := json.Unmarshal(raw.Dialect, d); err != nil { + return err + } + r.Dialect = d + + return nil +} + +func (r ProxyRequest) MarshalJSON() ([]byte, error) { + type Alias ProxyRequest + raw := struct { + Alias + DialectType flux.DialectType `json:"dialect_type"` + }{ + Alias: (Alias)(r), + DialectType: r.Dialect.DialectType(), + } + return json.Marshal(raw) +} diff --git a/query/service.go b/query/service.go new file mode 100644 index 0000000000..cf140940e3 --- /dev/null +++ b/query/service.go @@ -0,0 +1,30 @@ +package query + +import ( + "context" + "io" + + "github.com/influxdata/flux" +) + +// QueryService represents a type capable of performing queries. +type QueryService interface { + // Query submits a query for execution returning a results iterator. + // Cancel must be called on any returned results to free resources. + Query(ctx context.Context, req *Request) (flux.ResultIterator, error) +} + +// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously. +type AsyncQueryService interface { + // Query submits a query for execution returning immediately. + // Done must be called on any returned Query objects. + Query(ctx context.Context, req *Request) (flux.Query, error) +} + +// ProxyQueryService performs queries and encodes the result into a writer. +// The results are opaque to a ProxyQueryService. +type ProxyQueryService interface { + // Query performs the requested query and encodes the results into w. + // The number of bytes written to w is returned __independent__ of any error. + Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error) +} diff --git a/query/service_test.go b/query/service_test.go new file mode 100644 index 0000000000..f451da7d44 --- /dev/null +++ b/query/service_test.go @@ -0,0 +1,130 @@ +package query_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/influxdata/flux" + "github.com/influxdata/platform" + "github.com/influxdata/platform/query" +) + +var CmpOpts = []cmp.Option{ + cmpopts.IgnoreUnexported(query.ProxyRequest{}), + cmpopts.IgnoreUnexported(query.Request{}), +} + +type compilerA struct { + A string `json:"a"` +} + +func (c compilerA) Compile(ctx context.Context) (*flux.Spec, error) { + panic("not implemented") +} + +func (c compilerA) CompilerType() flux.CompilerType { + return "compilerA" +} + +var compilerMappings = flux.CompilerMappings{ + "compilerA": func() flux.Compiler { return new(compilerA) }, +} + +type dialectB struct { + B int `json:"b"` +} + +func (d dialectB) Encoder() flux.MultiResultEncoder { + panic("not implemented") +} + +func (d dialectB) DialectType() flux.DialectType { + return "dialectB" +} + +var dialectMappings = flux.DialectMappings{ + "dialectB": func() flux.Dialect { return new(dialectB) }, +} + +func TestRequest_JSON(t *testing.T) { + testCases := []struct { + name string + data string + want query.Request + }{ + { + name: "simple", + data: `{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"}`, + want: query.Request{ + OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA}, + Compiler: &compilerA{ + A: "my custom compiler", + }, + }, + }, + } + for _, tc := range testCases { + var r query.Request + r.WithCompilerMappings(compilerMappings) + + if err := json.Unmarshal([]byte(tc.data), &r); err != nil { + t.Fatal(err) + } + if !cmp.Equal(tc.want, r, CmpOpts...) { + t.Fatalf("unexpected request: -want/+got:\n%s", cmp.Diff(tc.want, r, CmpOpts...)) + } + marshalled, err := json.Marshal(r) + if err != nil { + t.Fatal(err) + } + if got, want := string(marshalled), tc.data; got != want { + t.Fatalf("unexpected marshalled request: -want/+got:\n%s", cmp.Diff(want, got)) + } + } +} + +func TestProxyRequest_JSON(t *testing.T) { + testCases := []struct { + name string + data string + want query.ProxyRequest + }{ + { + name: "simple", + data: `{"request":{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"},"dialect":{"b":42},"dialect_type":"dialectB"}`, + want: query.ProxyRequest{ + Request: query.Request{ + OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA}, + Compiler: &compilerA{ + A: "my custom compiler", + }, + }, + Dialect: &dialectB{ + B: 42, + }, + }, + }, + } + for _, tc := range testCases { + var pr query.ProxyRequest + pr.WithCompilerMappings(compilerMappings) + pr.WithDialectMappings(dialectMappings) + + if err := json.Unmarshal([]byte(tc.data), &pr); err != nil { + t.Fatal(err) + } + if !cmp.Equal(tc.want, pr, CmpOpts...) { + t.Fatalf("unexpected proxy request: -want/+got:\n%s", cmp.Diff(tc.want, pr, CmpOpts...)) + } + marshalled, err := json.Marshal(pr) + if err != nil { + t.Fatal(err) + } + if got, want := string(marshalled), tc.data; got != want { + t.Fatalf("unexpected marshalled proxy request: -want/+got:\n%s", cmp.Diff(want, got)) + } + } +} diff --git a/query/spec.go b/query/spec.go new file mode 100644 index 0000000000..c2e34d8ead --- /dev/null +++ b/query/spec.go @@ -0,0 +1,30 @@ +package query + +import ( + "github.com/influxdata/flux" + "github.com/influxdata/platform" +) + +// BucketAwareOperationSpec specifies an operation that reads or writes buckets +type BucketAwareOperationSpec interface { + BucketsAccessed() (readBuckets, writeBuckets []platform.BucketFilter) +} + +// BucketsAccessed returns the set of buckets read and written by a query spec +func BucketsAccessed(q *flux.Spec) (readBuckets, writeBuckets []platform.BucketFilter, err error) { + err = q.Walk(func(o *flux.Operation) error { + bucketAwareOpSpec, ok := o.Spec.(BucketAwareOperationSpec) + if ok { + opBucketsRead, opBucketsWritten := bucketAwareOpSpec.BucketsAccessed() + readBuckets = append(readBuckets, opBucketsRead...) + writeBuckets = append(writeBuckets, opBucketsWritten...) + } + return nil + }) + + if err != nil { + return nil, nil, err + } + + return readBuckets, writeBuckets, nil +} diff --git a/source/query.go b/source/query.go index 62c024d99c..ab128373f0 100644 --- a/source/query.go +++ b/source/query.go @@ -3,14 +3,14 @@ package source import ( "fmt" - "github.com/influxdata/flux" "github.com/influxdata/platform" "github.com/influxdata/platform/http" "github.com/influxdata/platform/http/influxdb" + "github.com/influxdata/platform/query" ) // NewQueryService creates a bucket service from a source. -func NewQueryService(s *platform.Source) (flux.ProxyQueryService, error) { +func NewQueryService(s *platform.Source) (query.ProxyQueryService, error) { switch s.Type { case platform.SelfSourceType: // TODO(fntlnz): this is supposed to call a query service directly locally, diff --git a/task/backend/executor/executor.go b/task/backend/executor/executor.go index 02c9b51d25..553eb34941 100644 --- a/task/backend/executor/executor.go +++ b/task/backend/executor/executor.go @@ -8,14 +8,16 @@ import ( "time" "github.com/influxdata/flux" + "github.com/influxdata/flux/lang" "github.com/influxdata/influxdb/logger" + "github.com/influxdata/platform/query" "github.com/influxdata/platform/task/backend" "go.uber.org/zap" ) // queryServiceExecutor is an implementation of backend.Executor that depends on a QueryService. type queryServiceExecutor struct { - svc flux.QueryService + svc query.QueryService st backend.Store logger *zap.Logger } @@ -25,7 +27,7 @@ var _ backend.Executor = (*queryServiceExecutor)(nil) // NewQueryServiceExecutor returns a new executor based on the given QueryService. // In general, you should prefer NewAsyncQueryServiceExecutor, as that code is smaller and simpler, // because asynchronous queries are more in line with the Executor interface. -func NewQueryServiceExecutor(logger *zap.Logger, svc flux.QueryService, st backend.Store) backend.Executor { +func NewQueryServiceExecutor(logger *zap.Logger, svc query.QueryService, st backend.Store) backend.Executor { return &queryServiceExecutor{logger: logger, svc: svc, st: st} } @@ -41,7 +43,7 @@ func (e *queryServiceExecutor) Execute(ctx context.Context, run backend.QueuedRu // syncRunPromise implements backend.RunPromise for a synchronous QueryService. type syncRunPromise struct { qr backend.QueuedRun - svc flux.QueryService + svc query.QueryService t *backend.StoreTask ctx context.Context cancel context.CancelFunc @@ -123,9 +125,9 @@ func (p *syncRunPromise) doQuery() { return } - req := &flux.Request{ + req := &query.Request{ OrganizationID: p.t.Org, - Compiler: flux.SpecCompiler{ + Compiler: lang.SpecCompiler{ Spec: spec, }, } @@ -159,7 +161,7 @@ func (p *syncRunPromise) cancelOnContextDone() { // asyncQueryServiceExecutor is an implementation of backend.Executor that depends on an AsyncQueryService. type asyncQueryServiceExecutor struct { - svc flux.AsyncQueryService + svc query.AsyncQueryService st backend.Store logger *zap.Logger } @@ -167,7 +169,7 @@ type asyncQueryServiceExecutor struct { var _ backend.Executor = (*asyncQueryServiceExecutor)(nil) // NewQueryServiceExecutor returns a new executor based on the given AsyncQueryService. -func NewAsyncQueryServiceExecutor(logger *zap.Logger, svc flux.AsyncQueryService, st backend.Store) backend.Executor { +func NewAsyncQueryServiceExecutor(logger *zap.Logger, svc query.AsyncQueryService, st backend.Store) backend.Executor { return &asyncQueryServiceExecutor{logger: logger, svc: svc, st: st} } @@ -182,9 +184,9 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que return nil, err } - req := &flux.Request{ + req := &query.Request{ OrganizationID: t.Org, - Compiler: flux.SpecCompiler{ + Compiler: lang.SpecCompiler{ Spec: spec, }, } diff --git a/task/backend/executor/executor_test.go b/task/backend/executor/executor_test.go index 2539fb6469..6baa703dec 100644 --- a/task/backend/executor/executor_test.go +++ b/task/backend/executor/executor_test.go @@ -13,8 +13,10 @@ import ( "github.com/influxdata/flux" "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/lang" "github.com/influxdata/flux/values" "github.com/influxdata/platform" + "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" "github.com/influxdata/platform/task/backend" "github.com/influxdata/platform/task/backend/executor" @@ -27,7 +29,7 @@ type fakeQueryService struct { queryErr error } -var _ flux.AsyncQueryService = (*fakeQueryService)(nil) +var _ query.AsyncQueryService = (*fakeQueryService)(nil) func makeSpec(q string) *flux.Spec { qs, err := flux.Compile(context.Background(), q, time.Unix(123, 0)) @@ -49,7 +51,7 @@ func newFakeQueryService() *fakeQueryService { return &fakeQueryService{queries: make(map[string]*fakeQuery)} } -func (s *fakeQueryService) Query(ctx context.Context, req *flux.Request) (flux.Query, error) { +func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.Query, error) { s.mu.Lock() defer s.mu.Unlock() if s.queryErr != nil { @@ -58,9 +60,9 @@ func (s *fakeQueryService) Query(ctx context.Context, req *flux.Request) (flux.Q return nil, err } - sc, ok := req.Compiler.(flux.SpecCompiler) + sc, ok := req.Compiler.(lang.SpecCompiler) if !ok { - return nil, fmt.Errorf("fakeQueryService only supports the flux.SpecCompiler, got %T", req.Compiler) + return nil, fmt.Errorf("fakeQueryService only supports the SpecCompiler, got %T", req.Compiler) } fq := &fakeQuery{ @@ -224,7 +226,7 @@ func createSyncSystem() *system { st: st, ex: executor.NewQueryServiceExecutor( zap.NewNop(), - flux.QueryServiceBridge{ + query.QueryServiceBridge{ AsyncQueryService: svc, }, st, diff --git a/zap/proxy_query_service.go b/zap/proxy_query_service.go index 29fa8f33d9..34580029e3 100644 --- a/zap/proxy_query_service.go +++ b/zap/proxy_query_service.go @@ -4,7 +4,7 @@ import ( "context" "io" - "github.com/influxdata/flux" + "github.com/influxdata/platform/query" "go.uber.org/zap" ) @@ -25,7 +25,7 @@ func NewProxyQueryService(l *zap.Logger) *ProxyQueryService { } // Query logs the query request. -func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) { +func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { if req != nil { s.Logger.Info("query", zap.Any("request", req)) }