fix: Update query services to use Request type

Moves idpe.QueryService into platform/query.ProxyQueryService
Splits the Request into ProxyRequest and Request.

Changes query.QueryService and query.AsyncQueryService to use a Request
type. This means that the Compiler interface is consumed by the service
to abstract out transpilation vs Flux compilation vs raw spec.

The transpiler handler is removed.

There are separate http handlers and service implementations for each of
the three query services.

Query logging types are moved into platform.

The ResultIterator now expects Cancel to always be called.

The fluxd binary exposes the query endpoint specified in the swagger
file.
pull/10616/head
Nathaniel Cook 2018-07-25 12:27:10 -06:00
parent 31015cb322
commit 5bde0b5be6
36 changed files with 1774 additions and 1094 deletions

View File

@ -37,8 +37,7 @@ SOURCES_NO_VENDOR := $(shell find . -path ./vendor -prune -o -name "*.go" -not -
CMDS := \
bin/$(GOOS)/influx \
bin/$(GOOS)/idpd \
bin/$(GOOS)/fluxd \
bin/$(GOOS)/transpilerd
bin/$(GOOS)/fluxd
# List of utilities to build as part of the build process
UTILS := \

View File

@ -104,15 +104,17 @@ func fluxF(cmd *cobra.Command, args []string) {
orgName, err := getStrList("ORGANIZATION_NAME")
if err != nil {
return
logger.Error("failed to get organization name", zap.Error(err))
}
orgSvc := StaticOrganizationService{Name: orgName[0]}
orgSvc := &StaticOrganizationService{Name: orgName[0]}
queryHandler := http.NewQueryHandler()
queryHandler.QueryService = query.QueryServiceBridge{
AsyncQueryService: c,
queryHandler := http.NewExternalQueryHandler()
queryHandler.ProxyQueryService = query.ProxyQueryServiceBridge{
QueryService: query.QueryServiceBridge{
AsyncQueryService: c,
},
}
queryHandler.OrganizationService = &orgSvc
queryHandler.OrganizationService = orgSvc
queryHandler.Logger = logger.With(zap.String("handler", "query"))
handler := http.NewHandlerFromRegistry("query", reg)

View File

@ -1,15 +0,0 @@
# Transpilerd
Transpilerd is a daemon that can execute queries from various source languages by transpiling the query and transforming the result.
# Exposed Metrics
The `transpilerd` process exposes a Prometheus endpoint on port `8098` by default.
The following metrics are exposed:
| Metric Name | Prometheus Type | Labels | Description
| ------------- | --------------- | --------------- | --------------- |
| `http_api_requests_total` | counter | handler,method,path,status | Number of requests received on the server |
| `http_api_requests_duration_seconds` | histogram | handler,method,path,status | Histogram of times spent on all http requests |

View File

@ -1,135 +0,0 @@
package main
import (
"context"
"fmt"
"os"
"strings"
influxlogger "github.com/influxdata/influxdb/logger"
"github.com/influxdata/platform"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/kit/prom"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"go.uber.org/zap"
)
// transpileCmd is the root cobra command for the transpilerd daemon.
var transpileCmd = &cobra.Command{
	Use:   "transpilerd",
	Short: "Transpiler Query Server",
	Run: func(cmd *cobra.Command, args []string) {
		logger := influxlogger.New(os.Stdout)
		// context.Canceled is an orderly shutdown, not a fatal error.
		if err := transpileF(cmd, logger, args); err != nil && err != context.Canceled {
			logger.Error("Encountered fatal error", zap.String("error", err.Error()))
			os.Exit(1)
		}
	},
}

// Flags contains all the CLI flag values for transpilerd.
type Flags struct {
	// bindAddr is the address the HTTP server listens on (see ListenAndServe below).
	bindAddr string
}

// flags holds the parsed flag/environment configuration for the process.
var flags Flags

func init() {
	viper.SetEnvPrefix("TRANSPILERD")
	transpileCmd.PersistentFlags().StringVar(&flags.bindAddr, "bind-addr", ":8098", "The bind address for this daemon.")
	viper.BindEnv("BIND_ADDR")
	// An environment variable, when set, overrides the flag default.
	if b := viper.GetString("BIND_ADDR"); b != "" {
		flags.bindAddr = b
	}
	// TODO(jsternberg): Connect directly to the storage hosts. There's no need to require proxying
	// the requests through fluxd for this service.
	transpileCmd.PersistentFlags().String("fluxd-hosts", "http://localhost:8093", "scheme://host:port address of the fluxd server.")
	viper.BindEnv("FLUXD_HOSTS")
	viper.BindPFlag("FLUXD_HOSTS", transpileCmd.PersistentFlags().Lookup("fluxd-hosts"))
	// TODO(jsternberg): Determine how we are going to identify the organization id in open source.
	transpileCmd.PersistentFlags().StringP("org-id", "", "0000000000000000", "id of the organization that owns the bucket")
	viper.BindEnv("ORG_ID")
	viper.BindPFlag("ORG_ID", transpileCmd.PersistentFlags().Lookup("org-id"))
}
// transpileF is the run function for transpilerd: it discovers fluxd hosts,
// resolves the organization ID, registers Prometheus collectors, and serves
// the transpiler HTTP handler on the configured bind address.
func transpileF(cmd *cobra.Command, logger *zap.Logger, args []string) error {
	hosts, err := discoverHosts()
	if err != nil {
		return err
	} else if len(hosts) == 0 {
		return errors.New("no fluxd hosts found")
	}
	// Retrieve the organization that we are using.
	id, err := getOrganization()
	if err != nil {
		return err
	}
	reg := prom.NewRegistry()
	reg.MustRegister(prometheus.NewGoCollector())
	reg.WithLogger(logger)
	// TODO(nathanielc): Allow QueryService to use multiple hosts.
	logger.Info("Using fluxd service", zap.Strings("hosts", hosts), zap.Stringer("org-id", id))
	transpileHandler := http.NewTranspilerQueryHandler(id)
	transpileHandler.QueryService = &http.QueryService{
		// Only the first discovered host is used; see the TODO above.
		Addr: hosts[0],
	}
	//TODO(nathanielc): Add DBRPMappingService
	transpileHandler.DBRPMappingService = nil
	transpileHandler.Logger = logger
	reg.MustRegister(transpileHandler.PrometheusCollectors()...)
	//TODO(nathanielc): Add health checks
	handler := http.NewHandlerFromRegistry("transpile", reg)
	handler.Handler = transpileHandler
	logger.Info("Starting transpilerd", zap.String("bind_addr", flags.bindAddr))
	return http.ListenAndServe(flags.bindAddr, handler, logger)
}
func main() {
	if err := transpileCmd.Execute(); err != nil {
		os.Exit(1)
	}
}

// getStrList reads a comma-separated value for key from viper and splits it.
// It returns an error when the value is empty.
func getStrList(key string) ([]string, error) {
	v := viper.GetViper()
	valStr := v.GetString(key)
	if valStr == "" {
		return nil, errors.New("empty value")
	}
	return strings.Split(valStr, ","), nil
}

// getOrganization decodes the configured ORG_ID string into a platform.ID.
func getOrganization() (platform.ID, error) {
	v := viper.GetViper()
	orgID := v.GetString("ORG_ID")
	if orgID == "" {
		return nil, errors.New("must specify org-id")
	}
	var id platform.ID
	if err := id.DecodeFromString(orgID); err != nil {
		return nil, fmt.Errorf("unable to decode organization id: %s", err)
	}
	return id, nil
}

// discoverHosts returns the configured list of fluxd hosts from FLUXD_HOSTS.
func discoverHosts() ([]string, error) {
	fluxdHosts, err := getStrList("FLUXD_HOSTS")
	if err != nil {
		return nil, errors.Wrap(err, "failed to get fluxd hosts")
	}
	return fluxdHosts, nil
}

View File

@ -0,0 +1,150 @@
package http
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"unicode/utf8"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/csv"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// ExternalQueryHandler implements the /query API endpoint defined in the swagger doc.
// This only implements the POST method and only supports Spec or Flux queries.
type ExternalQueryHandler struct {
	*httprouter.Router

	Logger *zap.Logger

	// ProxyQueryService executes the decoded request and streams results.
	ProxyQueryService query.ProxyQueryService
	// OrganizationService resolves the "organization" query parameter to an ID.
	OrganizationService platform.OrganizationService
}

// NewExternalQueryHandler returns a new instance of ExternalQueryHandler.
func NewExternalQueryHandler() *ExternalQueryHandler {
	h := &ExternalQueryHandler{
		Router: httprouter.New(),
	}
	h.HandlerFunc("POST", "/query", h.handlePostQuery)
	return h
}
// decodeQueryRequest populates req from the HTTP request:
//   - the "organization" form value is resolved to an organization ID via orgSvc;
//   - a JSON body ("application/json") is decoded into a SpecCompiler,
//     otherwise the "query" form value (or the raw body) becomes a FluxCompiler;
//   - the Accept header selects the output dialect (only CSV is supported).
func decodeQueryRequest(r *http.Request, req *query.ProxyRequest, orgSvc platform.OrganizationService) error {
	orgName := r.FormValue("organization")
	if orgName == "" {
		return errors.New(`missing the "organization" parameter`)
	}
	o, err := orgSvc.FindOrganization(r.Context(), platform.OrganizationFilter{Name: &orgName})
	if err != nil {
		return err
	}
	req.Request.OrganizationID = o.ID
	// Anonymous struct matching the JSON request body shape.
	request := struct {
		Spec    *query.Spec `json:"spec"`
		Dialect struct {
			Header         bool     `json:"header"`
			Delimiter      string   `json:"delimiter"`
			CommentPrefix  string   `json:"commentPrefix"`
			DateTimeFormat string   `json:"dateTimeFormat"`
			Annotations    []string `json:"annotations"`
		}
	}{}
	// Set defaults
	request.Dialect.Header = true
	request.Dialect.Delimiter = ","
	// TODO(nathanielc): Set commentPrefix and dateTimeFormat defaults
	// once they are supported.
	switch r.Header.Get("Content-Type") {
	case "application/json":
		// JSON bodies carry a pre-compiled query spec (and optional dialect settings).
		err := json.NewDecoder(r.Body).Decode(&request)
		if err != nil {
			return err
		}
		req.Request.Compiler = query.SpecCompiler{
			Spec: request.Spec,
		}
	default:
		// Otherwise treat the input as raw Flux source, from the "query"
		// parameter or, failing that, the request body.
		q := r.FormValue("query")
		if q == "" {
			data, err := ioutil.ReadAll(r.Body)
			if err != nil {
				return err
			}
			q = string(data)
		}
		req.Request.Compiler = query.FluxCompiler{
			Query: q,
		}
	}
	switch r.Header.Get("Accept") {
	case "text/csv":
		fallthrough
	default:
		// CSV is both the explicit and the fallback dialect.
		var delimiter rune
		dialect := request.Dialect
		if dialect.Delimiter != "" {
			// Only the first rune of the delimiter string is used.
			delimiter, _ = utf8.DecodeRuneInString(dialect.Delimiter)
		}
		if dialect.CommentPrefix != "" {
			return errors.New("commentPrefix is not yet supported")
		}
		if dialect.DateTimeFormat != "" {
			return errors.New("dateTimeFormat is not yet supported")
		}
		config := csv.ResultEncoderConfig{
			NoHeader:    !dialect.Header,
			Delimiter:   delimiter,
			Annotations: dialect.Annotations,
		}
		req.Dialect = csv.Dialect{
			ResultEncoderConfig: config,
		}
	}
	return nil
}
// handlePostQuery handles POST /query: it decodes the proxy request from the
// HTTP request, verifies the dialect can set HTTP headers, and streams the
// encoded results to the client.
func (h *ExternalQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	var req query.ProxyRequest
	if err := decodeQueryRequest(r, &req, h.OrganizationService); err != nil {
		EncodeError(ctx, err, w)
		return
	}

	hd, ok := req.Dialect.(HTTPDialect)
	if !ok {
		EncodeError(ctx, fmt.Errorf("unsupported dialect over HTTP %T", req.Dialect), w)
		return
	}
	hd.SetHeaders(w)

	n, err := h.ProxyQueryService.Query(ctx, w, &req)
	if err != nil {
		if n == 0 {
			// Only record the error headers IFF nothing has been written to w.
			EncodeError(ctx, err, w)
			return
		}
		// The response is already partially written; all we can do is log.
		// Fix: the handler label previously said "transpilerde", a copy-paste
		// typo from the removed transpiler handler.
		h.Logger.Info("Error writing response to client",
			zap.String("handler", "external_query_handler"),
			zap.Error(err),
		)
	}
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (h *ExternalQueryHandler) PrometheusCollectors() []prometheus.Collector {
	// TODO: gather and return relevant metrics.
	return nil
}

118
http/proxy_query_service.go Normal file
View File

@ -0,0 +1,118 @@
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/influxdata/platform/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
const (
	// proxyQueryPath is the route for the internal proxy query endpoint.
	proxyQueryPath = "/v1/query"
)

// ProxyQueryHandler serves the internal /v1/query endpoint. It accepts a
// JSON-encoded query.ProxyRequest and streams the encoded results back.
type ProxyQueryHandler struct {
	*httprouter.Router

	Logger *zap.Logger

	ProxyQueryService query.ProxyQueryService

	// Mappings used to decode the polymorphic compiler/dialect JSON fields.
	CompilerMappings query.CompilerMappings
	DialectMappings  query.DialectMappings
}

// NewProxyQueryHandler returns a new instance of ProxyQueryHandler.
func NewProxyQueryHandler() *ProxyQueryHandler {
	h := &ProxyQueryHandler{
		Router: httprouter.New(),
	}
	h.HandlerFunc("POST", proxyQueryPath, h.handlePostQuery)
	return h
}

// HTTPDialect is an encoding dialect that can write metadata to HTTP headers
type HTTPDialect interface {
	SetHeaders(w http.ResponseWriter)
}
// handlePostQuery handles query requests: it decodes the JSON ProxyRequest
// using the registered compiler/dialect mappings, verifies the dialect
// supports HTTP, and streams the encoded results to the client.
func (h *ProxyQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()

	var req query.ProxyRequest
	req.WithCompilerMappings(h.CompilerMappings)
	req.WithDialectMappings(h.DialectMappings)
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		EncodeError(ctx, err, w)
		return
	}

	hd, ok := req.Dialect.(HTTPDialect)
	if !ok {
		EncodeError(ctx, fmt.Errorf("unsupported dialect over HTTP %T", req.Dialect), w)
		return
	}
	hd.SetHeaders(w)

	n, err := h.ProxyQueryService.Query(ctx, w, &req)
	if err != nil {
		if n == 0 {
			// Only record the error headers IFF nothing has been written to w.
			EncodeError(ctx, err, w)
			return
		}
		// Partial response already sent; log instead of writing error headers.
		// Fix: the handler label previously said "transpilerde", a copy-paste
		// typo from the removed transpiler handler.
		h.Logger.Info("Error writing response to client",
			zap.String("handler", "proxy_query_handler"),
			zap.Error(err),
		)
	}
}
// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (h *ProxyQueryHandler) PrometheusCollectors() []prometheus.Collector {
	// TODO: gather and return relevant metrics.
	return nil
}
// ProxyQueryService implements query.ProxyQueryService by forwarding
// requests to a remote proxy query HTTP endpoint.
type ProxyQueryService struct {
	Addr               string
	Token              string
	InsecureSkipVerify bool
}

// Query encodes req as JSON, POSTs it to the remote proxy query endpoint,
// and copies the response body to w, returning the number of bytes written.
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
	u, err := newURL(s.Addr, proxyQueryPath)
	if err != nil {
		return 0, err
	}
	var body bytes.Buffer
	if err := json.NewEncoder(&body).Encode(req); err != nil {
		return 0, err
	}
	hreq, err := http.NewRequest("POST", u.String(), &body)
	if err != nil {
		return 0, err
	}
	// Fix: propagate the caller's context so cancellation and deadlines
	// apply to the outbound request (previously ctx was ignored).
	hreq = hreq.WithContext(ctx)
	hreq.Header.Set("Authorization", s.Token)
	// The body is always JSON; say so explicitly.
	hreq.Header.Set("Content-Type", "application/json")

	hc := newClient(u.Scheme, s.InsecureSkipVerify)
	resp, err := hc.Do(hreq)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if err := CheckError(resp); err != nil {
		return 0, err
	}
	return io.Copy(w, resp.Body)
}

View File

@ -5,13 +5,11 @@ import (
"context"
"encoding/json"
"net/http"
"net/url"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/csv"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
@ -26,17 +24,19 @@ type QueryHandler struct {
Logger *zap.Logger
csvEncoder query.MultiResultEncoder
csvDialect csv.Dialect
QueryService query.QueryService
OrganizationService platform.OrganizationService
QueryService query.QueryService
CompilerMappings query.CompilerMappings
}
// NewQueryHandler returns a new instance of QueryHandler.
func NewQueryHandler() *QueryHandler {
h := &QueryHandler{
Router: httprouter.New(),
csvEncoder: csv.NewMultiResultEncoder(csv.DefaultEncoderConfig()),
Router: httprouter.New(),
csvDialect: csv.Dialect{
ResultEncoderConfig: csv.DefaultEncoderConfig(),
},
}
h.HandlerFunc("GET", "/ping", h.handlePing)
@ -54,66 +54,29 @@ func (h *QueryHandler) handlePing(w http.ResponseWriter, r *http.Request) {
func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var orgID platform.ID
if id := r.FormValue("orgID"); id != "" {
err := orgID.DecodeFromString(id)
if err != nil {
EncodeError(ctx, errors.Wrap(err, "failed to decode orgID", errors.MalformedData), w)
return
}
}
if name := r.FormValue("orgName"); name != "" {
org, err := h.OrganizationService.FindOrganization(ctx, platform.OrganizationFilter{
Name: &name,
})
if err != nil {
EncodeError(ctx, errors.Wrap(err, "failed to load organization", errors.MalformedData), w)
return
}
orgID = org.ID
}
if len(orgID) == 0 {
EncodeError(ctx, errors.New("must pass organization name or ID as string in orgName or orgID parameter", errors.MalformedData), w)
var req query.Request
req.WithCompilerMappings(h.CompilerMappings)
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
EncodeError(ctx, err, w)
return
}
var results query.ResultIterator
if r.Header.Get("Content-type") == "application/json" {
req, err := decodePostQueryRequest(ctx, r)
if err != nil {
EncodeError(ctx, errors.Wrap(err, "Failed to decode query request", errors.MalformedData), w)
return
}
rs, err := h.QueryService.Query(ctx, orgID, req.Spec)
if err != nil {
EncodeError(ctx, err, w)
return
}
results = rs
} else {
queryStr := r.FormValue("q")
if queryStr == "" {
EncodeError(ctx, errors.New("must pass query string in q parameter", errors.MalformedData), w)
return
}
rs, err := h.QueryService.QueryWithCompile(ctx, orgID, queryStr)
if err != nil {
EncodeError(ctx, err, w)
return
}
results = rs
results, err := h.QueryService.Query(ctx, &req)
if err != nil {
EncodeError(ctx, err, w)
return
}
// Always cancel the results to free resources.
// If all results were consumed cancelling does nothing.
defer results.Cancel()
// Setup headers
stats, hasStats := results.(query.Statisticser)
if hasStats {
w.Header().Set("Trailer", statsTrailer)
}
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
w.Header().Set("Transfer-Encoding", "chunked")
// NOTE: We do not write the headers here.
// NOTE: We do not write out the headers here.
// It is possible that if the encoding step fails
// that we can write an error header so long as
// the encoder did not write anything.
@ -124,7 +87,9 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
case "text/csv":
fallthrough
default:
n, err := h.csvEncoder.Encode(w, results)
h.csvDialect.SetHeaders(w)
encoder := h.csvDialect.Encoder()
n, err := encoder.Encode(w, results)
if err != nil {
if n == 0 {
// If the encoder did not write anything, we can write an error header.
@ -148,19 +113,10 @@ func (h *QueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
}
}
type postQueryRequest struct {
Spec *query.Spec `json:"spec"`
}
func decodePostQueryRequest(ctx context.Context, r *http.Request) (*postQueryRequest, error) {
s := new(query.Spec)
if err := json.NewDecoder(r.Body).Decode(s); err != nil {
return nil, err
}
return &postQueryRequest{
Spec: s,
}, nil
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
func (h *QueryHandler) PrometheusCollectors() []prometheus.Collector {
// TODO: gather and return relevant metrics.
return nil
}
type QueryService struct {
@ -169,68 +125,31 @@ type QueryService struct {
InsecureSkipVerify bool
}
func (s *QueryService) Query(ctx context.Context, orgID platform.ID, query *query.Spec) (query.ResultIterator, error) {
func (s *QueryService) Query(ctx context.Context, req *query.Request) (query.ResultIterator, error) {
u, err := newURL(s.Addr, queryPath)
if err != nil {
return nil, err
}
values := url.Values{}
values.Set("orgID", orgID.String())
u.RawQuery = values.Encode()
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(req); err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), &buf)
hreq, err := http.NewRequest("POST", u.String(), &body)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", s.Token)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "text/csv")
hreq.Header.Set("Authorization", s.Token)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
resp, err := hc.Do(hreq)
if err != nil {
return nil, err
}
return s.processResponse(resp)
}
func (s *QueryService) QueryWithCompile(ctx context.Context, orgID platform.ID, query string) (query.ResultIterator, error) {
u, err := newURL(s.Addr, queryPath)
if err != nil {
return nil, err
}
values := url.Values{}
values.Set("q", query)
values.Set("orgID", orgID.String())
u.RawQuery = values.Encode()
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Authorization", s.Token)
req.Header.Set("Accept", "text/csv")
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(req)
if err != nil {
return nil, err
}
return s.processResponse(resp)
}
func (s *QueryService) processResponse(resp *http.Response) (query.ResultIterator, error) {
if err := CheckError(resp); err != nil {
return nil, err
}
// TODO(jsternberg): Handle a 204 response?
var decoder query.MultiResultDecoder
switch resp.Header.Get("Content-Type") {
case "text/csv":
@ -238,43 +157,41 @@ func (s *QueryService) processResponse(resp *http.Response) (query.ResultIterato
default:
decoder = csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
}
result, err := decoder.Decode(resp.Body)
results, err := decoder.Decode(resp.Body)
if err != nil {
return nil, err
}
return &statsResultIterator{
result: result,
resp: resp,
}, nil
statResults := &statsResultIterator{
results: results,
resp: resp,
}
return statResults, nil
}
// statsResultIterator implements query.ResultIterator and query.Statisticser by reading the HTTP trailers.
type statsResultIterator struct {
result query.ResultIterator
results query.ResultIterator
resp *http.Response
statisitcs query.Statistics
err error
}
func (s *statsResultIterator) More() bool {
more := s.result.More()
if !more {
s.readStats()
}
return more
return s.results.More()
}
func (s *statsResultIterator) Next() query.Result {
return s.result.Next()
return s.results.Next()
}
func (s *statsResultIterator) Cancel() {
s.result.Cancel()
s.results.Cancel()
s.readStats()
}
func (s *statsResultIterator) Err() error {
err := s.result.Err()
err := s.results.Err()
if err != nil {
return err
}

View File

@ -274,6 +274,11 @@ paths:
type: string
enum:
- application/json
- in: query
name: organization
description: specifies the name of the organization executing the query.
schema:
type: string
- in: query
name: query
description: flux query string to execute; used if there is no POST body.
@ -287,6 +292,16 @@ paths:
oneOf:
- $ref: "#/components/schemas/Query"
- $ref: "#/components/schemas/QuerySpecification"
application/x-www-form-urlencoded:
schema:
type: object
properties:
organization:
description: specifies the name of the organization executing the query.
type: string
query:
description: flux query string to execute
type: string
responses:
'200':
description: query results
@ -1058,11 +1073,6 @@ components:
type: string
default: ","
maxLength: 1
quoteChar:
description: set the quoting char for strings; cannot be the same as delimiter
type: string
default: \"
maxLength: 1
annotations:
description: https://www.w3.org/TR/2015/REC-tabular-data-model-20151217/#columns
type: array

View File

@ -1,105 +0,0 @@
package http
import (
"errors"
"net/http"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// TranspilerQueryHandler serves a 1.x-compatible /query endpoint by
// transpiling incoming queries into Flux and executing them against
// the query service.
type TranspilerQueryHandler struct {
	*httprouter.Router

	Logger       *zap.Logger
	QueryService query.QueryService

	DBRPMappingService platform.DBRPMappingService
	// OrgID is the organization all queries through this handler run under.
	OrgID platform.ID
}

// NewTranspilerQueryHandler returns a new instance of TranspilerQueryHandler
// bound to the given organization.
func NewTranspilerQueryHandler(orgID platform.ID) *TranspilerQueryHandler {
	h := &TranspilerQueryHandler{
		OrgID:  orgID,
		Router: httprouter.New(),
		Logger: zap.NewNop(),
	}
	h.HandlerFunc("GET", "/ping", h.servePing)
	h.HandlerFunc("POST", "/query", h.handlePostQuery)
	return h
}

// servePing returns a simple response to let the client know the server is running.
// This handler is only available for 1.x compatibility and is not part of the 2.0 platform.
func (h *TranspilerQueryHandler) servePing(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusNoContent)
}
// handlePostQuery handles query requests mirroring the 1.x influxdb API.
// (The original comment named it handlePostInfluxQL; the method is handlePostQuery.)
func (h *TranspilerQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
	ctx := r.Context()
	queryStr := r.FormValue("q")
	if queryStr == "" {
		EncodeError(ctx, errors.New("must pass query string in q parameter"), w)
		return
	}
	// Create a new transpiler from the http request.
	ce := influxqlCE
	transpiler := ce.transpiler(r, h.DBRPMappingService)
	// Run the transpiler against the query service.
	results, err := query.QueryWithTranspile(ctx, h.OrgID, queryStr, h.QueryService, transpiler)
	if err != nil {
		EncodeError(ctx, err, w)
		return
	}
	// Once we have reached this stage, it is the encoder's job to encode any
	// errors in the encoded format.
	if err := encodeResult(w, results, ce.contentType, ce.encoder); err != nil {
		h.Logger.Info("Unable to encode result", zap.Error(err))
		return
	}
}

// PrometheusCollectors satisfies the prom.PrometheusCollector interface.
func (h *TranspilerQueryHandler) PrometheusCollectors() []prometheus.Collector {
	// TODO: return relevant prometheus collectors.
	return nil
}
// crossExecute contains the components needed to execute a transpiled query and encode results.
type crossExecute struct {
	// transpiler builds a source-language-specific transpiler from the request.
	transpiler func(req *http.Request, dbrpMappingSvc platform.DBRPMappingService) query.Transpiler
	// encoder writes results in the source language's native response format.
	encoder     query.MultiResultEncoder
	contentType string
}

// influxqlCE wires the InfluxQL transpiler to the 1.x JSON result encoder.
var influxqlCE = crossExecute{
	transpiler: func(req *http.Request, dbrpMappingSvc platform.DBRPMappingService) query.Transpiler {
		config := influxql.Config{
			DefaultDatabase:        req.FormValue("db"),
			DefaultRetentionPolicy: req.FormValue("rp"),
		}
		return influxql.NewTranspilerWithConfig(dbrpMappingSvc, config)
	},
	encoder:     influxql.NewMultiResultEncoder(),
	contentType: "application/json",
}

// encodeResult writes results to w using encoder, setting the content-type
// and chunked transfer headers first. The returned error is from encoding.
func encodeResult(w http.ResponseWriter, results query.ResultIterator, contentType string, encoder query.MultiResultEncoder) error {
	w.Header().Set("Content-Type", contentType)
	w.Header().Set("Transfer-Encoding", "chunked")
	w.WriteHeader(http.StatusOK)
	_, err := encoder.Encode(w, results)
	return err
}

162
query/bridges.go Normal file
View File

@ -0,0 +1,162 @@
package query
import (
"context"
"io"
"sort"
)
// QueryServiceBridge adapts an AsyncQueryService so that it satisfies the
// synchronous QueryService interface.
type QueryServiceBridge struct {
	AsyncQueryService AsyncQueryService
}

// Query submits the request asynchronously and wraps the returned Query in a
// ResultIterator so callers can consume the results synchronously.
func (b QueryServiceBridge) Query(ctx context.Context, req *Request) (ResultIterator, error) {
	q, err := b.AsyncQueryService.Query(ctx, req)
	if err != nil {
		return nil, err
	}
	return newResultIterator(q), nil
}
// resultIterator implements a ResultIterator while consuming a Query
type resultIterator struct {
	query   Query              // the underlying asynchronous query
	cancel  chan struct{}      // closed exactly once to signal cancellation
	ready   bool               // true once results have been received from the query
	results *MapResultIterator // populated when the query becomes ready
}

func newResultIterator(q Query) *resultIterator {
	return &resultIterator{
		query:  q,
		cancel: make(chan struct{}),
	}
}

// More waits for the query to become ready (or be canceled) on first call and
// then reports whether results remain. When iteration is finished for any
// reason, the DONE path calls query.Done to release the query's resources.
func (r *resultIterator) More() bool {
	if !r.ready {
		select {
		case <-r.cancel:
			// Cancel was called before the query became ready.
			goto DONE
		case results, ok := <-r.query.Ready():
			if !ok {
				// The ready channel was closed without results: the query failed.
				goto DONE
			}
			r.ready = true
			r.results = NewMapResultIterator(results)
		}
	}

	if r.results.More() {
		return true
	}

DONE:
	r.query.Done()
	return false
}

func (r *resultIterator) Next() Result {
	return r.results.Next()
}

// Cancel closes the cancel channel at most once (the select guards against a
// double close) and then cancels the underlying query.
func (r *resultIterator) Cancel() {
	select {
	case <-r.cancel:
		// already canceled
	default:
		close(r.cancel)
	}
	r.query.Cancel()
}

func (r *resultIterator) Err() error {
	return r.query.Err()
}

func (r *resultIterator) Statistics() Statistics {
	return r.query.Statistics()
}
// MapResultIterator is a ResultIterator over an in-memory map of results,
// yielded in lexicographic key order.
type MapResultIterator struct {
	results map[string]Result
	order   []string
}

// NewMapResultIterator returns an iterator over results sorted by name.
func NewMapResultIterator(results map[string]Result) *MapResultIterator {
	keys := make([]string, 0, len(results))
	for name := range results {
		keys = append(keys, name)
	}
	sort.Strings(keys)
	return &MapResultIterator{
		results: results,
		order:   keys,
	}
}

// More reports whether any results remain.
func (r *MapResultIterator) More() bool {
	return len(r.order) > 0
}

// Next pops and returns the next result in key order.
func (r *MapResultIterator) Next() Result {
	name := r.order[0]
	r.order = r.order[1:]
	return r.results[name]
}

// Cancel is a no-op; all results are already in memory.
func (r *MapResultIterator) Cancel() {
}

// Err always returns nil; an in-memory iterator cannot fail.
func (r *MapResultIterator) Err() error {
	return nil
}
// SliceResultIterator is a ResultIterator backed by a slice of results.
type SliceResultIterator struct {
	results []Result
}

// NewSliceResultIterator returns an iterator yielding results in slice order.
func NewSliceResultIterator(results []Result) *SliceResultIterator {
	return &SliceResultIterator{results: results}
}

// More reports whether unconsumed results remain.
func (r *SliceResultIterator) More() bool {
	return len(r.results) > 0
}

// Next pops and returns the next result.
func (r *SliceResultIterator) Next() Result {
	head := r.results[0]
	r.results = r.results[1:]
	return head
}

// Cancel drops any remaining results.
func (r *SliceResultIterator) Cancel() {
	r.results = nil
}

// Err always returns nil.
func (r *SliceResultIterator) Err() error {
	return nil
}
// ProxyQueryServiceBridge implements ProxyQueryService on top of a
// QueryService by encoding the results directly to the caller's writer.
type ProxyQueryServiceBridge struct {
	QueryService QueryService
}

// Query runs the embedded request and streams the encoded results to w,
// returning the number of bytes written.
func (b ProxyQueryServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error) {
	results, err := b.QueryService.Query(ctx, &req.Request)
	if err != nil {
		return 0, err
	}
	// Always release the iterator's resources, even on an encoding error.
	defer results.Cancel()

	n, err := req.Dialect.Encoder().Encode(w, results)
	if err == nil {
		// The results iterator may have had an error independent of encoding errors.
		err = results.Err()
	}
	return n, err
}

49
query/compiler.go Normal file
View File

@ -0,0 +1,49 @@
package query
import (
"context"
"time"
)
const (
	FluxCompilerType = "flux"
	SpecCompilerType = "spec"
)

// AddCompilerMappings registers the constructors for the built-in Flux and
// Spec compiler types with the given mappings.
func AddCompilerMappings(mappings CompilerMappings) error {
	err := mappings.Add(FluxCompilerType, func() Compiler { return new(FluxCompiler) })
	if err != nil {
		return err
	}
	return mappings.Add(SpecCompilerType, func() Compiler { return new(SpecCompiler) })
}
// FluxCompiler compiles a Flux script into a spec.
type FluxCompiler struct {
	Query string `json:"query"`
}

// Compile parses the Flux script into a query spec, using the current time
// as the query's now time.
func (c FluxCompiler) Compile(ctx context.Context) (*Spec, error) {
	return Compile(ctx, c.Query, time.Now())
}

// CompilerType returns FluxCompilerType.
func (c FluxCompiler) CompilerType() CompilerType {
	return FluxCompilerType
}
// SpecCompiler implements Compiler by returning a known spec.
type SpecCompiler struct {
	Spec *Spec `json:"spec"`
}

// Compile returns the wrapped spec unchanged; no compilation is performed.
func (c SpecCompiler) Compile(ctx context.Context) (*Spec, error) {
	return c.Spec, nil
}

// CompilerType returns SpecCompilerType.
func (c SpecCompiler) CompilerType() CompilerType {
	return SpecCompilerType
}

View File

@ -68,12 +68,11 @@ func New(c Config) *Controller {
return ctrl
}
// QueryWithCompile submits a query for execution returning immediately.
// The query will first be compiled before submitting for execution.
// Query submits a query for execution returning immediately.
// Done must be called on any returned Query objects.
func (c *Controller) QueryWithCompile(ctx context.Context, orgID platform.ID, queryStr string) (query.Query, error) {
q := c.createQuery(ctx, orgID)
err := c.compileQuery(q, queryStr)
func (c *Controller) Query(ctx context.Context, req *query.Request) (query.Query, error) {
q := c.createQuery(ctx, req.OrganizationID)
err := c.compileQuery(q, req.Compiler)
if err != nil {
return nil, err
}
@ -81,23 +80,6 @@ func (c *Controller) QueryWithCompile(ctx context.Context, orgID platform.ID, qu
return q, err
}
// Query submits a query for execution returning immediately. Once a query
// is submitted, that is once a query is enqueued, the query spec must not
// be modified. Done must be called on any returned Query objects.
func (c *Controller) Query(ctx context.Context, orgID platform.ID, qSpec *query.Spec) (query.Query, error) {
q := c.createQuery(ctx, orgID)
q.spec = *qSpec
// Incoming query spec may have been produced by an entity other than the
// Flux interpreter, so we must set the default Now time if not already set.
if q.spec.Now.IsZero() {
q.spec.Now = q.now
}
err := c.enqueueQuery(q)
return q, err
}
func (c *Controller) createQuery(ctx context.Context, orgID platform.ID) *Query {
id := c.nextID()
labelValues := []string{
@ -125,14 +107,21 @@ func (c *Controller) createQuery(ctx context.Context, orgID platform.ID) *Query
}
}
func (c *Controller) compileQuery(q *Query, queryStr string) error {
func (c *Controller) compileQuery(q *Query, compiler query.Compiler) error {
if !q.tryCompile() {
return errors.New("failed to transition query to compiling state")
}
spec, err := query.Compile(q.compilingCtx, queryStr, q.now, query.Verbose(c.verbose))
spec, err := compiler.Compile(q.compilingCtx)
if err != nil {
return errors.Wrap(err, "failed to compile query")
}
// Incoming query spec may have been produced by an entity other than the
// Flux interpreter, so we must set the default Now time if not already set.
if spec.Now.IsZero() {
spec.Now = q.now
}
q.spec = *spec
return nil
}
@ -328,9 +317,10 @@ type Query struct {
ready chan map[string]query.Result
mu sync.Mutex
state State
cancel func()
mu sync.Mutex
state State
cancel func()
canceled bool
parentCtx,
compilingCtx,
@ -376,6 +366,11 @@ func (q *Query) Concurrency() int {
func (q *Query) Cancel() {
q.mu.Lock()
defer q.mu.Unlock()
if q.canceled {
// We have already been canceled
return
}
q.canceled = true
// call cancel func
q.cancel()

View File

@ -6,15 +6,21 @@ import (
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/execute"
"github.com/influxdata/platform/query/mock"
"github.com/influxdata/platform/query/plan"
)
// testQuerySpec is a spec that can be used for queries.
var testQuerySpec *query.Spec
var mockCompiler *mock.Compiler
func init() {
mockCompiler = new(mock.Compiler)
mockCompiler.CompileFn = func(ctx context.Context) (*query.Spec, error) {
return query.Compile(ctx, `from(bucket: "telegraf") |> range(start: -5m) |> mean()`, time.Now())
}
}
func TestController_CancelQuery(t *testing.T) {
done := make(chan struct{})
@ -27,9 +33,13 @@ func TestController_CancelQuery(t *testing.T) {
ctrl := New(Config{})
ctrl.executor = executor
req := &query.Request{
OrganizationID: platform.ID("a"),
Compiler: mockCompiler,
}
// Run a query that will cause the controller to stall.
q, err := ctrl.Query(context.Background(), nil, testQuerySpec)
q, err := ctrl.Query(context.Background(), req)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
@ -52,17 +62,9 @@ func TestController_CancelQuery(t *testing.T) {
}
}()
if _, err := ctrl.Query(ctx, nil, testQuerySpec); err == nil {
if _, err := ctrl.Query(ctx, req); err == nil {
t.Fatal("expected error")
} else if got, want := err, context.Canceled; got != want {
t.Fatalf("unexpected error: got=%q want=%q", got, want)
}
}
func init() {
spec, err := query.Compile(context.Background(), `from(bucket: "telegraf") |> range(start: -5m) |> mean()`, time.Now())
if err != nil {
panic(err)
}
testQuerySpec = spec
}

41
query/csv/dialect.go Normal file
View File

@ -0,0 +1,41 @@
package csv
import (
"net/http"
"github.com/influxdata/platform/query"
)
// DialectType identifies the CSV dialect in DialectMappings.
const DialectType = "csv"

// AddDialectMappings adds the csv specific dialect mappings.
// Each created Dialect uses the default CSV encoder configuration.
func AddDialectMappings(mappings query.DialectMappings) error {
	return mappings.Add(DialectType, func() query.Dialect {
		return &Dialect{
			ResultEncoderConfig: DefaultEncoderConfig(),
		}
	})
}
// Dialect describes the output format of queries in CSV.
// It embeds the encoder configuration used to produce the results.
type Dialect struct {
	ResultEncoderConfig
}
// SetHeaders sets the Content-Type and Transfer-Encoding headers for a
// chunked CSV response.
func (d Dialect) SetHeaders(w http.ResponseWriter) {
	w.Header().Set("Content-Type", "text/csv; charset=utf-8")
	w.Header().Set("Transfer-Encoding", "chunked")
}
// Encoder returns a MultiResultEncoder configured with this dialect's
// CSV encoder configuration.
func (d Dialect) Encoder() query.MultiResultEncoder {
	return NewMultiResultEncoder(d.ResultEncoderConfig)
}
// DialectType reports the type of this dialect, "csv".
func (d Dialect) DialectType() query.DialectType {
	return DialectType
}
// DefaultDialect returns a CSV dialect that uses the default encoder
// configuration.
func DefaultDialect() *Dialect {
	d := new(Dialect)
	d.ResultEncoderConfig = DefaultEncoderConfig()
	return d
}

View File

@ -2,6 +2,7 @@ package main
import (
"bufio"
"bytes"
"context"
"fmt"
"io/ioutil"
@ -9,7 +10,6 @@ import (
"path/filepath"
"regexp"
"strings"
"time"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
@ -69,29 +69,34 @@ func main() {
return
}
qs := querytest.GetQueryServiceBridge()
qspec, err := query.Compile(context.Background(), string(querytext), time.Now().UTC())
if err != nil {
fmt.Printf("error compiling. \n query: \n %s \n err: %s", string(querytext), err)
return
pqs := querytest.GetProxyQueryServiceBridge()
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: query.FluxCompiler{
Query: string(querytext),
},
InputFile: incsv,
},
},
Dialect: csv.DefaultDialect(),
}
querytest.ReplaceFromSpec(qspec, incsv)
enc := csv.NewMultiResultEncoder(csv.DefaultEncoderConfig())
result, err := querytest.GetQueryEncodedResults(qs, qspec, incsv, enc)
var buf bytes.Buffer
_, err = pqs.Query(context.Background(), &buf, req)
if err != nil {
fmt.Printf("error: %s", err)
return
}
fmt.Printf("FLUX:\n %s\n\n", querytext)
fmt.Printf("CHECK RESULT:\n%s\n____________________________________________________________", result)
fmt.Printf("CHECK RESULT:\n%s\n____________________________________________________________", buf.String())
reader := bufio.NewReader(os.Stdin)
fmt.Print("Results ok (y/n)?: ")
text, _ := reader.ReadString('\n')
if text == "y\n" {
fmt.Printf("writing output file: %s", testName+".out.csv")
ioutil.WriteFile(testName+".out.csv", []byte(result), 0644)
ioutil.WriteFile(testName+".out.csv", buf.Bytes(), 0644)
}
}
}

View File

@ -1,12 +1,13 @@
package functions_test
import (
"bytes"
"context"
"io/ioutil"
"os"
"path/filepath"
"strings"
"testing"
"time"
"github.com/influxdata/platform"
"github.com/influxdata/platform/mock"
@ -57,7 +58,7 @@ var skipTests = map[string]string{
"drop_referenced": `failed to run query: function references unknown column "_field"`,
}
var qs = querytest.GetQueryServiceBridge()
var pqs = querytest.GetProxyQueryServiceBridge()
func withEachFluxFile(t testing.TB, fn func(prefix, caseName string)) {
dir, err := os.Getwd()
@ -80,7 +81,6 @@ func withEachFluxFile(t testing.TB, fn func(prefix, caseName string)) {
}
func Test_QueryEndToEnd(t *testing.T) {
influxqlTranspiler := influxql.NewTranspiler(dbrpMappingSvc)
withEachFluxFile(t, func(prefix, caseName string) {
reason, skip := skipTests[caseName]
@ -90,21 +90,23 @@ func Test_QueryEndToEnd(t *testing.T) {
if skip {
t.Skip(reason)
}
queryTester(t, qs, prefix, ".flux")
testFlux(t, pqs, prefix, ".flux")
})
t.Run(influxqlName, func(t *testing.T) {
if skip {
t.Skip(reason)
}
queryTranspileTester(t, influxqlTranspiler, qs, prefix, ".influxql")
testInfluxQL(t, pqs, prefix, ".influxql")
})
})
}
func Benchmark_QueryEndToEnd(b *testing.B) {
influxqlTranspiler := influxql.NewTranspiler(dbrpMappingSvc)
withEachFluxFile(b, func(prefix, caseName string) {
reason, skip := skipTests[caseName]
if skip {
b.Skip(reason)
}
fluxName := caseName + ".flux"
influxqlName := caseName + ".influxql"
@ -112,93 +114,108 @@ func Benchmark_QueryEndToEnd(b *testing.B) {
if skip {
b.Skip(reason)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queryTester(b, qs, prefix, ".flux")
testFlux(b, pqs, prefix, ".flux")
}
})
b.Run(influxqlName, func(b *testing.B) {
if skip {
b.Skip(reason)
}
b.ResetTimer()
b.ReportAllocs()
for i := 0; i < b.N; i++ {
queryTranspileTester(b, influxqlTranspiler, qs, prefix, ".influxql")
testInfluxQL(b, pqs, prefix, ".influxql")
}
})
})
}
func queryTester(t testing.TB, qs query.QueryService, prefix, queryExt string) {
q, err := querytest.GetTestData(prefix, queryExt)
func testFlux(t testing.TB, pqs query.ProxyQueryService, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
t.Fatal(err)
}
csvOut, err := querytest.GetTestData(prefix, ".out.csv")
csvInFilename := prefix + ".in.csv"
csvOut, err := ioutil.ReadFile(prefix + ".out.csv")
if err != nil {
t.Fatal(err)
}
spec, err := query.Compile(context.Background(), q, time.Now().UTC())
if err != nil {
t.Fatalf("failed to compile: %v", err)
compiler := query.FluxCompiler{
Query: string(q),
}
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: compiler,
InputFile: csvInFilename,
},
},
Dialect: csv.DefaultDialect(),
}
csvIn := prefix + ".in.csv"
enc := csv.NewMultiResultEncoder(csv.DefaultEncoderConfig())
QueryTestCheckSpec(t, qs, spec, csvIn, csvOut, enc)
QueryTestCheckSpec(t, pqs, req, string(csvOut))
}
func queryTranspileTester(t testing.TB, transpiler query.Transpiler, qs query.QueryService, prefix, queryExt string) {
q, err := querytest.GetTestData(prefix, queryExt)
func testInfluxQL(t testing.TB, pqs query.ProxyQueryService, prefix, queryExt string) {
q, err := ioutil.ReadFile(prefix + queryExt)
if err != nil {
if os.IsNotExist(err) {
t.Skip("query missing")
} else {
if !os.IsNotExist(err) {
t.Fatal(err)
}
t.Skip("influxql query is missing")
}
csvOut, err := querytest.GetTestData(prefix, ".out.csv")
csvInFilename := prefix + ".in.csv"
csvOut, err := ioutil.ReadFile(prefix + ".out.csv")
if err != nil {
t.Fatal(err)
}
spec, err := transpiler.Transpile(context.Background(), q)
if err != nil {
t.Fatalf("failed to transpile: %v", err)
compiler := influxql.NewCompiler(dbrpMappingSvc)
compiler.Cluster = "cluster"
compiler.DB = "db0"
compiler.Query = string(q)
req := &query.ProxyRequest{
Request: query.Request{
Compiler: querytest.FromCSVCompiler{
Compiler: compiler,
InputFile: csvInFilename,
},
},
Dialect: csv.DefaultDialect(),
}
QueryTestCheckSpec(t, pqs, req, string(csvOut))
csvIn := prefix + ".in.csv"
enc := csv.NewMultiResultEncoder(csv.DefaultEncoderConfig())
QueryTestCheckSpec(t, qs, spec, csvIn, csvOut, enc)
// Rerun test for InfluxQL JSON dialect
req.Dialect = new(influxql.Dialect)
enc = influxql.NewMultiResultEncoder()
jsonOut, err := querytest.GetTestData(prefix, ".out.json")
jsonOut, err := ioutil.ReadFile(prefix + ".out.json")
if err != nil {
t.Logf("skipping json evaluation: %s", err)
return
if !os.IsNotExist(err) {
t.Fatal(err)
}
t.Skip("influxql expected json is missing")
}
QueryTestCheckSpec(t, qs, spec, csvIn, jsonOut, enc)
QueryTestCheckSpec(t, pqs, req, string(jsonOut))
}
func QueryTestCheckSpec(t testing.TB, qs query.QueryService, spec *query.Spec, inputFile, want string, enc query.MultiResultEncoder) {
func QueryTestCheckSpec(t testing.TB, pqs query.ProxyQueryService, req *query.ProxyRequest, want string) {
t.Helper()
querytest.ReplaceFromSpec(spec, inputFile)
got, err := querytest.GetQueryEncodedResults(qs, spec, inputFile, enc)
var buf bytes.Buffer
_, err := pqs.Query(context.Background(), &buf, req)
if err != nil {
t.Errorf("failed to run query: %v", err)
return
}
got := buf.String()
if g, w := strings.TrimSpace(got), strings.TrimSpace(want); g != w {
t.Errorf("result not as expected want(-) got (+):\n%v", diff.LineDiff(w, g))
}

View File

@ -13,4 +13,4 @@
,,10,2018-05-22T19:53:26Z,2018-05-22T19:54:16Z,2018-05-22T19:53:46Z,648,io_time,diskio,host.local,disk2
,,10,2018-05-22T19:53:26Z,2018-05-22T19:54:16Z,2018-05-22T19:53:56Z,648,io_time,diskio,host.local,disk2
,,10,2018-05-22T19:53:26Z,2018-05-22T19:54:16Z,2018-05-22T19:54:06Z,648,io_time,diskio,host.local,disk2
,,10,2018-05-22T19:53:26Z,2018-05-22T19:54:16Z,2018-05-22T19:54:16Z,648,io_time,diskio,host.local,disk2
,,10,2018-05-22T19:53:26Z,2018-05-22T19:54:16Z,2018-05-22T19:54:16Z,648,io_time,diskio,host.local,disk2

1 #datatype string long dateTime:RFC3339 dateTime:RFC3339 dateTime:RFC3339 long string string string string
13 10 2018-05-22T19:53:26Z 2018-05-22T19:54:16Z 2018-05-22T19:53:46Z 648 io_time diskio host.local disk2
14 10 2018-05-22T19:53:26Z 2018-05-22T19:54:16Z 2018-05-22T19:53:56Z 648 io_time diskio host.local disk2
15 10 2018-05-22T19:53:26Z 2018-05-22T19:54:16Z 2018-05-22T19:54:06Z 648 io_time diskio host.local disk2
16 10 2018-05-22T19:53:26Z 2018-05-22T19:54:16Z 2018-05-22T19:54:16Z 648 io_time diskio host.local disk2

View File

@ -1 +1 @@
{"results":[{"statement_id":0,"series":[{"name":"swap","columns":["time","used_percent"],"values":[["2018-05-22T19:53:26Z",82.9833984375],["2018-05-22T19:53:36Z",82.598876953125],["2018-05-22T19:53:46Z",82.598876953125],["2018-05-22T19:53:56Z",82.598876953125],["2018-05-22T19:54:06Z",82.598876953125],["2018-05-22T19:54:16Z",82.6416015625]]}]}]}
{"results":[{"statement_id":0,"series":[{"name":"swap","columns":["time","used_percent"],"values":[["2018-05-22T19:53:26Z",82.9833984375],["2018-05-22T19:53:36Z",82.598876953125],["2018-05-22T19:53:46Z",82.598876953125],["2018-05-22T19:53:56Z",82.598876953125],["2018-05-22T19:54:06Z",82.598876953125],["2018-05-22T19:54:16Z",82.6416015625]]}]}]}

View File

@ -0,0 +1,49 @@
package influxql
import (
"context"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
)
// CompilerType identifies the InfluxQL compiler in CompilerMappings.
const CompilerType = "influxql"

// AddCompilerMappings adds the influxql specific compiler mappings.
// Each compiler created by the mapping captures dbrpMappingSvc so it can
// transpile InfluxQL queries.
func AddCompilerMappings(mappings query.CompilerMappings, dbrpMappingSvc platform.DBRPMappingService) error {
	return mappings.Add(CompilerType, func() query.Compiler {
		return NewCompiler(dbrpMappingSvc)
	})
}
// Compiler is the transpiler to convert InfluxQL to a Flux specification.
type Compiler struct {
	Cluster string `json:"cluster,omitempty"` // Cluster is passed through to the transpiler config.
	DB      string `json:"db,omitempty"`      // DB is the default database for the query.
	RP      string `json:"rp,omitempty"`      // RP is the default retention policy for the query.
	Query   string `json:"query"`             // Query is the InfluxQL query text.

	// dbrpMappingSvc resolves database/retention-policy pairs; set via NewCompiler.
	dbrpMappingSvc platform.DBRPMappingService
}
// NewCompiler returns a Compiler that uses dbrpMappingSvc when transpiling.
// The exported fields (Query, DB, RP, Cluster) configure the transpilation
// and must be populated before calling Compile.
func NewCompiler(dbrpMappingSvc platform.DBRPMappingService) *Compiler {
	return &Compiler{
		dbrpMappingSvc: dbrpMappingSvc,
	}
}
// Compile transpiles the InfluxQL query into a Flux specification.
func (c *Compiler) Compile(ctx context.Context) (*query.Spec, error) {
	cfg := Config{
		Cluster:                c.Cluster,
		DefaultDatabase:        c.DB,
		DefaultRetentionPolicy: c.RP,
	}
	t := NewTranspilerWithConfig(c.dbrpMappingSvc, cfg)
	return t.Transpile(ctx, c.Query)
}
// CompilerType reports the type of this compiler, "influxql".
func (c *Compiler) CompilerType() query.CompilerType {
	return CompilerType
}

View File

@ -0,0 +1,12 @@
package influxql_test
import (
"testing"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
)
func TestCompiler(t *testing.T) {
var _ query.Compiler = (*influxql.Compiler)(nil)
}

91
query/influxql/dialect.go Normal file
View File

@ -0,0 +1,91 @@
package influxql
import (
"net/http"
"github.com/influxdata/platform/query"
)
// DialectType identifies the InfluxQL dialect in DialectMappings.
const DialectType = "influxql"

// AddDialectMappings adds the influxql specific dialect mappings.
// The zero value Dialect defaults to RFC3339Nano timestamps and JSON encoding.
func AddDialectMappings(mappings query.DialectMappings) error {
	return mappings.Add(DialectType, func() query.Dialect {
		return new(Dialect)
	})
}
// Dialect describes the output format of InfluxQL queries.
// The zero value uses RFC3339Nano timestamps, JSON encoding, no chunking,
// and no compression.
type Dialect struct {
	TimeFormat  TimeFormat        // TimeFormat is the format of the timestamp; defaults to RFC3339Nano.
	Encoding    EncodingFormat    // Encoding is the format of the results; defaults to JSON.
	ChunkSize   int               // Chunks is the number of points per chunk encoding batch; defaults to 0 or no chunking.
	Compression CompressionFormat // Compression is the compression of the result output; defaults to None.
}
// SetHeaders sets the Content-Type header according to the configured
// encoding. Unrecognized encodings leave the headers untouched.
func (d *Dialect) SetHeaders(w http.ResponseWriter) {
	var contentType string
	switch d.Encoding {
	case JSON, JSONPretty:
		contentType = "application/json"
	case CSV:
		contentType = "text/csv"
	case Msgpack:
		contentType = "application/x-msgpack"
	default:
		return
	}
	w.Header().Set("Content-Type", contentType)
}
// Encoder creates a result encoder for the configured encoding.
// Only the JSON encodings are implemented; other encodings panic.
func (d *Dialect) Encoder() query.MultiResultEncoder {
	switch d.Encoding {
	case JSON, JSONPretty:
		return new(MultiResultEncoder)
	default:
		panic("not implemented")
	}
}
// DialectType reports the type of this dialect, "influxql".
func (d *Dialect) DialectType() query.DialectType {
	return DialectType
}
// TimeFormat specifies the format of the timestamp in the query results.
// The zero value is RFC3339Nano.
type TimeFormat int

const (
	// RFC3339Nano is the default format for timestamps for InfluxQL.
	RFC3339Nano TimeFormat = iota
	// Hour formats time as the number of hours in the unix epoch.
	Hour
	// Minute formats time as the number of minutes in the unix epoch.
	Minute
	// Second formats time as the number of seconds in the unix epoch.
	Second
	// Millisecond formats time as the number of milliseconds in the unix epoch.
	Millisecond
	// Microsecond formats time as the number of microseconds in the unix epoch.
	Microsecond
	// Nanosecond formats time as the number of nanoseconds in the unix epoch.
	Nanosecond
)
// CompressionFormat is the format to compress the query results.
// The zero value is None.
type CompressionFormat int

const (
	// None does not compress the results and is the default.
	None CompressionFormat = iota
	// Gzip compresses the query results with gzip.
	Gzip
)
// EncodingFormat is the output format for the query response content.
// The zero value is JSON.
type EncodingFormat int

const (
	// JSON marshals the response to JSON octets.
	JSON EncodingFormat = iota
	// JSONPretty marshals the response to JSON octets with indents.
	JSONPretty
	// CSV marshals the response to CSV.
	CSV
	// Msgpack has a similar structure as the JSON response.
	// NOTE(review): no encoder implements this yet (Encoder panics) —
	// confirm it is still needed.
	Msgpack
)

View File

@ -0,0 +1,12 @@
package influxql_test
import (
"testing"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/influxql"
)
func TestDialect(t *testing.T) {
var _ query.Dialect = (*influxql.Dialect)(nil)
}

50
query/logger.go Normal file
View File

@ -0,0 +1,50 @@
package query
import (
"time"
"github.com/influxdata/platform"
)
// Logger persists metadata about executed queries.
type Logger interface {
	// Log records a single query execution; implementations report any
	// persistence failure through the returned error.
	Log(Log) error
}
// Log captures a query and any relevant metadata for the query execution.
// Call Redact before persisting to strip authorization tokens.
type Log struct {
	// Time is the time the query was completed
	Time time.Time
	// OrganizationID is the ID of the organization that requested the query
	OrganizationID platform.ID
	// Error is any error encountered by the query
	Error error
	// ProxyRequest is the query request
	ProxyRequest *ProxyRequest
	// ResponseSize is the size in bytes of the query response
	ResponseSize int64
	// Statistics is a set of statistics about the query execution
	Statistics Statistics
}
// Redact removes any sensitive information before logging.
// It replaces the request's authorization with a copy whose token is
// cleared, leaving the caller's original request untouched.
func (q *Log) Redact() {
	if q.ProxyRequest == nil || q.ProxyRequest.Request.Authorization == nil {
		return
	}

	// Shallow-copy the request so the caller's value is not modified.
	redacted := *q.ProxyRequest

	// Shallow-copy the authorization and clear its token.
	auth := *q.ProxyRequest.Request.Authorization
	auth.Token = ""

	redacted.Request.Authorization = &auth
	q.ProxyRequest = &redacted
}

55
query/logging.go Normal file
View File

@ -0,0 +1,55 @@
package query
import (
"context"
"fmt"
"io"
"time"
)
// LoggingServiceBridge implements ProxyQueryService and logs the queries while consuming a QueryService interface.
type LoggingServiceBridge struct {
	QueryService QueryService // executes the queries
	QueryLogger  Logger       // receives a Log entry for every query, including failures
}
// Query executes and logs the query.
// The number of bytes written to w is returned even when an error occurs.
func (s *LoggingServiceBridge) Query(ctx context.Context, w io.Writer, req *ProxyRequest) (n int64, err error) {
	var stats Statistics
	defer func() {
		// Log the query even if it panicked; the panic is converted into
		// the returned error rather than propagated.
		r := recover()
		if r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
		log := Log{
			OrganizationID: req.Request.OrganizationID,
			ProxyRequest:   req,
			ResponseSize:   n,
			Time:           time.Now(),
			Statistics:     stats,
		}
		if err != nil {
			log.Error = err
		}
		// NOTE(review): the error returned by Log is discarded; confirm
		// that dropping failed log writes is intended.
		s.QueryLogger.Log(log)
	}()
	results, err := s.QueryService.Query(ctx, &req.Request)
	if err != nil {
		return 0, err
	}
	defer results.Cancel()
	// Check for any reported statistics
	if s, ok := results.(Statisticser); ok {
		stats = s.Statistics()
	}
	encoder := req.Dialect.Encoder()
	n, err = encoder.Encode(w, results)
	if err != nil {
		return n, err
	}
	// The results iterator may have had an error independent of encoding errors.
	return n, results.Err()
}

22
query/mock/compiler.go Normal file
View File

@ -0,0 +1,22 @@
package mock
import (
"context"
"github.com/influxdata/platform/query"
)
// Compiler is a mock implementation of query.Compiler for testing.
type Compiler struct {
	CompileFn func(ctx context.Context) (*query.Spec, error) // invoked by Compile
	Type      query.CompilerType                             // reported by CompilerType; defaults to "mockCompiler" when empty
}
// Compile delegates to CompileFn.
func (c Compiler) Compile(ctx context.Context) (*query.Spec, error) {
	return c.CompileFn(ctx)
}
// CompilerType returns the configured Type, or "mockCompiler" when unset.
func (c Compiler) CompilerType() query.CompilerType {
	if t := c.Type; t != "" {
		return t
	}
	return "mockCompiler"
}

38
query/mock/service.go Normal file
View File

@ -0,0 +1,38 @@
package mock
import (
"context"
"io"
"github.com/influxdata/platform/query"
)
// ProxyQueryService mocks the idpe ProxyQueryService for testing.
type ProxyQueryService struct {
	QueryF func(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error)
}

// Query writes the results of the query request by delegating to QueryF.
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
	return s.QueryF(ctx, w, req)
}
// QueryService mocks the idpe QueryService for testing.
type QueryService struct {
	QueryF func(ctx context.Context, req *query.Request) (query.ResultIterator, error)
}

// Query returns a result iterator for the query request by delegating to QueryF.
func (s *QueryService) Query(ctx context.Context, req *query.Request) (query.ResultIterator, error) {
	return s.QueryF(ctx, req)
}
// AsyncQueryService mocks the idpe AsyncQueryService for testing.
type AsyncQueryService struct {
	QueryF func(ctx context.Context, req *query.Request) (query.Query, error)
}

// Query submits the query request by delegating to QueryF.
func (s *AsyncQueryService) Query(ctx context.Context, req *query.Request) (query.Query, error) {
	return s.QueryF(ctx, req)
}

View File

@ -2,25 +2,41 @@ package query
import (
"context"
"sort"
"encoding/json"
"fmt"
"io"
"time"
"github.com/influxdata/platform"
)
// QueryService represents a service for performing queries.
// QueryService represents a type capable of performing queries.
type QueryService interface {
// Query submits a query spec for execution returning a results iterator.
Query(ctx context.Context, orgID platform.ID, query *Spec) (ResultIterator, error)
// Query submits a query string for execution returning a results iterator.
QueryWithCompile(ctx context.Context, orgID platform.ID, query string) (ResultIterator, error)
// Query submits a query for execution returning a results iterator.
// Cancel must be called on any returned results to free resources.
Query(ctx context.Context, req *Request) (ResultIterator, error)
}
// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously.
type AsyncQueryService interface {
	// Query submits a query for execution returning immediately.
	// Results are delivered through the returned Query's Ready channel.
	// Done must be called on any returned Query objects.
	Query(ctx context.Context, req *Request) (Query, error)
}
// ProxyQueryService performs queries and encodes the result into a writer.
// The results are opaque to a ProxyQueryService.
type ProxyQueryService interface {
	// Query performs the requested query and encodes the results into w.
	// The encoding is determined by the Dialect carried in req.
	// The number of bytes written to w is returned __independent__ of any error.
	Query(ctx context.Context, w io.Writer, req *ProxyRequest) (int64, error)
}
// ResultIterator allows iterating through all results
// Cancel must be called to free resources.
// ResultIterators may implement Statisticser.
type ResultIterator interface {
// More indicates if there are more results.
// More must be called until it returns false in order to free all resources.
More() bool
// Next returns the next result.
@ -28,13 +44,207 @@ type ResultIterator interface {
Next() Result
// Cancel discards the remaining results.
// If not all results are going to be read, Cancel must be called to free resources.
// Cancel must always be called to free resources.
// It is safe to call Cancel multiple times.
Cancel()
// Err reports the first error encountered.
// Err will not report anything unless More has returned false,
// or the query has been cancelled.
Err() error
}
// Query represents an active query.
type Query interface {
	// Spec returns the spec used to execute this query.
	// Spec must not be modified.
	Spec() *Spec

	// Ready returns a channel that will deliver the query results.
	// It's possible that the channel is closed before any results arrive,
	// in which case the query should be inspected for an error using Err().
	Ready() <-chan map[string]Result

	// Done must always be called to free resources.
	Done()

	// Cancel will stop the query execution.
	// Done must still be called to free resources.
	// It is safe to call Cancel multiple times.
	Cancel()

	// Err reports any error the query may have encountered.
	Err() error

	Statisticser
}
// Request represents the query to run.
type Request struct {
	// Scope
	Authorization  *platform.Authorization `json:"authorization,omitempty"` // authorization under which the query runs
	OrganizationID platform.ID             `json:"organization_id"`         // organization that issued the query

	// Command
	// Compiler converts the query to a specification to run against the data.
	Compiler Compiler `json:"compiler"`

	// compilerMappings maps compiler types to creation methods;
	// required by UnmarshalJSON, set via WithCompilerMappings.
	compilerMappings CompilerMappings
}
// WithCompilerMappings sets the query type mappings on the request.
// It must be called before UnmarshalJSON.
func (r *Request) WithCompilerMappings(mappings CompilerMappings) {
	r.compilerMappings = mappings
}
// UnmarshalJSON populates the request from the JSON data.
// WithCompilerMappings must have been called or an error will occur.
func (r *Request) UnmarshalJSON(data []byte) error {
	// Alias drops Request's method set to avoid infinite recursion into
	// this UnmarshalJSON.
	type Alias Request
	raw := struct {
		*Alias
		CompilerType CompilerType    `json:"compiler_type"`
		Compiler     json.RawMessage `json:"compiler"`
	}{
		Alias: (*Alias)(r),
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}

	// Look up the concrete compiler constructor for the declared type,
	// then decode the deferred compiler payload into it.
	createCompiler, ok := r.compilerMappings[raw.CompilerType]
	if !ok {
		return fmt.Errorf("unsupported compiler type %q", raw.CompilerType)
	}

	c := createCompiler()
	if err := json.Unmarshal(raw.Compiler, c); err != nil {
		return err
	}
	r.Compiler = c
	return nil
}
// MarshalJSON adds the compiler type to the marshaled request so a receiver
// with matching compiler mappings can unmarshal it.
func (r Request) MarshalJSON() ([]byte, error) {
	// Alias drops Request's method set to avoid infinite recursion.
	type Alias Request
	var envelope struct {
		Alias
		CompilerType CompilerType `json:"compiler_type"`
	}
	envelope.Alias = Alias(r)
	envelope.CompilerType = r.Compiler.CompilerType()
	return json.Marshal(envelope)
}
// Compiler produces a specification for the query.
type Compiler interface {
	// Compile produces a specification for the query.
	Compile(ctx context.Context) (*Spec, error)
	// CompilerType reports the type used to select this compiler
	// in CompilerMappings.
	CompilerType() CompilerType
}
// CompilerType is the name of a query compiler.
type CompilerType string

// CreateCompiler constructs a new, empty Compiler of a particular type,
// ready to be populated (e.g. by json.Unmarshal).
type CreateCompiler func() Compiler

// CompilerMappings maps compiler types to creation methods.
type CompilerMappings map[CompilerType]CreateCompiler
// Add registers c as the creator for compiler type t.
// It returns an error when t already has a registered creator.
func (m CompilerMappings) Add(t CompilerType, c CreateCompiler) error {
	if _, dup := m[t]; !dup {
		m[t] = c
		return nil
	}
	return fmt.Errorf("duplicate compiler mapping for %q", t)
}
// ProxyRequest specifies a query request and the dialect for the results.
type ProxyRequest struct {
	// Request is the basic query request
	Request Request `json:"request"`

	// Dialect is the result encoder
	Dialect Dialect `json:"dialect"`

	// dialectMappings maps dialect types to creation methods;
	// required by UnmarshalJSON, set via WithDialectMappings.
	dialectMappings DialectMappings
}
// WithCompilerMappings sets the compiler type mappings on the request.
// It must be called before UnmarshalJSON.
func (r *ProxyRequest) WithCompilerMappings(mappings CompilerMappings) {
	r.Request.WithCompilerMappings(mappings)
}
// WithDialectMappings sets the dialect type mappings on the request.
// It must be called before UnmarshalJSON.
func (r *ProxyRequest) WithDialectMappings(mappings DialectMappings) {
	r.dialectMappings = mappings
}
// UnmarshalJSON populates the request from the JSON data.
// WithCompilerMappings and WithDialectMappings must have been called or an error will occur.
func (r *ProxyRequest) UnmarshalJSON(data []byte) error {
	// Alias drops ProxyRequest's method set to avoid infinite recursion
	// into this UnmarshalJSON. The embedded Request still unmarshals via
	// its own UnmarshalJSON, which consumes the compiler mappings.
	type Alias ProxyRequest
	raw := struct {
		*Alias
		DialectType DialectType     `json:"dialect_type"`
		Dialect     json.RawMessage `json:"dialect"`
	}{
		Alias: (*Alias)(r),
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}

	// Look up the concrete dialect constructor for the declared type,
	// then decode the deferred dialect payload into it.
	createDialect, ok := r.dialectMappings[raw.DialectType]
	if !ok {
		return fmt.Errorf("unsupported dialect type %q", raw.DialectType)
	}

	d := createDialect()
	if err := json.Unmarshal(raw.Dialect, d); err != nil {
		return err
	}
	r.Dialect = d
	return nil
}
// MarshalJSON adds the dialect type to the marshaled request so a receiver
// with matching dialect mappings can unmarshal it.
func (r ProxyRequest) MarshalJSON() ([]byte, error) {
	// Alias drops ProxyRequest's method set to avoid infinite recursion.
	type Alias ProxyRequest
	raw := struct {
		Alias
		DialectType DialectType `json:"dialect_type"`
	}{
		Alias:       (Alias)(r),
		DialectType: r.Dialect.DialectType(),
	}
	return json.Marshal(raw)
}
// Dialect describes how to encode results.
type Dialect interface {
	// Encoder creates an encoder for the results
	Encoder() MultiResultEncoder
	// DialectType report the type of the dialect
	DialectType() DialectType
}
// DialectType is the name of a query result dialect.
type DialectType string

// CreateDialect constructs a new, empty Dialect of a particular type,
// ready to be populated (e.g. by json.Unmarshal).
type CreateDialect func() Dialect

// DialectMappings maps dialect types to creation methods.
type DialectMappings map[DialectType]CreateDialect
// Add registers c as the creator for dialect type t.
// It returns an error when t already has a registered creator.
func (m DialectMappings) Add(t DialectType, c CreateDialect) error {
	if _, ok := m[t]; ok {
		return fmt.Errorf("duplicate dialect mapping for %q", t)
	}
	m[t] = c
	return nil
}
// Statisticser reports statistics about query processing.
type Statisticser interface {
// Statistics reports the statistics for the query.
@ -62,181 +272,3 @@ type Statistics struct {
// MaxAllocated is the maximum number of bytes the query allocated.
MaxAllocated int64 `json:"max_allocated"`
}
// AsyncQueryService represents a service for performing queries where the results are delivered asynchronously.
// NOTE(review): this orgID/Spec-based declaration duplicates the
// Request-based AsyncQueryService above and appears to be the legacy form;
// confirm it is slated for removal.
type AsyncQueryService interface {
	// Query submits a query for execution returning immediately.
	// The spec must not be modified while the query is still active.
	// Done must be called on any returned Query objects.
	Query(ctx context.Context, orgID platform.ID, query *Spec) (Query, error)

	// QueryWithCompile submits a query for execution returning immediately.
	// The query string will be compiled before submitting for execution.
	// Done must be called on returned Query objects.
	QueryWithCompile(ctx context.Context, orgID platform.ID, query string) (Query, error)
}
// Query represents an active query.
// NOTE(review): duplicates the Query interface declared above (the newer
// copy adds "safe to call Cancel multiple times"); appears to be the legacy
// declaration — confirm it is slated for removal.
type Query interface {
	// Spec returns the spec used to execute this query.
	// Spec must not be modified.
	Spec() *Spec

	// Ready returns a channel that will deliver the query results.
	// It's possible that the channel is closed before any results arrive,
	// in which case the query should be inspected for an error using Err().
	Ready() <-chan map[string]Result

	// Done must always be called to free resources.
	Done()

	// Cancel will stop the query execution.
	// Done must still be called to free resources.
	Cancel()

	// Err reports any error the query may have encountered.
	Err() error

	Statisticser
}
// QueryServiceBridge implements the QueryService interface while consuming the AsyncQueryService interface.
type QueryServiceBridge struct {
	AsyncQueryService AsyncQueryService
}

// Query submits the spec asynchronously and wraps the returned Query in a
// blocking ResultIterator.
func (b QueryServiceBridge) Query(ctx context.Context, orgID platform.ID, spec *Spec) (ResultIterator, error) {
	query, err := b.AsyncQueryService.Query(ctx, orgID, spec)
	if err != nil {
		return nil, err
	}
	return newResultIterator(query), nil
}

// QueryWithCompile submits the query string for compilation and execution,
// wrapping the returned Query in a blocking ResultIterator.
func (b QueryServiceBridge) QueryWithCompile(ctx context.Context, orgID platform.ID, queryStr string) (ResultIterator, error) {
	query, err := b.AsyncQueryService.QueryWithCompile(ctx, orgID, queryStr)
	if err != nil {
		return nil, err
	}
	return newResultIterator(query), nil
}
// resultIterator implements a ResultIterator while consuming a Query
type resultIterator struct {
	query   Query              // the underlying asynchronous query
	cancel  chan struct{}      // closed once when Cancel is called
	ready   bool               // true once results have been received from the query
	results *MapResultIterator // iterator over the received results; nil until ready
}

func newResultIterator(q Query) *resultIterator {
	return &resultIterator{
		query:  q,
		cancel: make(chan struct{}),
	}
}

// More reports whether another result is available. On first use it blocks
// until the query delivers its results or the iterator is canceled.
// Once exhausted (or canceled) it calls Done on the query to free resources.
func (r *resultIterator) More() bool {
	if !r.ready {
		select {
		case <-r.cancel:
			goto DONE
		case results, ok := <-r.query.Ready():
			if !ok {
				// Channel closed without delivering results;
				// the query's Err method reports what went wrong.
				goto DONE
			}
			r.ready = true
			r.results = NewMapResultIterator(results)
		}
	}
	if r.results.More() {
		return true
	}

DONE:
	r.query.Done()
	return false
}

// Next returns the next result. Only valid after More has returned true.
func (r *resultIterator) Next() Result {
	return r.results.Next()
}

// Cancel closes the cancel channel exactly once; the cancelation is
// forwarded to the underlying query on every call.
func (r *resultIterator) Cancel() {
	select {
	case <-r.cancel:
	default:
		close(r.cancel)
	}
	r.query.Cancel()
}

// Err reports any error encountered by the underlying query.
func (r *resultIterator) Err() error {
	return r.query.Err()
}

// Statistics reports the underlying query's execution statistics.
func (r *resultIterator) Statistics() Statistics {
	return r.query.Statistics()
}
// MapResultIterator is a ResultIterator over the values of a map,
// yielded in sorted key order.
type MapResultIterator struct {
	results map[string]Result
	order   []string // remaining keys, sorted ascending
}

// NewMapResultIterator returns an iterator over results keyed by name.
func NewMapResultIterator(results map[string]Result) *MapResultIterator {
	order := make([]string, 0, len(results))
	for k := range results {
		order = append(order, k)
	}
	sort.Strings(order)
	return &MapResultIterator{
		results: results,
		order:   order,
	}
}

// More reports whether any results remain.
func (r *MapResultIterator) More() bool {
	return len(r.order) > 0
}

// Next returns the result for the next key in sorted order.
// Only valid after More has returned true.
func (r *MapResultIterator) Next() Result {
	next := r.order[0]
	r.order = r.order[1:]
	return r.results[next]
}

// Cancel is a no-op; a map iterator holds no external resources.
func (r *MapResultIterator) Cancel() {
}

// Err always returns nil.
func (r *MapResultIterator) Err() error {
	return nil
}
// SliceResultIterator is a ResultIterator over a slice of results,
// yielded in slice order.
type SliceResultIterator struct {
	results []Result // remaining results
}

// NewSliceResultIterator returns an iterator over results.
func NewSliceResultIterator(results []Result) *SliceResultIterator {
	return &SliceResultIterator{
		results: results,
	}
}

// More reports whether any results remain.
func (r *SliceResultIterator) More() bool {
	return len(r.results) > 0
}

// Next returns the next result. Only valid after More has returned true.
func (r *SliceResultIterator) Next() Result {
	next := r.results[0]
	r.results = r.results[1:]
	return next
}

// Cancel discards the remaining results.
func (r *SliceResultIterator) Cancel() {
	r.results = nil
}

// Err always returns nil.
func (r *SliceResultIterator) Err() error {
	return nil
}

View File

@ -1,368 +1,129 @@
package query_test
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
_ "github.com/influxdata/platform/query/options"
"github.com/influxdata/platform/query/parser"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
func init() {
query.FinalizeBuiltIns()
var CmpOpts = []cmp.Option{
cmpopts.IgnoreUnexported(query.ProxyRequest{}),
cmpopts.IgnoreUnexported(query.Request{}),
}
var ignoreUnexportedQuerySpec = cmpopts.IgnoreUnexported(query.Spec{})
func TestQuery_JSON(t *testing.T) {
srcData := []byte(`
{
"operations":[
{
"id": "from",
"kind": "from",
"spec": {
"db":"mydb"
}
},
{
"id": "range",
"kind": "range",
"spec": {
"start": "-4h",
"stop": "now"
}
},
{
"id": "sum",
"kind": "sum"
}
],
"edges":[
{"parent":"from","child":"range"},
{"parent":"range","child":"sum"}
]
}
`)
// Ensure we can properly unmarshal a query
gotQ := query.Spec{}
if err := json.Unmarshal(srcData, &gotQ); err != nil {
t.Fatal(err)
}
expQ := query.Spec{
Operations: []*query.Operation{
{
ID: "from",
Spec: &functions.FromOpSpec{
Database: "mydb",
},
},
{
ID: "range",
Spec: &functions.RangeOpSpec{
Start: query.Time{
Relative: -4 * time.Hour,
IsRelative: true,
},
Stop: query.Time{
IsRelative: true,
},
},
},
{
ID: "sum",
Spec: &functions.SumOpSpec{},
},
},
Edges: []query.Edge{
{Parent: "from", Child: "range"},
{Parent: "range", Child: "sum"},
},
}
if !cmp.Equal(gotQ, expQ, ignoreUnexportedQuerySpec) {
t.Errorf("unexpected query:\n%s", cmp.Diff(gotQ, expQ, ignoreUnexportedQuerySpec))
}
// Ensure we can properly marshal a query
data, err := json.Marshal(expQ)
if err != nil {
t.Fatal(err)
}
if err := json.Unmarshal(data, &gotQ); err != nil {
t.Fatal(err)
}
if !cmp.Equal(gotQ, expQ, ignoreUnexportedQuerySpec) {
t.Errorf("unexpected query after marshalling: -want/+got %s", cmp.Diff(expQ, gotQ, ignoreUnexportedQuerySpec))
}
// compilerA is a stub query.Compiler used to exercise JSON (un)marshalling
// of Request via compiler mappings.
type compilerA struct {
	A string `json:"a"`
}
func TestQuery_Walk(t *testing.T) {
// Compile implements query.Compiler; it is never invoked by these tests.
func (c compilerA) Compile(ctx context.Context) (*query.Spec, error) {
	panic("not implemented")
}

// CompilerType returns the mapping key used to decode this compiler from JSON.
func (c compilerA) CompilerType() query.CompilerType {
	return "compilerA"
}
// compilerMappings registers compilerA so Request JSON decoding can construct
// the correct compiler from its compiler_type field.
var compilerMappings = query.CompilerMappings{
	"compilerA": func() query.Compiler { return new(compilerA) },
}
// dialectB is a stub query.Dialect used to exercise JSON (un)marshalling of
// ProxyRequest via dialect mappings.
type dialectB struct {
	B int `json:"b"`
}

// Encoder implements query.Dialect; it is never invoked by these tests.
func (d dialectB) Encoder() query.MultiResultEncoder {
	panic("not implemented")
}

// DialectType returns the mapping key used to decode this dialect from JSON.
func (d dialectB) DialectType() query.DialectType {
	return "dialectB"
}

// dialectMappings registers dialectB so ProxyRequest JSON decoding can
// construct the correct dialect from its dialect_type field.
var dialectMappings = query.DialectMappings{
	"dialectB": func() query.Dialect { return new(dialectB) },
}
func TestRequest_JSON(t *testing.T) {
testCases := []struct {
query *query.Spec
walkOrder []query.OperationID
err error
name string
data string
want query.Request
}{
{
query: &query.Spec{},
err: errors.New("query has no root nodes"),
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
name: "simple",
data: `{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"}`,
want: query.Request{
OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA},
Compiler: &compilerA{
A: "my custom compiler",
},
Edges: []query.Edge{
{Parent: "a", Child: "b"},
{Parent: "a", Child: "c"},
},
},
err: errors.New("edge references unknown child operation \"c\""),
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "b"},
},
Edges: []query.Edge{
{Parent: "a", Child: "b"},
{Parent: "a", Child: "b"},
},
},
err: errors.New("found duplicate operation ID \"b\""),
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "c"},
},
Edges: []query.Edge{
{Parent: "a", Child: "b"},
{Parent: "b", Child: "c"},
{Parent: "c", Child: "b"},
},
},
err: errors.New("found cycle in query"),
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "c"},
{ID: "d"},
},
Edges: []query.Edge{
{Parent: "a", Child: "b"},
{Parent: "b", Child: "c"},
{Parent: "c", Child: "d"},
{Parent: "d", Child: "b"},
},
},
err: errors.New("found cycle in query"),
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "c"},
{ID: "d"},
},
Edges: []query.Edge{
{Parent: "a", Child: "b"},
{Parent: "b", Child: "c"},
{Parent: "c", Child: "d"},
},
},
walkOrder: []query.OperationID{
"a", "b", "c", "d",
},
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "c"},
{ID: "d"},
},
Edges: []query.Edge{
{Parent: "a", Child: "b"},
{Parent: "a", Child: "c"},
{Parent: "b", Child: "d"},
{Parent: "c", Child: "d"},
},
},
walkOrder: []query.OperationID{
"a", "c", "b", "d",
},
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "c"},
{ID: "d"},
},
Edges: []query.Edge{
{Parent: "a", Child: "c"},
{Parent: "b", Child: "c"},
{Parent: "c", Child: "d"},
},
},
walkOrder: []query.OperationID{
"b", "a", "c", "d",
},
},
{
query: &query.Spec{
Operations: []*query.Operation{
{ID: "a"},
{ID: "b"},
{ID: "c"},
{ID: "d"},
},
Edges: []query.Edge{
{Parent: "a", Child: "c"},
{Parent: "b", Child: "d"},
},
},
walkOrder: []query.OperationID{
"b", "d", "a", "c",
},
},
}
for i, tc := range testCases {
tc := tc
t.Run(strconv.Itoa(i), func(t *testing.T) {
var gotOrder []query.OperationID
err := tc.query.Walk(func(o *query.Operation) error {
gotOrder = append(gotOrder, o.ID)
return nil
})
if tc.err == nil {
if err != nil {
t.Fatal(err)
}
} else {
if err == nil {
t.Fatalf("expected error: %q", tc.err)
} else if got, exp := err.Error(), tc.err.Error(); got != exp {
t.Fatalf("unexpected errors: got %q exp %q", got, exp)
}
}
for _, tc := range testCases {
var r query.Request
r.WithCompilerMappings(compilerMappings)
if !cmp.Equal(gotOrder, tc.walkOrder) {
t.Fatalf("unexpected walk order -want/+got %s", cmp.Diff(tc.walkOrder, gotOrder))
}
})
if err := json.Unmarshal([]byte(tc.data), &r); err != nil {
t.Fatal(err)
}
if !cmp.Equal(tc.want, r, CmpOpts...) {
t.Fatalf("unexpected request: -want/+got:\n%s", cmp.Diff(tc.want, r, CmpOpts...))
}
marshalled, err := json.Marshal(r)
if err != nil {
t.Fatal(err)
}
if got, want := string(marshalled), tc.data; got != want {
t.Fatalf("unexpected marshalled request: -want/+got:\n%s", cmp.Diff(want, got))
}
}
}
// Example_option demonstrates retrieving an option from the Flux interpreter
func Example_option() {
// Instantiate a new Flux interpreter with pre-populated option and global scopes
itrp := query.NewInterpreter()
// Retrieve the default value for an option
nowFunc := itrp.Option("now")
// The now option is a function value whose default behavior is to return
// the current system time when called. The function now() doesn't take
// any arguments so can be called with nil.
nowTime, _ := nowFunc.Function().Call(nil)
fmt.Fprintf(os.Stderr, "The current system time (UTC) is: %v\n", nowTime)
// Output:
}
// Example_setOption demonstrates setting an option from the Flux interpreter
func Example_setOption() {
// Instantiate a new Flux interpreter with pre-populated option and global scopes
itrp := query.NewInterpreter()
// Set a new option from the interpreter
itrp.SetOption("dummy_option", values.NewIntValue(3))
fmt.Printf("dummy_option = %d", itrp.Option("dummy_option").Int())
// Output: dummy_option = 3
}
// Example_overrideDefaultOptionExternally demonstrates how declaring an option
// in a Flux script will change that option's binding in the options scope of the interpreter.
func Example_overrideDefaultOptionExternally() {
queryString := `
now = () => 2018-07-13T00:00:00Z
what_time_is_it = now()`
itrp := query.NewInterpreter()
_, declarations := query.BuiltIns()
ast, _ := parser.NewAST(queryString)
semanticProgram, _ := semantic.New(ast, declarations)
// Evaluate program
itrp.Eval(semanticProgram)
// After evaluating the program, lookup the value of what_time_is_it
now, _ := itrp.GlobalScope().Lookup("what_time_is_it")
// what_time_is_it? Why it's ....
fmt.Printf("The new current time (UTC) is: %v", now)
// Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
}
// Example_overrideDefaultOptionInternally demonstrates how one can override a default
// option that is used in a query before that query is evaluated by the interpreter.
func Example_overrideDefaultOptionInternally() {
queryString := `what_time_is_it = now()`
itrp := query.NewInterpreter()
_, declarations := query.BuiltIns()
ast, _ := parser.NewAST(queryString)
semanticProgram, _ := semantic.New(ast, declarations)
// Define a new now function which returns a static time value of 2018-07-13T00:00:00.000000000Z
timeValue := time.Date(2018, 7, 13, 0, 0, 0, 0, time.UTC)
functionName := "newTime"
functionType := semantic.NewFunctionType(semantic.FunctionSignature{
ReturnType: semantic.Time,
})
functionCall := func(args values.Object) (values.Value, error) {
return values.NewTimeValue(values.ConvertTime(timeValue)), nil
func TestProxyRequest_JSON(t *testing.T) {
testCases := []struct {
name string
data string
want query.ProxyRequest
}{
{
name: "simple",
data: `{"request":{"organization_id":"aaaaaaaaaaaaaaaa","compiler":{"a":"my custom compiler"},"compiler_type":"compilerA"},"dialect":{"b":42},"dialect_type":"dialectB"}`,
want: query.ProxyRequest{
Request: query.Request{
OrganizationID: platform.ID{0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA},
Compiler: &compilerA{
A: "my custom compiler",
},
},
Dialect: &dialectB{
B: 42,
},
},
},
}
sideEffect := false
for _, tc := range testCases {
var pr query.ProxyRequest
pr.WithCompilerMappings(compilerMappings)
pr.WithDialectMappings(dialectMappings)
newNowFunc := values.NewFunction(functionName, functionType, functionCall, sideEffect)
// Override the default now function with the new one
itrp.SetOption("now", newNowFunc)
// Evaluate program
itrp.Eval(semanticProgram)
// After evaluating the program, lookup the value of what_time_is_it
now, _ := itrp.GlobalScope().Lookup("what_time_is_it")
// what_time_is_it? Why it's ....
fmt.Printf("The new current time (UTC) is: %v", now)
// Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
if err := json.Unmarshal([]byte(tc.data), &pr); err != nil {
t.Fatal(err)
}
if !cmp.Equal(tc.want, pr, CmpOpts...) {
t.Fatalf("unexpected proxy request: -want/+got:\n%s", cmp.Diff(tc.want, pr, CmpOpts...))
}
marshalled, err := json.Marshal(pr)
if err != nil {
t.Fatal(err)
}
if got, want := string(marshalled), tc.data; got != want {
t.Fatalf("unexpected marshalled proxy request: -want/+got:\n%s", cmp.Diff(want, got))
}
}
}

View File

@ -0,0 +1,22 @@
package querytest
import (
"context"
"github.com/influxdata/platform/query"
)
// FromCSVCompiler wraps a compiler and replaces all From operations with FromCSV
type FromCSVCompiler struct {
	query.Compiler
	// InputFile is the CSV file each From operation is rewritten to read from.
	InputFile string
}
// Compile delegates to the wrapped compiler and then rewrites every From
// operation in the resulting spec to read from c.InputFile.
func (c FromCSVCompiler) Compile(ctx context.Context) (*query.Spec, error) {
	spec, err := c.Compiler.Compile(ctx)
	if err != nil {
		return nil, err
	}
	ReplaceFromSpec(spec, c.InputFile)
	return spec, nil
}

View File

@ -1,12 +1,7 @@
package querytest
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"math"
"os"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
@ -22,7 +17,7 @@ func init() {
staticResultID.DecodeFromString("1")
}
func GetQueryServiceBridge() *query.QueryServiceBridge {
func GetProxyQueryServiceBridge() query.ProxyQueryServiceBridge {
config := control.Config{
ConcurrencyQuota: 1,
MemoryBytesQuota: math.MaxInt64,
@ -30,36 +25,13 @@ func GetQueryServiceBridge() *query.QueryServiceBridge {
c := control.New(config)
return &query.QueryServiceBridge{
AsyncQueryService: c,
return query.ProxyQueryServiceBridge{
QueryService: query.QueryServiceBridge{
AsyncQueryService: c,
},
}
}
func GetQueryEncodedResults(qs query.QueryService, spec *query.Spec, inputFile string, enc query.MultiResultEncoder) (string, error) {
results, err := qs.Query(context.Background(), staticResultID, spec)
if err != nil {
return "", err
}
buf := new(bytes.Buffer)
if _, err := enc.Encode(buf, results); err != nil {
return "", err
}
return buf.String(), results.Err()
}
func GetTestData(prefix, suffix string) (string, error) {
datafile := prefix + suffix
if _, err := os.Stat(datafile); err != nil {
return "", err
}
csv, err := ioutil.ReadFile(datafile)
if err != nil {
return "", fmt.Errorf("failed to open file: %s", datafile)
}
return string(csv), nil
}
func ReplaceFromSpec(q *query.Spec, csvSrc string) {
for _, op := range q.Operations {
if op.Spec.Kind() == functions.FromKind {

View File

@ -202,7 +202,14 @@ func (r *REPL) doQuery(spec *query.Spec) error {
defer cancelFunc()
defer r.clearCancel()
q, err := r.c.Query(ctx, r.orgID, spec)
req := &query.Request{
OrganizationID: r.orgID,
Compiler: query.SpecCompiler{
Spec: spec,
},
}
q, err := r.c.Query(ctx, req)
if err != nil {
return err
}

368
query/spec_test.go Normal file
View File

@ -0,0 +1,368 @@
package query_test
import (
"encoding/json"
"errors"
"fmt"
"os"
"strconv"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/functions"
_ "github.com/influxdata/platform/query/options"
"github.com/influxdata/platform/query/parser"
"github.com/influxdata/platform/query/semantic"
"github.com/influxdata/platform/query/values"
)
// init finalizes the Flux built-ins once so the operation specs referenced by
// these tests can be decoded and compared.
func init() {
	query.FinalizeBuiltIns()
}

// ignoreUnexportedQuerySpec lets cmp compare query.Spec values despite their
// unexported fields.
var ignoreUnexportedQuerySpec = cmpopts.IgnoreUnexported(query.Spec{})
// TestSpec_JSON verifies that a query.Spec round-trips through JSON:
// raw JSON unmarshals into the expected operations and edges, and marshalling
// that spec produces JSON that decodes back to an equal value.
func TestSpec_JSON(t *testing.T) {
	srcData := []byte(`
{
	"operations":[
		{
			"id": "from",
			"kind": "from",
			"spec": {
				"db":"mydb"
			}
		},
		{
			"id": "range",
			"kind": "range",
			"spec": {
				"start": "-4h",
				"stop": "now"
			}
		},
		{
			"id": "sum",
			"kind": "sum"
		}
	],
	"edges":[
		{"parent":"from","child":"range"},
		{"parent":"range","child":"sum"}
	]
}
`)
	// Ensure we can properly unmarshal a query
	gotQ := query.Spec{}
	if err := json.Unmarshal(srcData, &gotQ); err != nil {
		t.Fatal(err)
	}
	// The spec srcData should decode to: from -> range -> sum.
	expQ := query.Spec{
		Operations: []*query.Operation{
			{
				ID: "from",
				Spec: &functions.FromOpSpec{
					Database: "mydb",
				},
			},
			{
				ID: "range",
				Spec: &functions.RangeOpSpec{
					Start: query.Time{
						Relative:   -4 * time.Hour,
						IsRelative: true,
					},
					Stop: query.Time{
						IsRelative: true,
					},
				},
			},
			{
				ID:   "sum",
				Spec: &functions.SumOpSpec{},
			},
		},
		Edges: []query.Edge{
			{Parent: "from", Child: "range"},
			{Parent: "range", Child: "sum"},
		},
	}
	if !cmp.Equal(gotQ, expQ, ignoreUnexportedQuerySpec) {
		t.Errorf("unexpected query:\n%s", cmp.Diff(gotQ, expQ, ignoreUnexportedQuerySpec))
	}
	// Ensure we can properly marshal a query
	data, err := json.Marshal(expQ)
	if err != nil {
		t.Fatal(err)
	}
	if err := json.Unmarshal(data, &gotQ); err != nil {
		t.Fatal(err)
	}
	if !cmp.Equal(gotQ, expQ, ignoreUnexportedQuerySpec) {
		t.Errorf("unexpected query after marshalling: -want/+got %s", cmp.Diff(expQ, gotQ, ignoreUnexportedQuerySpec))
	}
}
// TestSpec_Walk verifies that Spec.Walk visits operations in a valid
// topological order and rejects malformed specs: no roots, edges referencing
// unknown operations, duplicate operation IDs, and cycles.
func TestSpec_Walk(t *testing.T) {
	testCases := []struct {
		query     *query.Spec
		walkOrder []query.OperationID
		err       error
	}{
		{
			// Empty spec: there is nothing to walk.
			query: &query.Spec{},
			err:   errors.New("query has no root nodes"),
		},
		{
			// Edge points at a child operation that does not exist.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "b"},
					{Parent: "a", Child: "c"},
				},
			},
			err: errors.New("edge references unknown child operation \"c\""),
		},
		{
			// Two operations share the ID "b".
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "b"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "b"},
					{Parent: "a", Child: "b"},
				},
			},
			err: errors.New("found duplicate operation ID \"b\""),
		},
		{
			// Two-node cycle: b -> c -> b.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "c"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "b"},
					{Parent: "b", Child: "c"},
					{Parent: "c", Child: "b"},
				},
			},
			err: errors.New("found cycle in query"),
		},
		{
			// Longer cycle: b -> c -> d -> b.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "c"},
					{ID: "d"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "b"},
					{Parent: "b", Child: "c"},
					{Parent: "c", Child: "d"},
					{Parent: "d", Child: "b"},
				},
			},
			err: errors.New("found cycle in query"),
		},
		{
			// Simple chain a -> b -> c -> d walks in order.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "c"},
					{ID: "d"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "b"},
					{Parent: "b", Child: "c"},
					{Parent: "c", Child: "d"},
				},
			},
			walkOrder: []query.OperationID{
				"a", "b", "c", "d",
			},
		},
		{
			// Diamond: a fans out to b and c, which join at d.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "c"},
					{ID: "d"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "b"},
					{Parent: "a", Child: "c"},
					{Parent: "b", Child: "d"},
					{Parent: "c", Child: "d"},
				},
			},
			walkOrder: []query.OperationID{
				"a", "c", "b", "d",
			},
		},
		{
			// Two roots a and b joining at c.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "c"},
					{ID: "d"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "c"},
					{Parent: "b", Child: "c"},
					{Parent: "c", Child: "d"},
				},
			},
			walkOrder: []query.OperationID{
				"b", "a", "c", "d",
			},
		},
		{
			// Two disconnected chains: a -> c and b -> d.
			query: &query.Spec{
				Operations: []*query.Operation{
					{ID: "a"},
					{ID: "b"},
					{ID: "c"},
					{ID: "d"},
				},
				Edges: []query.Edge{
					{Parent: "a", Child: "c"},
					{Parent: "b", Child: "d"},
				},
			},
			walkOrder: []query.OperationID{
				"b", "d", "a", "c",
			},
		},
	}
	for i, tc := range testCases {
		tc := tc
		t.Run(strconv.Itoa(i), func(t *testing.T) {
			// Record the order in which Walk visits operations.
			var gotOrder []query.OperationID
			err := tc.query.Walk(func(o *query.Operation) error {
				gotOrder = append(gotOrder, o.ID)
				return nil
			})
			if tc.err == nil {
				if err != nil {
					t.Fatal(err)
				}
			} else {
				if err == nil {
					t.Fatalf("expected error: %q", tc.err)
				} else if got, exp := err.Error(), tc.err.Error(); got != exp {
					t.Fatalf("unexpected errors: got %q exp %q", got, exp)
				}
			}
			if !cmp.Equal(gotOrder, tc.walkOrder) {
				t.Fatalf("unexpected walk order -want/+got %s", cmp.Diff(tc.walkOrder, gotOrder))
			}
		})
	}
}
// Example_option demonstrates retrieving an option from the Flux interpreter
func Example_option() {
	// Instantiate a new Flux interpreter with pre-populated option and global scopes
	itrp := query.NewInterpreter()
	// Retrieve the default value for an option
	nowFunc := itrp.Option("now")
	// The now option is a function value whose default behavior is to return
	// the current system time when called. The function now() doesn't take
	// any arguments so can be called with nil.
	nowTime, _ := nowFunc.Function().Call(nil)
	// Write to stderr so the non-deterministic time is not part of the
	// example's expected (empty) output.
	fmt.Fprintf(os.Stderr, "The current system time (UTC) is: %v\n", nowTime)
	// Output:
}
// Example_setOption demonstrates setting an option from the Flux interpreter
func Example_setOption() {
	// Instantiate a new Flux interpreter with pre-populated option and global scopes
	itrp := query.NewInterpreter()
	// Set a new option from the interpreter
	itrp.SetOption("dummy_option", values.NewIntValue(3))
	// Read the option back to show the assignment took effect.
	fmt.Printf("dummy_option = %d", itrp.Option("dummy_option").Int())
	// Output: dummy_option = 3
}
// Example_overrideDefaultOptionExternally demonstrates how declaring an option
// in a Flux script will change that option's binding in the options scope of the interpreter.
func Example_overrideDefaultOptionExternally() {
	queryString := `
now = () => 2018-07-13T00:00:00Z
what_time_is_it = now()`
	itrp := query.NewInterpreter()
	_, declarations := query.BuiltIns()
	// Parse and analyze the script; errors are ignored for example brevity.
	ast, _ := parser.NewAST(queryString)
	semanticProgram, _ := semantic.New(ast, declarations)
	// Evaluate program
	itrp.Eval(semanticProgram)
	// After evaluating the program, lookup the value of what_time_is_it
	now, _ := itrp.GlobalScope().Lookup("what_time_is_it")
	// what_time_is_it? Why it's ....
	fmt.Printf("The new current time (UTC) is: %v", now)
	// Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
}
// Example_overrideDefaultOptionInternally demonstrates how one can override a default
// option that is used in a query before that query is evaluated by the interpreter.
func Example_overrideDefaultOptionInternally() {
	queryString := `what_time_is_it = now()`
	itrp := query.NewInterpreter()
	_, declarations := query.BuiltIns()
	// Parse and analyze the script; errors are ignored for example brevity.
	ast, _ := parser.NewAST(queryString)
	semanticProgram, _ := semantic.New(ast, declarations)
	// Define a new now function which returns a static time value of 2018-07-13T00:00:00.000000000Z
	timeValue := time.Date(2018, 7, 13, 0, 0, 0, 0, time.UTC)
	functionName := "newTime"
	functionType := semantic.NewFunctionType(semantic.FunctionSignature{
		ReturnType: semantic.Time,
	})
	functionCall := func(args values.Object) (values.Value, error) {
		return values.NewTimeValue(values.ConvertTime(timeValue)), nil
	}
	sideEffect := false
	newNowFunc := values.NewFunction(functionName, functionType, functionCall, sideEffect)
	// Override the default now function with the new one
	itrp.SetOption("now", newNowFunc)
	// Evaluate program
	itrp.Eval(semanticProgram)
	// After evaluating the program, lookup the value of what_time_is_it
	now, _ := itrp.GlobalScope().Lookup("what_time_is_it")
	// what_time_is_it? Why it's ....
	fmt.Printf("The new current time (UTC) is: %v", now)
	// Output: The new current time (UTC) is: 2018-07-13T00:00:00.000000000Z
}

