chore: Updates to be able to remove platform as a dependency of Flux

pull/10616/head
Nathaniel Cook 2018-09-11 16:56:51 -06:00
parent 402cf4d3f6
commit 672e2d5fe7
31 changed files with 823 additions and 109 deletions

6
Gopkg.lock generated
View File

@ -463,7 +463,7 @@
[[projects]]
branch = "master"
digest = "1:eb7bc670810e9d06d8b42574de77248f0f28ae4cea2ee84d5b4fc70c1dd16a4c"
digest = "1:531dfd2544e63009087e1f01e0ec878e84d80c1890dd4975a55e30946afd4caa"
name = "github.com/influxdata/flux"
packages = [
".",
@ -478,6 +478,7 @@
"functions/storage",
"interpreter",
"iocounter",
"lang",
"options",
"parser",
"plan",
@ -488,7 +489,7 @@
"values",
]
pruneopts = "UT"
revision = "1de8e9acfe7dc7a40c19ac63f15bddc3fdfc49be"
revision = "54c893986878754089554512132439b943dc6b47"
[[projects]]
branch = "platform"
@ -1244,6 +1245,7 @@
"github.com/influxdata/flux/functions",
"github.com/influxdata/flux/functions/storage",
"github.com/influxdata/flux/iocounter",
"github.com/influxdata/flux/lang",
"github.com/influxdata/flux/options",
"github.com/influxdata/flux/parser",
"github.com/influxdata/flux/plan",

View File

@ -8,7 +8,6 @@ import (
"runtime"
"strings"
"github.com/influxdata/flux"
"github.com/influxdata/flux/control"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/functions"
@ -17,7 +16,9 @@ import (
"github.com/influxdata/platform"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
pcontrol "github.com/influxdata/platform/query/control"
"github.com/influxdata/platform/query/functions/storage/pb"
"github.com/influxdata/platform/snowflake"
pzap "github.com/influxdata/platform/zap"
@ -103,7 +104,7 @@ func fluxF(cmd *cobra.Command, args []string) {
logger.Error("error injecting dependencies", zap.Error(err))
os.Exit(1)
}
c := control.New(config)
c := pcontrol.New(config)
reg.MustRegister(c.PrometheusCollectors()...)
orgName, err := getStrList("ORGANIZATION_NAME")
@ -114,8 +115,8 @@ func fluxF(cmd *cobra.Command, args []string) {
queryHandler := http.NewExternalQueryHandler()
queryHandler.ProxyQueryService = flux.ProxyQueryServiceBridge{
QueryService: flux.QueryServiceBridge{
queryHandler.ProxyQueryService = query.ProxyQueryServiceBridge{
QueryService: query.QueryServiceBridge{
AsyncQueryService: c,
},
}
@ -163,7 +164,7 @@ func injectDeps(deps execute.Dependencies) error {
return functions.InjectFromDependencies(deps, storage.Dependencies{
Reader: sr,
BucketLookup: bucketLookup{},
OrganizationLookup: flux.FromOrganizationService(&orgSvc),
OrganizationLookup: query.FromOrganizationService(&orgSvc),
})
}

View File

@ -5,13 +5,13 @@ import (
"os"
"strings"
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/functions"
"github.com/influxdata/flux/functions/storage"
"github.com/influxdata/flux/repl"
"github.com/influxdata/platform"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/functions/storage/pb"
"github.com/spf13/cobra"
@ -66,12 +66,6 @@ func fluxQueryF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
org, err := orgID(queryFlags.OrgID)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
buckets, err := bucketService(flags.host, flags.token)
if err != nil {
fmt.Fprintln(os.Stderr, err)
@ -84,7 +78,7 @@ func fluxQueryF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
r, err := getFluxREPL(hosts, buckets, orgs, org, queryFlags.Verbose)
r, err := getFluxREPL(hosts, buckets, orgs, queryFlags.Verbose)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
@ -99,8 +93,8 @@ func fluxQueryF(cmd *cobra.Command, args []string) {
func injectDeps(deps execute.Dependencies, hosts storage.Reader, buckets platform.BucketService, orgs platform.OrganizationService) error {
return functions.InjectFromDependencies(deps, storage.Dependencies{
Reader: hosts,
BucketLookup: flux.FromBucketService(buckets),
OrganizationLookup: flux.FromOrganizationService(orgs),
BucketLookup: query.FromBucketService(buckets),
OrganizationLookup: query.FromOrganizationService(orgs),
})
}

View File

@ -57,12 +57,6 @@ func replF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
org, err := orgID(replFlags.OrgID)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
buckets, err := bucketService(flags.host, flags.token)
if err != nil {
fmt.Fprintln(os.Stderr, err)
@ -75,7 +69,7 @@ func replF(cmd *cobra.Command, args []string) {
os.Exit(1)
}
r, err := getFluxREPL(hosts, buckets, orgs, org, replFlags.Verbose)
r, err := getFluxREPL(hosts, buckets, orgs, replFlags.Verbose)
if err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
@ -84,7 +78,7 @@ func replF(cmd *cobra.Command, args []string) {
r.Run()
}
func getFluxREPL(storageHosts storage.Reader, buckets platform.BucketService, orgs platform.OrganizationService, org platform.ID, verbose bool) (*repl.REPL, error) {
func getFluxREPL(storageHosts storage.Reader, buckets platform.BucketService, orgs platform.OrganizationService, verbose bool) (*repl.REPL, error) {
conf := control.Config{
ExecutorDependencies: make(execute.Dependencies),
ConcurrencyQuota: runtime.NumCPU() * 2,
@ -97,5 +91,5 @@ func getFluxREPL(storageHosts storage.Reader, buckets platform.BucketService, or
}
c := control.New(conf)
return repl.New(c, org), nil
return repl.New(c), nil
}

View File

@ -14,8 +14,6 @@ import (
"syscall"
"time"
"github.com/influxdata/flux"
"go.uber.org/zap"
"github.com/influxdata/flux/control"
@ -27,7 +25,9 @@ import (
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/nats"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
pcontrol "github.com/influxdata/platform/query/control"
"github.com/influxdata/platform/source"
"github.com/influxdata/platform/task"
taskbackend "github.com/influxdata/platform/task/backend"
@ -170,7 +170,7 @@ func platformF(cmd *cobra.Command, args []string) {
sourceSvc = c
}
var queryService flux.QueryService
var queryService query.QueryService
{
// TODO(lh): this is temporary until query endpoint is added here.
config := control.Config{
@ -180,8 +180,8 @@ func platformF(cmd *cobra.Command, args []string) {
Verbose: false,
}
queryService = flux.QueryServiceBridge{
AsyncQueryService: control.New(config),
queryService = query.QueryServiceBridge{
AsyncQueryService: pcontrol.New(config),
}
}

View File

@ -4,8 +4,8 @@ import (
"fmt"
"net/http"
"github.com/influxdata/flux"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -18,7 +18,7 @@ type ExternalQueryHandler struct {
Logger *zap.Logger
ProxyQueryService flux.ProxyQueryService
ProxyQueryService query.ProxyQueryService
OrganizationService platform.OrganizationService
}

View File

@ -10,8 +10,10 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/platform"
platformhttp "github.com/influxdata/platform/http"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
)
@ -23,18 +25,18 @@ type SourceProxyQueryService struct {
platform.V1SourceFields
}
func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
switch req.Request.Compiler.CompilerType() {
case influxql.CompilerType:
return s.influxQuery(ctx, w, req)
case flux.FluxCompilerType:
case lang.FluxCompilerType:
return s.fluxQuery(ctx, w, req)
}
return 0, fmt.Errorf("compiler type not supported")
}
func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
if len(s.FluxURL) == 0 {
return 0, fmt.Errorf("fluxURL from source cannot be empty if the compiler type is flux")
}
@ -47,12 +49,12 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re
}{}
switch c := req.Request.Compiler.(type) {
case flux.FluxCompiler:
case lang.FluxCompiler:
request.Query = c.Query
request.Type = flux.FluxCompilerType
case flux.SpecCompiler:
request.Type = lang.FluxCompilerType
case lang.SpecCompiler:
request.Spec = c.Spec
request.Type = flux.SpecCompilerType
request.Type = lang.SpecCompilerType
default:
return 0, fmt.Errorf("compiler type not supported: %s", c.CompilerType())
}
@ -102,7 +104,7 @@ func (s *SourceProxyQueryService) fluxQuery(ctx context.Context, w io.Writer, re
return io.Copy(w, resp.Body)
}
func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *SourceProxyQueryService) influxQuery(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
if len(s.URL) == 0 {
return 0, fmt.Errorf("URL from source cannot be empty if the compiler type is influxql")
}

View File

@ -9,6 +9,7 @@ import (
"net/http"
"github.com/influxdata/flux"
"github.com/influxdata/platform/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -23,7 +24,7 @@ type ProxyQueryHandler struct {
Logger *zap.Logger
ProxyQueryService flux.ProxyQueryService
ProxyQueryService query.ProxyQueryService
CompilerMappings flux.CompilerMappings
DialectMappings flux.DialectMappings
@ -48,7 +49,7 @@ type HTTPDialect interface {
func (h *ProxyQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req flux.ProxyRequest
var req query.ProxyRequest
req.WithCompilerMappings(h.CompilerMappings)
req.WithDialectMappings(h.DialectMappings)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
@ -89,7 +90,7 @@ type ProxyQueryService struct {
InsecureSkipVerify bool
}
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
u, err := newURL(s.Addr, proxyQueryPath)
if err != nil {
return 0, err

View File

@ -9,8 +9,10 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/query"
)
// QueryRequest is a flux query request.
@ -91,18 +93,18 @@ func (r QueryRequest) Validate() error {
}
// ProxyRequest returns a request to proxy from the flux.
func (r QueryRequest) ProxyRequest() (*flux.ProxyRequest, error) {
func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) {
if err := r.Validate(); err != nil {
return nil, err
}
// Query is preferred over spec
var compiler flux.Compiler
if r.Query != "" {
compiler = flux.FluxCompiler{
compiler = lang.FluxCompiler{
Query: r.Query,
}
} else if r.Spec != nil {
compiler = flux.SpecCompiler{
compiler = lang.SpecCompiler{
Spec: r.Spec,
}
}
@ -116,8 +118,8 @@ func (r QueryRequest) ProxyRequest() (*flux.ProxyRequest, error) {
// TODO(nathanielc): Use commentPrefix and dateTimeFormat
// once they are supported.
return &flux.ProxyRequest{
Request: flux.Request{
return &query.ProxyRequest{
Request: query.Request{
OrganizationID: r.org.ID,
Compiler: compiler,
},
@ -147,7 +149,7 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ
return &req, err
}
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*flux.ProxyRequest, error) {
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*query.ProxyRequest, error) {
req, err := decodeQueryRequest(ctx, r, svc)
if err != nil {
return nil, err

View File

@ -8,10 +8,10 @@ import (
"io"
"net/http"
"github.com/influxdata/flux"
"github.com/influxdata/platform"
pcontext "github.com/influxdata/platform/context"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -29,7 +29,7 @@ type FluxHandler struct {
AuthorizationService platform.AuthorizationService
OrganizationService platform.OrganizationService
ProxyQueryService flux.ProxyQueryService
ProxyQueryService query.ProxyQueryService
}
// NewFluxHandler returns a new handler at /v2/query for flux queries.
@ -103,7 +103,7 @@ type FluxService struct {
}
// Query runs a flux query against a influx server and sends the results to the io.Writer.
func (s *FluxService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
u, err := newURL(s.URL, fluxPath)
if err != nil {
return 0, err

View File

@ -8,6 +8,7 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/platform/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -26,7 +27,7 @@ type QueryHandler struct {
csvDialect csv.Dialect
QueryService flux.QueryService
QueryService query.QueryService
CompilerMappings flux.CompilerMappings
}
@ -54,7 +55,7 @@ func (h *QueryHandler) handlePing(w http.ResponseWriter, r *http.Request) {
func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req flux.Request
var req query.Request
req.WithCompilerMappings(h.CompilerMappings)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
EncodeError(ctx, err, w)
@ -125,7 +126,7 @@ type QueryService struct {
InsecureSkipVerify bool
}
func (s *QueryService) Query(ctx context.Context, req *flux.Request) (flux.ResultIterator, error) {
func (s *QueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
u, err := newURL(s.Addr, queryPath)
if err != nil {
return nil, err

View File

@ -10,8 +10,9 @@ import (
"net/url"
"strings"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
)
@ -21,17 +22,17 @@ type SourceProxyQueryService struct {
platform.SourceFields
}
func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *SourceProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
switch req.Request.Compiler.CompilerType() {
case influxql.CompilerType:
return s.queryInfluxQL(ctx, w, req)
case flux.FluxCompilerType:
case lang.FluxCompilerType:
return s.queryFlux(ctx, w, req)
}
return 0, fmt.Errorf("compiler type not supported")
}
func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
u, err := newURL(s.URL, "/v1/query")
if err != nil {
return 0, err
@ -61,7 +62,7 @@ func (s *SourceProxyQueryService) queryFlux(ctx context.Context, w io.Writer, re
return io.Copy(w, resp.Body)
}
func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *SourceProxyQueryService) queryInfluxQL(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
compiler, ok := req.Request.Compiler.(*influxql.Compiler)
if !ok {

View File

@ -12,8 +12,10 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/platform"
kerrors "github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
"github.com/julienschmidt/httprouter"
)
@ -70,7 +72,7 @@ type SourceHandler struct {
// TODO(desa): this was done so in order to remove an import cycle and to allow
// for http mocking.
NewBucketService func(s *platform.Source) (platform.BucketService, error)
NewQueryService func(s *platform.Source) (flux.ProxyQueryService, error)
NewQueryService func(s *platform.Source) (query.ProxyQueryService, error)
}
// NewSourceHandler returns a new instance of SourceHandler.
@ -81,7 +83,7 @@ func NewSourceHandler() *SourceHandler {
NewBucketService: func(s *platform.Source) (platform.BucketService, error) {
return nil, fmt.Errorf("bucket service not set")
},
NewQueryService: func(s *platform.Source) (flux.ProxyQueryService, error) {
NewQueryService: func(s *platform.Source) (query.ProxyQueryService, error) {
return nil, fmt.Errorf("query service not set")
},
}
@ -99,7 +101,7 @@ func NewSourceHandler() *SourceHandler {
return h
}
func decodeSourceQueryRequest(r *http.Request) (*flux.ProxyRequest, error) {
func decodeSourceQueryRequest(r *http.Request) (*query.ProxyRequest, error) {
// starts here
request := struct {
Spec *flux.Spec `json:"spec"`
@ -118,18 +120,18 @@ func decodeSourceQueryRequest(r *http.Request) (*flux.ProxyRequest, error) {
return nil, err
}
req := &flux.ProxyRequest{}
req := &query.ProxyRequest{}
req.Dialect = request.Dialect
req.Request.OrganizationID = request.OrganizationID
switch request.Type {
case flux.FluxCompilerType:
req.Request.Compiler = flux.FluxCompiler{
case lang.FluxCompilerType:
req.Request.Compiler = lang.FluxCompiler{
Query: request.Query,
}
case flux.SpecCompilerType:
req.Request.Compiler = flux.SpecCompiler{
case lang.SpecCompilerType:
req.Request.Compiler = lang.SpecCompiler{
Spec: request.Spec,
}
case influxql.CompilerType:

36
query/bridges.go Normal file
View File

@ -0,0 +1,36 @@
package query
import (
"context"
"io"
"github.com/influxdata/flux"
)
// QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface.
type QueryServiceBridge struct {
AsyncQueryService AsyncQueryService
}
func (b QueryServiceBridge) Query(ctx context.Context, req *Request) (flux.ResultIterator, error) {
query, err := b.AsyncQueryService.Query(ctx, req)
if err != nil {
return nil, err
}
return flux.NewResultIteratorFromQuery(query), nil
}
// ProxyQueryServiceBridge implements ProxyQueryService while consuming a QueryService interface.
type ProxyQueryServiceBridge struct {
QueryService QueryService
}
func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error) {
results, err := b.QueryService.Query(ctx, &req.Request)
if err != nil {
return 0, err
}
defer results.Cancel()
encoder := req.Dialect.Encoder()
return encoder.Encode(w, results)
}

View File

@ -0,0 +1,39 @@
package control
import (
"context"
"github.com/influxdata/flux"
"github.com/influxdata/flux/control"
"github.com/influxdata/platform/query"
"github.com/prometheus/client_golang/prometheus"
)
// orgLabel is the metric label to use in the controller
const orgLabel = "org"
// Controller implements AsyncQueryService by consuming a control.Controller.
type Controller struct {
c *control.Controller
}
// NewController creates a new Controller specific to platform.
func New(config control.Config) *Controller {
config.MetricLabelKeys = append(config.MetricLabelKeys, orgLabel)
c := control.New(config)
return &Controller{c: c}
}
// Query satisifies the AsyncQueryService while ensuring the request is propogated on the context.
func (c *Controller) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
// Set the request on the context so platform specific Flux operations can retrieve it later.
ctx = query.ContextWithRequest(ctx, req)
// Set the org label value for controller metrics
ctx = context.WithValue(ctx, orgLabel, req.OrganizationID.String())
return c.c.Query(ctx, req.Compiler)
}
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
func (c *Controller) PrometheusCollectors() []prometheus.Collector {
return c.c.PrometheusCollectors()
}

56
query/dependency.go Normal file
View File

@ -0,0 +1,56 @@
package query
import (
"context"
"github.com/influxdata/platform"
)
// FromBucketService wraps an platform.BucketService in the BucketLookup interface.
func FromBucketService(srv platform.BucketService) *BucketLookup {
return &BucketLookup{
BucketService: srv,
}
}
// BucketLookup converts Flux bucket lookups into platform.BucketService calls.
type BucketLookup struct {
BucketService platform.BucketService
}
// Lookup returns the bucket id and its existence given an org id and bucket name.
func (b *BucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool) {
oid := platform.ID(orgID)
filter := platform.BucketFilter{
OrganizationID: &oid,
Name: &name,
}
bucket, err := b.BucketService.FindBucket(context.Background(), filter)
if err != nil {
return nil, false
}
return bucket.ID, true
}
// FromOrganizationService wraps a platform.OrganizationService in the OrganizationLookup interface.
func FromOrganizationService(srv platform.OrganizationService) *OrganizationLookup {
return &OrganizationLookup{OrganizationService: srv}
}
// OrganizationLookup converts organization name lookups into platform.OrganizationService calls.
type OrganizationLookup struct {
OrganizationService platform.OrganizationService
}
// Lookup returns the organization ID and its existence given an organization name.
func (o *OrganizationLookup) Lookup(ctx context.Context, name string) (platform.ID, bool) {
org, err := o.OrganizationService.FindOrganization(
ctx,
platform.OrganizationFilter{Name: &name},
)
if err != nil {
return nil, false
}
return org.ID, true
}

View File

@ -9,11 +9,12 @@ import (
"strings"
"testing"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/querytest"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/influxql"
@ -52,7 +53,7 @@ var skipTests = map[string]string{
"string_interp": "string interpolation not working as expected in flux (https://github.com/influxdata/platform/issues/404)",
}
var pqs = querytest.GetProxyQueryServiceBridge()
var querier = querytest.NewQuerier()
func withEachFluxFile(t testing.TB, fn func(prefix, caseName string)) {
dir, err := os.Getwd()
@ -84,13 +85,13 @@ func Test_QueryEndToEnd(t *testing.T) {
if skip {
t.Skip(reason)
}
testFlux(t, pqs, prefix, ".flux")
testFlux(t, querier, prefix, ".flux")
})
t.Run(influxqlName, func(t *testing.T) {
if skip {
t.Skip(reason)
}
testInfluxQL(t, pqs, prefix, ".influxql")
testInfluxQL(t, querier, prefix, ".influxql")
})
})
}
@ -111,7 +112,7 @@ func Benchmark_QueryEndToEnd(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
testFlux(b, pqs, prefix, ".flux")
testFlux(b, querier, prefix, ".flux")
}
})
b.Run(influxqlName, func(b *testing.B) {
@ -121,13 +122,13 @@ func Benchmark_QueryEndToEnd(b *testing.B) {
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
testInfluxQL(b, pqs, prefix, ".influxql")
testInfluxQL(b, querier, prefix, ".influxql")
}
})
})
}
func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) {
func testFlux(t testing.TB, querier *querytest.Querier, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
t.Fatal(err)
@ -139,11 +140,11 @@ func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string)
t.Fatal(err)
}
compiler := flux.FluxCompiler{
compiler := lang.FluxCompiler{
Query: string(q),
}
req := &flux.ProxyRequest{
Request: flux.Request{
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: compiler,
InputFile: csvInFilename,
@ -152,10 +153,10 @@ func testFlux(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string)
Dialect: csv.DefaultDialect(),
}
QueryTestCheckSpec(t, pqs, req, string(csvOut))
QueryTestCheckSpec(t, querier, req, string(csvOut))
}
func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt string) {
func testInfluxQL(t testing.TB, querier *querytest.Querier, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
if !os.IsNotExist(err) {
@ -174,8 +175,8 @@ func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt str
compiler.Cluster = "cluster"
compiler.DB = "db0"
compiler.Query = string(q)
req := &flux.ProxyRequest{
Request: flux.Request{
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: compiler,
InputFile: csvInFilename,
@ -183,7 +184,7 @@ func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt str
},
Dialect: csv.DefaultDialect(),
}
QueryTestCheckSpec(t, pqs, req, string(csvOut))
QueryTestCheckSpec(t, querier, req, string(csvOut))
// Rerun test for InfluxQL JSON dialect
req.Dialect = new(influxql.Dialect)
@ -195,14 +196,14 @@ func testInfluxQL(t testing.TB, pqs flux.ProxyQueryService, prefix, queryExt str
}
t.Skip("influxql expected json is missing")
}
QueryTestCheckSpec(t, pqs, req, string(jsonOut))
QueryTestCheckSpec(t, querier, req, string(jsonOut))
}
func QueryTestCheckSpec(t testing.TB, pqs flux.ProxyQueryService, req *flux.ProxyRequest, want string) {
func QueryTestCheckSpec(t testing.TB, querier *querytest.Querier, req *query.ProxyRequest, want string) {
t.Helper()
var buf bytes.Buffer
_, err := pqs.Query(context.Background(), &buf, req)
_, err := querier.Query(context.Background(), &buf, req.Request.Compiler, req.Dialect)
if err != nil {
t.Errorf("failed to run query: %v", err)
return

51
query/logger.go Normal file
View File

@ -0,0 +1,51 @@
package query
import (
"time"
"github.com/influxdata/flux"
"github.com/influxdata/platform"
)
// Logger persists metadata about executed queries.
type Logger interface {
Log(Log) error
}
// Log captures a query and any relevant metadata for the query execution.
type Log struct {
// Time is the time the query was completed
Time time.Time
// OrganizationID is the ID of the organization that requested the query
OrganizationID platform.ID
// Error is any error encountered by the query
Error error
// ProxyRequest is the query request
ProxyRequest *ProxyRequest
// ResponseSize is the size in bytes of the query response
ResponseSize int64
// Statistics is a set of statistics about the query execution
Statistics flux.Statistics
}
// Redact removes any sensitive information before logging
func (q *Log) Redact() {
if q.ProxyRequest != nil && q.ProxyRequest.Request.Authorization != nil {
// Make shallow copy of request
request := new(ProxyRequest)
*request = *q.ProxyRequest
// Make shallow copy of authorization
auth := new(platform.Authorization)
*auth = *q.ProxyRequest.Request.Authorization
// Redact authorization token
auth.Token = ""
// Apply redacted authorization
request.Request.Authorization = auth
// Apply redacted request
q.ProxyRequest = request
}
}

59
query/logging.go Normal file
View File

@ -0,0 +1,59 @@
package query
import (
"context"
"fmt"
"io"
"time"
"github.com/influxdata/flux"
)
// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface.
type LoggingServiceBridge struct {
QueryService QueryService
QueryLogger Logger
}
// Query executes and logs the query.
func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (n int64, err error) {
var stats flux.Statistics
defer func() {
r := recover()
if r != nil {
err = fmt.Errorf("panic: %v", r)
}
log := Log{
OrganizationID: req.Request.OrganizationID,
ProxyRequest: req,
ResponseSize: n,
Time: time.Now(),
Statistics: stats,
}
if err != nil {
log.Error = err
}
s.QueryLogger.Log(log)
}()
results, err := s.QueryService.Query(ctx, &req.Request)
if err != nil {
return 0, err
}
// Check if this result iterator reports stats. We call this defer before cancel because
// the query needs to be finished before it will have valid statistics.
if s, ok := results.(flux.Statisticser); ok {
defer func() {
stats = s.Statistics()
}()
}
defer results.Cancel()
encoder := req.Dialect.Encoder()
n, err = encoder.Encode(w, results)
if err != nil {
return n, err
}
// The results iterator may have had an error independent of encoding errors.
return n, results.Err()
}

39
query/mock/service.go Normal file
View File

@ -0,0 +1,39 @@
package mock
import (
"context"
"io"
"github.com/influxdata/flux"
"github.com/influxdata/platform/query"
)
// ProxyQueryService mocks the idep QueryService for testing.
type ProxyQueryService struct {
QueryF func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error)
}
// Query writes the results of the query request.
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
return s.QueryF(ctx, w, req)
}
// QueryService mocks the idep QueryService for testing.
type QueryService struct {
QueryF func(ctx context.Context, req *query.Request) (flux.ResultIterator, error)
}
// Query writes the results of the query request.
func (s *QueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
return s.QueryF(ctx, req)
}
// AsyncQueryService mocks the idep QueryService for testing.
type AsyncQueryService struct {
QueryF func(ctx context.Context, req *query.Request) (flux.Query, error)
}
// Query writes the results of the query request.
func (s *AsyncQueryService) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
return s.QueryF(ctx, req)
}

View File

@ -29,7 +29,7 @@ type preAuthorizer struct {
// given the Authorization. Returns nil on success, and an error with an appropriate message otherwise.
func (a *preAuthorizer) PreAuthorize(ctx context.Context, spec *flux.Spec, auth *platform.Authorization) error {
readBuckets, writeBuckets, err := spec.BucketsAccessed()
readBuckets, writeBuckets, err := BucketsAccessed(spec)
if err != nil {
return errors.Wrap(err, "Could not retrieve buckets for query.Spec")

View File

@ -1,4 +1,4 @@
package query
package query_test
import (
"context"
@ -10,6 +10,7 @@ import (
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
)
@ -40,7 +41,7 @@ func TestPreAuthorizer_PreAuthorize(t *testing.T) {
// and no authorization
auth := &platform.Authorization{Status: platform.Active}
emptyBucketService := mock.NewBucketService()
preAuthorizer := NewPreAuthorizer(emptyBucketService)
preAuthorizer := query.NewPreAuthorizer(emptyBucketService)
err = preAuthorizer.PreAuthorize(ctx, spec, auth)
if diagnostic := cmp.Diff("Bucket service returned nil bucket", err.Error()); diagnostic != "" {
@ -55,7 +56,7 @@ func TestPreAuthorizer_PreAuthorize(t *testing.T) {
ID: *id,
})
preAuthorizer = NewPreAuthorizer(bucketService)
preAuthorizer = query.NewPreAuthorizer(bucketService)
err = preAuthorizer.PreAuthorize(ctx, spec, auth)
if diagnostic := cmp.Diff(`No read permission for bucket: "my_bucket"`, err.Error()); diagnostic != "" {
t.Errorf("Authorize message mismatch: -want/+got:\n%v", diagnostic)

View File

@ -0,0 +1,85 @@
package querytest
import (
"context"
"testing"
"time"
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/flux"
"github.com/influxdata/flux/functions"
"github.com/influxdata/flux/semantic/semantictest"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
)
type BucketAwareQueryTestCase struct {
Name string
Raw string
Want *flux.Spec
WantErr bool
WantReadBuckets *[]platform.BucketFilter
WantWriteBuckets *[]platform.BucketFilter
}
var opts = append(
semantictest.CmpOptions,
cmp.AllowUnexported(flux.Spec{}),
cmp.AllowUnexported(functions.JoinOpSpec{}),
cmpopts.IgnoreUnexported(flux.Spec{}),
cmpopts.IgnoreUnexported(functions.JoinOpSpec{}),
)
func BucketAwareQueryTestHelper(t *testing.T, tc BucketAwareQueryTestCase) {
t.Helper()
now := time.Now().UTC()
got, err := flux.Compile(context.Background(), tc.Raw, now)
if (err != nil) != tc.WantErr {
t.Errorf("error compiling spec error: %v, wantErr %v", err, tc.WantErr)
return
}
if tc.WantErr {
return
}
if tc.Want != nil {
tc.Want.Now = now
if !cmp.Equal(tc.Want, got, opts...) {
t.Errorf("unexpected specs -want/+got %s", cmp.Diff(tc.Want, got, opts...))
}
}
var gotReadBuckets, gotWriteBuckets []platform.BucketFilter
if tc.WantReadBuckets != nil || tc.WantWriteBuckets != nil {
gotReadBuckets, gotWriteBuckets, err = query.BucketsAccessed(got)
}
if tc.WantReadBuckets != nil {
if diagnostic := verifyBuckets(*tc.WantReadBuckets, gotReadBuckets); diagnostic != "" {
t.Errorf("Could not verify read buckets: %v", diagnostic)
}
}
if tc.WantWriteBuckets != nil {
if diagnostic := verifyBuckets(*tc.WantWriteBuckets, gotWriteBuckets); diagnostic != "" {
t.Errorf("Could not verify write buckets: %v", diagnostic)
}
}
}
func verifyBuckets(wantBuckets, gotBuckets []platform.BucketFilter) string {
if len(wantBuckets) != len(gotBuckets) {
return fmt.Sprintf("Expected %v buckets but got %v", len(wantBuckets), len(gotBuckets))
}
for i, wantBucket := range wantBuckets {
if diagnostic := cmp.Diff(wantBucket, gotBuckets[i]); diagnostic != "" {
return fmt.Sprintf("Bucket mismatch: -want/+got:\n%v", diagnostic)
}
}
return ""
}

153
query/request.go Normal file
View File

@ -0,0 +1,153 @@
package query
import (
"context"
"encoding/json"
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/platform"
)
// Request respresents the query to run.
type Request struct {
// Scope
Authorization *platform.Authorization `json:"authorization,omitempty"`
OrganizationID platform.ID `json:"organization_id"`
// Command
// Compiler converts the query to a specification to run against the data.
Compiler flux.Compiler `json:"compiler"`
// compilerMappings maps compiler types to creation methods
compilerMappings flux.CompilerMappings
}
// WithCompilerMappings sets the query type mappings on the request.
func (r *Request) WithCompilerMappings(mappings flux.CompilerMappings) {
r.compilerMappings = mappings
}
// UnmarshalJSON populates the request from the JSON data.
// WithCompilerMappings must have been called or an error will occur.
func (r *Request) UnmarshalJSON(data []byte) error {
type Alias Request
raw := struct {
*Alias
CompilerType flux.CompilerType `json:"compiler_type"`
Compiler json.RawMessage `json:"compiler"`
}{
Alias: (*Alias)(r),
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
createCompiler, ok := r.compilerMappings[raw.CompilerType]
if !ok {
return fmt.Errorf("unsupported compiler type %q", raw.CompilerType)
}
c := createCompiler()
if err := json.Unmarshal(raw.Compiler, c); err != nil {
return err
}
r.Compiler = c
return nil
}
func (r Request) MarshalJSON() ([]byte, error) {
type Alias Request
raw := struct {
Alias
CompilerType flux.CompilerType `json:"compiler_type"`
}{
Alias: (Alias)(r),
CompilerType: r.Compiler.CompilerType(),
}
return json.Marshal(raw)
}
type contextKey struct{}
var activeContextKey = contextKey{}
// ContextWithRequest returns a new context with a reference to the request.
func ContextWithRequest(ctx context.Context, req *Request) context.Context {
return context.WithValue(ctx, activeContextKey, req)
}
//RequestFromContext retrieves a *Request from a context.
// If not request exists on the context nil is returned.
func RequestFromContext(ctx context.Context) *Request {
v := ctx.Value(activeContextKey)
if v == nil {
return nil
}
return v.(*Request)
}
// ProxyRequest specifies a query request and the dialect for the results.
type ProxyRequest struct {
// Request is the basic query request
Request Request `json:"request"`
// Dialect is the result encoder
Dialect flux.Dialect `json:"dialect"`
// dialectMappings maps dialect types to creation methods
dialectMappings flux.DialectMappings
}
// WithCompilerMappings sets the compiler type mappings on the request.
func (r *ProxyRequest) WithCompilerMappings(mappings flux.CompilerMappings) {
r.Request.WithCompilerMappings(mappings)
}
// WithDialectMappings sets the dialect type mappings on the request.
func (r *ProxyRequest) WithDialectMappings(mappings flux.DialectMappings) {
r.dialectMappings = mappings
}
// UnmarshalJSON populates the request from the JSON data.
// WithCompilerMappings and WithDialectMappings must have been called or an error will occur.
func (r *ProxyRequest) UnmarshalJSON(data []byte) error {
type Alias ProxyRequest
raw := struct {
*Alias
DialectType flux.DialectType `json:"dialect_type"`
Dialect json.RawMessage `json:"dialect"`
}{
Alias: (*Alias)(r),
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
createDialect, ok := r.dialectMappings[raw.DialectType]
if !ok {
return fmt.Errorf("unsupported dialect type %q", raw.DialectType)
}
d := createDialect()
if err := json.Unmarshal(raw.Dialect, d); err != nil {
return err
}
r.Dialect = d
return nil
}
func (r ProxyRequest) MarshalJSON() ([]byte, error) {
type Alias ProxyRequest
raw := struct {
Alias
DialectType flux.DialectType `json:"dialect_type"`
}{
Alias: (Alias)(r),
DialectType: r.Dialect.DialectType(),
}
return json.Marshal(raw)
}

30
query/service.go Normal file
View File

@ -0,0 +1,30 @@
package query
import (
"context"
"io"
"github.com/influxdata/flux"
)
// QueryService represents a type capable of performing queries.
type QueryService interface {
// Query submits a query for execution returning a results iterator.
// Cancel must be called on any returned results to free resources.
Query(ctx context.Context, req *Request) (flux.ResultIterator, error)
}
// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously.
type AsyncQueryService interface {
// Query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
Query(ctx context.Context, req *Request) (flux.Query, error)
}
// ProxyQueryService performs queries and encodes the result into a writer.
// The results are opaque to a ProxyQueryService.
type ProxyQueryService interface {
// Query performs the requested query and encodes the results into w.
// The number of bytes written to w is returned __independent__ of any error.
Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error)
}

130
query/service_test.go Normal file
View File

@ -0,0 +1,130 @@
package query_test
import (
"context"
"encoding/json"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/flux"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
)
var CmpOpts = []cmp.Option{
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
cmpopts.IgnoreUnexported(query.Request{}),
}
type compilerA struct {
A string `json:"a"`
}
func (c compilerA) Compile(ctx context.Context) (*flux.Spec, error) {
panic("not implemented")
}
func (c compilerA) CompilerType() flux.CompilerType {
return "compilerA"
}
var compilerMappings = flux.CompilerMappings{
"compilerA": func() flux.Compiler { return new(compilerA) },
}
type dialectB struct {
B int `json:"b"`
}
func (d dialectB) Encoder() flux.MultiResultEncoder {
panic("not implemented")
}
func (d dialectB) DialectType() flux.DialectType {
return "dialectB"
}
var dialectMappings = flux.DialectMappings{
"dialectB": func() flux.Dialect { return new(dialectB) },
}
func TestRequest_JSON(t *testing.T) {
testCases := []struct {
name string
data string
want query.Request
}{
{
name: "simple",
data: `{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"}`,
want: query.Request{
OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA},
Compiler: &compilerA{
A: "my custom compiler",
},
},
},
}
for _, tc := range testCases {
var r query.Request
r.WithCompilerMappings(compilerMappings)
if err := json.Unmarshal([]byte(tc.data), &r); err != nil {
t.Fatal(err)
}
if !cmp.Equal(tc.want, r, CmpOpts...) {
t.Fatalf("unexpected request: -want/+got:\n%s", cmp.Diff(tc.want, r, CmpOpts...))
}
marshalled, err := json.Marshal(r)
if err != nil {
t.Fatal(err)
}
if got, want := string(marshalled), tc.data; got != want {
t.Fatalf("unexpected marshalled request: -want/+got:\n%s", cmp.Diff(want, got))
}
}
}
func TestProxyRequest_JSON(t *testing.T) {
testCases := []struct {
name string
data string
want query.ProxyRequest
}{
{
name: "simple",
data: `{"request":{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"},"dialect":{"b":42},"dialect_type":"dialectB"}`,
want: query.ProxyRequest{
Request: query.Request{
OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA},
Compiler: &compilerA{
A: "my custom compiler",
},
},
Dialect: &dialectB{
B: 42,
},
},
},
}
for _, tc := range testCases {
var pr query.ProxyRequest
pr.WithCompilerMappings(compilerMappings)
pr.WithDialectMappings(dialectMappings)
if err := json.Unmarshal([]byte(tc.data), &pr); err != nil {
t.Fatal(err)
}
if !cmp.Equal(tc.want, pr, CmpOpts...) {
t.Fatalf("unexpected proxy request: -want/+got:\n%s", cmp.Diff(tc.want, pr, CmpOpts...))
}
marshalled, err := json.Marshal(pr)
if err != nil {
t.Fatal(err)
}
if got, want := string(marshalled), tc.data; got != want {
t.Fatalf("unexpected marshalled proxy request: -want/+got:\n%s", cmp.Diff(want, got))
}
}
}

30
query/spec.go Normal file
View File

@ -0,0 +1,30 @@
package query
import (
"github.com/influxdata/flux"
"github.com/influxdata/platform"
)
// BucketAwareOperationSpec specifies an operation that reads or writes buckets
type BucketAwareOperationSpec interface {
BucketsAccessed() (readBuckets, writeBuckets []platform.BucketFilter)
}
// BucketsAccessed returns the set of buckets read and written by a query spec
func BucketsAccessed(q *flux.Spec) (readBuckets, writeBuckets []platform.BucketFilter, err error) {
err = q.Walk(func(o *flux.Operation) error {
bucketAwareOpSpec, ok := o.Spec.(BucketAwareOperationSpec)
if ok {
opBucketsRead, opBucketsWritten := bucketAwareOpSpec.BucketsAccessed()
readBuckets = append(readBuckets, opBucketsRead...)
writeBuckets = append(writeBuckets, opBucketsWritten...)
}
return nil
})
if err != nil {
return nil, nil, err
}
return readBuckets, writeBuckets, nil
}

View File

@ -3,14 +3,14 @@ package source
import (
"fmt"
"github.com/influxdata/flux"
"github.com/influxdata/platform"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/http/influxdb"
"github.com/influxdata/platform/query"
)
// NewQueryService creates a bucket service from a source.
func NewQueryService(s *platform.Source) (flux.ProxyQueryService, error) {
func NewQueryService(s *platform.Source) (query.ProxyQueryService, error) {
switch s.Type {
case platform.SelfSourceType:
// TODO(fntlnz): this is supposed to call a query service directly locally,

View File

@ -8,14 +8,16 @@ import (
"time"
"github.com/influxdata/flux"
"github.com/influxdata/flux/lang"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/task/backend"
"go.uber.org/zap"
)
// queryServiceExecutor is an implementation of backend.Executor that depends on a QueryService.
type queryServiceExecutor struct {
svc flux.QueryService
svc query.QueryService
st backend.Store
logger *zap.Logger
}
@ -25,7 +27,7 @@ var _ backend.Executor = (*queryServiceExecutor)(nil)
// NewQueryServiceExecutor returns a new executor based on the given QueryService.
// In general, you should prefer NewAsyncQueryServiceExecutor, as that code is smaller and simpler,
// because asynchronous queries are more in line with the Executor interface.
func NewQueryServiceExecutor(logger *zap.Logger, svc flux.QueryService, st backend.Store) backend.Executor {
func NewQueryServiceExecutor(logger *zap.Logger, svc query.QueryService, st backend.Store) backend.Executor {
return &queryServiceExecutor{logger: logger, svc: svc, st: st}
}
@ -41,7 +43,7 @@ func (e *queryServiceExecutor) Execute(ctx context.Context, run backend.QueuedRu
// syncRunPromise implements backend.RunPromise for a synchronous QueryService.
type syncRunPromise struct {
qr backend.QueuedRun
svc flux.QueryService
svc query.QueryService
t *backend.StoreTask
ctx context.Context
cancel context.CancelFunc
@ -123,9 +125,9 @@ func (p *syncRunPromise) doQuery() {
return
}
req := &flux.Request{
req := &query.Request{
OrganizationID: p.t.Org,
Compiler: flux.SpecCompiler{
Compiler: lang.SpecCompiler{
Spec: spec,
},
}
@ -159,7 +161,7 @@ func (p *syncRunPromise) cancelOnContextDone() {
// asyncQueryServiceExecutor is an implementation of backend.Executor that depends on an AsyncQueryService.
type asyncQueryServiceExecutor struct {
svc flux.AsyncQueryService
svc query.AsyncQueryService
st backend.Store
logger *zap.Logger
}
@ -167,7 +169,7 @@ type asyncQueryServiceExecutor struct {
var _ backend.Executor = (*asyncQueryServiceExecutor)(nil)
// NewQueryServiceExecutor returns a new executor based on the given AsyncQueryService.
func NewAsyncQueryServiceExecutor(logger *zap.Logger, svc flux.AsyncQueryService, st backend.Store) backend.Executor {
func NewAsyncQueryServiceExecutor(logger *zap.Logger, svc query.AsyncQueryService, st backend.Store) backend.Executor {
return &asyncQueryServiceExecutor{logger: logger, svc: svc, st: st}
}
@ -182,9 +184,9 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que
return nil, err
}
req := &flux.Request{
req := &query.Request{
OrganizationID: t.Org,
Compiler: flux.SpecCompiler{
Compiler: lang.SpecCompiler{
Spec: spec,
},
}

View File

@ -13,8 +13,10 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/lang"
"github.com/influxdata/flux/values"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/task/backend"
"github.com/influxdata/platform/task/backend/executor"
@ -27,7 +29,7 @@ type fakeQueryService struct {
queryErr error
}
var _ flux.AsyncQueryService = (*fakeQueryService)(nil)
var _ query.AsyncQueryService = (*fakeQueryService)(nil)
func makeSpec(q string) *flux.Spec {
qs, err := flux.Compile(context.Background(), q, time.Unix(123, 0))
@ -49,7 +51,7 @@ func newFakeQueryService() *fakeQueryService {
return &fakeQueryService{queries: make(map[string]*fakeQuery)}
}
func (s *fakeQueryService) Query(ctx context.Context, req *flux.Request) (flux.Query, error) {
func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (flux.Query, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.queryErr != nil {
@ -58,9 +60,9 @@ func (s *fakeQueryService) Query(ctx context.Context, req *flux.Request) (flux.Q
return nil, err
}
sc, ok := req.Compiler.(flux.SpecCompiler)
sc, ok := req.Compiler.(lang.SpecCompiler)
if !ok {
return nil, fmt.Errorf("fakeQueryService only supports the flux.SpecCompiler, got %T", req.Compiler)
return nil, fmt.Errorf("fakeQueryService only supports the SpecCompiler, got %T", req.Compiler)
}
fq := &fakeQuery{
@ -224,7 +226,7 @@ func createSyncSystem() *system {
st: st,
ex: executor.NewQueryServiceExecutor(
zap.NewNop(),
flux.QueryServiceBridge{
query.QueryServiceBridge{
AsyncQueryService: svc,
},
st,

View File

@ -4,7 +4,7 @@ import (
"context"
"io"
"github.com/influxdata/flux"
"github.com/influxdata/platform/query"
"go.uber.org/zap"
)
@ -25,7 +25,7 @@ func NewProxyQueryService(l *zap.Logger) *ProxyQueryService {
}
// Query logs the query request.
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *flux.ProxyRequest) (int64, error) {
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
if req != nil {
s.Logger.Info("query", zap.Any("request", req))
}