View File

@ -1,23 +0,0 @@
package query
import (
"context"
"github.com/influxdata/platform"
)
// Transpiler can convert a query from a source language into a query spec.
type Transpiler interface {
	// Transpile will perform the transpilation.
	Transpile(ctx context.Context, txt string) (*Spec, error)
}
// QueryWithTranspile executes a query by first transpiling the query.
// It transpiles q with the given transpiler and submits the resulting spec to
// qs on behalf of orgID, returning the query's result iterator.
func QueryWithTranspile(ctx context.Context, orgID platform.ID, q string, qs QueryService, transpiler Transpiler) (ResultIterator, error) {
	spec, err := transpiler.Transpile(ctx, q)
	if err != nil {
		return nil, err
	}
	return qs.Query(ctx, orgID, spec)
}

View File

@ -123,7 +123,13 @@ func (p *syncRunPromise) doQuery() {
return
}
it, err := p.svc.Query(p.ctx, p.t.Org, spec)
req := &query.Request{
OrganizationID: p.t.Org,
Compiler: query.SpecCompiler{
Spec: spec,
},
}
it, err := p.svc.Query(p.ctx, req)
if err != nil {
// Assume the error should not be part of the runResult.
p.finish(nil, err)
@ -176,7 +182,13 @@ func (e *asyncQueryServiceExecutor) Execute(ctx context.Context, run backend.Que
return nil, err
}
q, err := e.svc.Query(ctx, t.Org, spec)
req := &query.Request{
OrganizationID: t.Org,
Compiler: query.SpecCompiler{
Spec: spec,
},
}
q, err := e.svc.Query(ctx, req)
if err != nil {
return nil, err
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"reflect"
"sync"
@ -48,7 +49,7 @@ func newFakeQueryService() *fakeQueryService {
return &fakeQueryService{queries: make(map[string]*fakeQuery)}
}
func (s *fakeQueryService) Query(ctx context.Context, orgID platform.ID, q *query.Spec) (query.Query, error) {
func (s *fakeQueryService) Query(ctx context.Context, req *query.Request) (query.Query, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.queryErr != nil {
@ -57,30 +58,17 @@ func (s *fakeQueryService) Query(ctx context.Context, orgID platform.ID, q *quer
return nil, err
}
fq := &fakeQuery{
wait: make(chan struct{}),
ready: make(chan map[string]query.Result),
}
s.queries[makeSpecString(q)] = fq
go fq.run()
return fq, nil
}
func (s *fakeQueryService) QueryWithCompile(ctx context.Context, orgID platform.ID, q string) (query.Query, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.queryErr != nil {
err := s.queryErr
s.queryErr = nil
return nil, err
sc, ok := req.Compiler.(query.SpecCompiler)
if !ok {
return nil, fmt.Errorf("fakeQueryService only supports the query.SpecCompiler, got %T", req.Compiler)
}
fq := &fakeQuery{
wait: make(chan struct{}),
ready: make(chan map[string]query.Result),
}
s.queries[makeSpecString(makeSpec(q))] = fq
s.queries[makeSpecString(sc.Spec)] = fq
go fq.run()
return fq, nil