2018-09-04 21:08:00 +00:00
|
|
|
package http
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
2019-01-24 00:15:42 +00:00
|
|
|
"errors"
|
2018-09-04 21:08:00 +00:00
|
|
|
"fmt"
|
2019-04-10 21:08:22 +00:00
|
|
|
"io"
|
2019-01-17 18:36:05 +00:00
|
|
|
"io/ioutil"
|
|
|
|
"mime"
|
2018-09-04 21:08:00 +00:00
|
|
|
"net/http"
|
2018-12-05 18:13:26 +00:00
|
|
|
"regexp"
|
|
|
|
"strconv"
|
2018-09-14 04:01:07 +00:00
|
|
|
"time"
|
2018-09-04 21:08:00 +00:00
|
|
|
"unicode/utf8"
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
"github.com/influxdata/flux"
|
2018-09-14 04:01:07 +00:00
|
|
|
"github.com/influxdata/flux/ast"
|
2018-09-06 18:09:52 +00:00
|
|
|
"github.com/influxdata/flux/csv"
|
2018-09-11 22:56:51 +00:00
|
|
|
"github.com/influxdata/flux/lang"
|
2018-12-05 18:13:26 +00:00
|
|
|
"github.com/influxdata/flux/parser"
|
2019-04-10 20:25:02 +00:00
|
|
|
"github.com/influxdata/flux/repl"
|
2019-02-21 21:11:50 +00:00
|
|
|
"github.com/influxdata/influxdb"
|
2019-11-22 09:12:36 +00:00
|
|
|
"github.com/influxdata/influxdb/jsonweb"
|
2019-01-08 00:37:16 +00:00
|
|
|
"github.com/influxdata/influxdb/query"
|
2018-12-05 18:13:26 +00:00
|
|
|
"github.com/influxdata/influxql"
|
2018-09-04 21:08:00 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// QueryRequest is a flux query request.
|
|
|
|
type QueryRequest struct {
|
2019-02-21 21:11:50 +00:00
|
|
|
Extern *ast.File `json:"extern,omitempty"`
|
2018-09-06 18:09:52 +00:00
|
|
|
Spec *flux.Spec `json:"spec,omitempty"`
|
2019-01-04 18:08:07 +00:00
|
|
|
AST *ast.Package `json:"ast,omitempty"`
|
2018-09-04 21:08:00 +00:00
|
|
|
Query string `json:"query"`
|
|
|
|
Type string `json:"type"`
|
|
|
|
Dialect QueryDialect `json:"dialect"`
|
|
|
|
|
2019-02-21 21:11:50 +00:00
|
|
|
Org *influxdb.Organization `json:"-"`
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// QueryDialect is the formatting options for the query response.
|
|
|
|
type QueryDialect struct {
|
|
|
|
Header *bool `json:"header"`
|
|
|
|
Delimiter string `json:"delimiter"`
|
|
|
|
CommentPrefix string `json:"commentPrefix"`
|
|
|
|
DateTimeFormat string `json:"dateTimeFormat"`
|
|
|
|
Annotations []string `json:"annotations"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// WithDefaults adds default values to the request.
|
|
|
|
func (r QueryRequest) WithDefaults() QueryRequest {
|
|
|
|
if r.Type == "" {
|
|
|
|
r.Type = "flux"
|
|
|
|
}
|
|
|
|
if r.Dialect.Delimiter == "" {
|
|
|
|
r.Dialect.Delimiter = ","
|
|
|
|
}
|
|
|
|
if r.Dialect.DateTimeFormat == "" {
|
|
|
|
r.Dialect.DateTimeFormat = "RFC3339"
|
|
|
|
}
|
|
|
|
if r.Dialect.Header == nil {
|
|
|
|
header := true
|
|
|
|
r.Dialect.Header = &header
|
|
|
|
}
|
|
|
|
return r
|
|
|
|
}
|
|
|
|
|
|
|
|
// Validate checks the query request and returns an error if the request is invalid.
|
|
|
|
func (r QueryRequest) Validate() error {
|
2019-04-09 14:35:42 +00:00
|
|
|
// TODO(jsternberg): Remove this, but we are going to not mention
|
|
|
|
// the spec in the error if it is being used.
|
2018-09-14 04:01:07 +00:00
|
|
|
if r.Query == "" && r.Spec == nil && r.AST == nil {
|
2019-04-09 14:35:42 +00:00
|
|
|
return errors.New(`request body requires either query or AST`)
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
2019-02-21 21:11:50 +00:00
|
|
|
if r.Spec != nil && r.Extern != nil {
|
|
|
|
return &influxdb.Error{
|
|
|
|
Code: influxdb.EInvalid,
|
|
|
|
Msg: "request body cannot specify both a spec and external declarations",
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-04 21:08:00 +00:00
|
|
|
if r.Type != "flux" {
|
|
|
|
return fmt.Errorf(`unknown query type: %s`, r.Type)
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(r.Dialect.CommentPrefix) > 1 {
|
|
|
|
return fmt.Errorf("invalid dialect comment prefix: must be length 0 or 1")
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(r.Dialect.Delimiter) != 1 {
|
|
|
|
return fmt.Errorf("invalid dialect delimeter: must be length 1")
|
|
|
|
}
|
|
|
|
|
|
|
|
rune, size := utf8.DecodeRuneInString(r.Dialect.Delimiter)
|
|
|
|
if rune == utf8.RuneError && size == 1 {
|
|
|
|
return fmt.Errorf("invalid dialect delimeter character")
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, a := range r.Dialect.Annotations {
|
|
|
|
switch a {
|
|
|
|
case "group", "datatype", "default":
|
|
|
|
default:
|
|
|
|
return fmt.Errorf(`unknown dialect annotation type: %s`, a)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
switch r.Dialect.DateTimeFormat {
|
|
|
|
case "RFC3339", "RFC3339Nano":
|
|
|
|
default:
|
|
|
|
return fmt.Errorf(`unknown dialect date time format: %s`, r.Dialect.DateTimeFormat)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-12-05 18:13:26 +00:00
|
|
|
// QueryAnalysis is a structured response of errors.
|
|
|
|
type QueryAnalysis struct {
|
|
|
|
Errors []queryParseError `json:"errors"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type queryParseError struct {
|
|
|
|
Line int `json:"line"`
|
|
|
|
Column int `json:"column"`
|
|
|
|
Character int `json:"character"`
|
|
|
|
Message string `json:"message"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// Analyze attempts to parse the query request and returns any errors
|
|
|
|
// encountered in a structured way.
|
|
|
|
func (r QueryRequest) Analyze() (*QueryAnalysis, error) {
|
|
|
|
switch r.Type {
|
|
|
|
case "flux":
|
|
|
|
return r.analyzeFluxQuery()
|
|
|
|
case "influxql":
|
|
|
|
return r.analyzeInfluxQLQuery()
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil, fmt.Errorf("unknown query request type %s", r.Type)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r QueryRequest) analyzeFluxQuery() (*QueryAnalysis, error) {
|
|
|
|
a := &QueryAnalysis{}
|
2019-01-04 18:08:07 +00:00
|
|
|
pkg := parser.ParseSource(r.Query)
|
|
|
|
errCount := ast.Check(pkg)
|
|
|
|
if errCount == 0 {
|
2018-12-05 18:13:26 +00:00
|
|
|
a.Errors = []queryParseError{}
|
|
|
|
return a, nil
|
|
|
|
}
|
2019-01-04 18:08:07 +00:00
|
|
|
a.Errors = make([]queryParseError, 0, errCount)
|
|
|
|
ast.Walk(ast.CreateVisitor(func(node ast.Node) {
|
|
|
|
loc := node.Location()
|
|
|
|
for _, err := range node.Errs() {
|
|
|
|
a.Errors = append(a.Errors, queryParseError{
|
|
|
|
Line: loc.Start.Line,
|
|
|
|
Column: loc.Start.Column,
|
|
|
|
Message: err.Msg,
|
|
|
|
})
|
2018-12-05 18:13:26 +00:00
|
|
|
}
|
2019-01-04 18:08:07 +00:00
|
|
|
}), pkg)
|
2018-12-05 18:13:26 +00:00
|
|
|
return a, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r QueryRequest) analyzeInfluxQLQuery() (*QueryAnalysis, error) {
|
|
|
|
a := &QueryAnalysis{}
|
|
|
|
_, err := influxql.ParseQuery(r.Query)
|
|
|
|
if err == nil {
|
|
|
|
a.Errors = []queryParseError{}
|
|
|
|
return a, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
ms := influxqlParseErrorRE.FindAllStringSubmatch(err.Error(), -1)
|
|
|
|
a.Errors = make([]queryParseError, 0, len(ms))
|
|
|
|
for _, m := range ms {
|
|
|
|
if len(m) != 4 {
|
|
|
|
return nil, fmt.Errorf("influxql query error is not formatted as expected: got %d matches expected 4", len(m))
|
|
|
|
}
|
|
|
|
msg := m[1]
|
|
|
|
lineStr := m[2]
|
|
|
|
line, err := strconv.Atoi(lineStr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to parse line number from error mesage: %s -> %v", lineStr, err)
|
|
|
|
}
|
|
|
|
charStr := m[3]
|
|
|
|
char, err := strconv.Atoi(charStr)
|
|
|
|
if err != nil {
|
|
|
|
return nil, fmt.Errorf("failed to parse character number from error mesage: %s -> %v", charStr, err)
|
|
|
|
}
|
|
|
|
|
|
|
|
a.Errors = append(a.Errors, queryParseError{
|
|
|
|
Line: line,
|
|
|
|
Column: columnFromCharacter(r.Query, char),
|
|
|
|
Character: char,
|
|
|
|
Message: msg,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
return a, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func columnFromCharacter(q string, char int) int {
|
|
|
|
col := 0
|
|
|
|
for i, c := range q {
|
|
|
|
if c == '\n' {
|
|
|
|
col = 0
|
|
|
|
}
|
|
|
|
|
|
|
|
if i == char {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
col++
|
|
|
|
}
|
|
|
|
|
|
|
|
return col
|
|
|
|
}
|
|
|
|
|
|
|
|
var influxqlParseErrorRE = regexp.MustCompile(`^(.+) at line (\d+), char (\d+)$`)
|
|
|
|
|
2018-09-06 18:09:52 +00:00
|
|
|
// ProxyRequest returns a request to proxy from the flux.
|
2018-09-11 22:56:51 +00:00
|
|
|
func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) {
|
2018-09-14 04:01:07 +00:00
|
|
|
return r.proxyRequest(time.Now)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, error) {
|
2018-09-05 14:33:10 +00:00
|
|
|
if err := r.Validate(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2019-04-09 14:35:42 +00:00
|
|
|
// Query is preferred over AST
|
2018-09-06 18:09:52 +00:00
|
|
|
var compiler flux.Compiler
|
2018-09-04 21:08:00 +00:00
|
|
|
if r.Query != "" {
|
2019-06-13 22:15:24 +00:00
|
|
|
compiler = lang.FluxCompiler{
|
|
|
|
Now: now(),
|
|
|
|
Extern: r.Extern,
|
|
|
|
Query: r.Query,
|
2019-02-21 21:11:50 +00:00
|
|
|
}
|
|
|
|
} else if r.AST != nil {
|
|
|
|
c := lang.ASTCompiler{
|
|
|
|
AST: r.AST,
|
2019-03-01 16:53:24 +00:00
|
|
|
Now: now(),
|
2019-02-21 21:11:50 +00:00
|
|
|
}
|
|
|
|
if r.Extern != nil {
|
|
|
|
c.PrependFile(r.Extern)
|
|
|
|
}
|
|
|
|
compiler = c
|
2018-09-04 21:08:00 +00:00
|
|
|
} else if r.Spec != nil {
|
2019-04-10 20:25:02 +00:00
|
|
|
compiler = repl.Compiler{
|
2018-09-04 21:08:00 +00:00
|
|
|
Spec: r.Spec,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
delimiter, _ := utf8.DecodeRuneInString(r.Dialect.Delimiter)
|
|
|
|
|
|
|
|
noHeader := false
|
|
|
|
if r.Dialect.Header != nil {
|
|
|
|
noHeader = !*r.Dialect.Header
|
|
|
|
}
|
|
|
|
|
|
|
|
// TODO(nathanielc): Use commentPrefix and dateTimeFormat
|
|
|
|
// once they are supported.
|
2018-09-11 22:56:51 +00:00
|
|
|
return &query.ProxyRequest{
|
|
|
|
Request: query.Request{
|
2018-11-07 16:31:42 +00:00
|
|
|
OrganizationID: r.Org.ID,
|
2018-09-04 21:08:00 +00:00
|
|
|
Compiler: compiler,
|
|
|
|
},
|
2018-11-07 16:31:42 +00:00
|
|
|
Dialect: &csv.Dialect{
|
2018-09-04 21:08:00 +00:00
|
|
|
ResultEncoderConfig: csv.ResultEncoderConfig{
|
|
|
|
NoHeader: noHeader,
|
|
|
|
Delimiter: delimiter,
|
|
|
|
Annotations: r.Dialect.Annotations,
|
|
|
|
},
|
|
|
|
},
|
2018-09-05 14:33:10 +00:00
|
|
|
}, nil
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
2018-09-12 21:10:09 +00:00
|
|
|
// QueryRequestFromProxyRequest converts a query.ProxyRequest into a QueryRequest.
|
|
|
|
// The ProxyRequest must contain supported compilers and dialects otherwise an error occurs.
|
|
|
|
func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error) {
|
|
|
|
qr := new(QueryRequest)
|
|
|
|
switch c := req.Request.Compiler.(type) {
|
|
|
|
case lang.FluxCompiler:
|
|
|
|
qr.Type = "flux"
|
|
|
|
qr.Query = c.Query
|
2019-06-27 21:33:22 +00:00
|
|
|
qr.Extern = c.Extern
|
2019-04-10 20:25:02 +00:00
|
|
|
case repl.Compiler:
|
2018-09-12 21:10:09 +00:00
|
|
|
qr.Type = "flux"
|
|
|
|
qr.Spec = c.Spec
|
2019-02-21 21:11:50 +00:00
|
|
|
case lang.ASTCompiler:
|
|
|
|
qr.Type = "flux"
|
|
|
|
qr.AST = c.AST
|
2018-09-12 21:10:09 +00:00
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported compiler %T", c)
|
|
|
|
}
|
|
|
|
switch d := req.Dialect.(type) {
|
|
|
|
case *csv.Dialect:
|
|
|
|
var header = !d.ResultEncoderConfig.NoHeader
|
|
|
|
qr.Dialect.Header = &header
|
|
|
|
qr.Dialect.Delimiter = string(d.ResultEncoderConfig.Delimiter)
|
|
|
|
qr.Dialect.CommentPrefix = "#"
|
|
|
|
qr.Dialect.DateTimeFormat = "RFC3339"
|
|
|
|
qr.Dialect.Annotations = d.ResultEncoderConfig.Annotations
|
|
|
|
default:
|
|
|
|
return nil, fmt.Errorf("unsupported dialect %T", d)
|
|
|
|
}
|
|
|
|
|
|
|
|
return qr, nil
|
|
|
|
}
|
|
|
|
|
2019-04-10 21:08:22 +00:00
|
|
|
func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.OrganizationService) (*QueryRequest, int, error) {
|
2018-09-04 21:08:00 +00:00
|
|
|
var req QueryRequest
|
2019-04-10 21:08:22 +00:00
|
|
|
body := &countReader{Reader: r.Body}
|
2019-01-17 18:36:05 +00:00
|
|
|
|
|
|
|
var contentType = "application/json"
|
|
|
|
if ct := r.Header.Get("Content-Type"); ct != "" {
|
|
|
|
contentType = ct
|
|
|
|
}
|
|
|
|
mt, _, err := mime.ParseMediaType(contentType)
|
|
|
|
if err != nil {
|
2019-04-10 21:08:22 +00:00
|
|
|
return nil, body.bytesRead, err
|
2019-01-17 18:36:05 +00:00
|
|
|
}
|
|
|
|
switch mt {
|
|
|
|
case "application/vnd.flux":
|
2019-04-10 21:08:22 +00:00
|
|
|
octets, err := ioutil.ReadAll(body)
|
2019-01-17 18:36:05 +00:00
|
|
|
if err != nil {
|
2019-04-10 21:08:22 +00:00
|
|
|
return nil, body.bytesRead, err
|
2019-01-02 19:36:16 +00:00
|
|
|
}
|
2019-04-10 21:08:22 +00:00
|
|
|
req.Query = string(octets)
|
2019-01-17 18:36:05 +00:00
|
|
|
case "application/json":
|
|
|
|
fallthrough
|
|
|
|
default:
|
2019-04-10 21:08:22 +00:00
|
|
|
if err := json.NewDecoder(body).Decode(&req); err != nil {
|
|
|
|
return nil, body.bytesRead, err
|
2019-01-02 19:36:16 +00:00
|
|
|
}
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
req = req.WithDefaults()
|
2019-01-17 18:36:05 +00:00
|
|
|
if err := req.Validate(); err != nil {
|
2019-04-10 21:08:22 +00:00
|
|
|
return nil, body.bytesRead, err
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
2018-11-07 16:31:42 +00:00
|
|
|
req.Org, err = queryOrganization(ctx, r, svc)
|
2019-04-10 21:08:22 +00:00
|
|
|
return &req, body.bytesRead, err
|
|
|
|
}
|
|
|
|
|
|
|
|
type countReader struct {
|
|
|
|
bytesRead int
|
|
|
|
io.Reader
|
|
|
|
}
|
|
|
|
|
|
|
|
func (r *countReader) Read(p []byte) (n int, err error) {
|
|
|
|
n, err = r.Reader.Read(p)
|
|
|
|
r.bytesRead += n
|
|
|
|
return n, err
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
2019-04-10 21:08:22 +00:00
|
|
|
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth influxdb.Authorizer, svc influxdb.OrganizationService) (*query.ProxyRequest, int, error) {
|
|
|
|
req, n, err := decodeQueryRequest(ctx, r, svc)
|
2018-09-04 21:08:00 +00:00
|
|
|
if err != nil {
|
2019-04-10 21:08:22 +00:00
|
|
|
return nil, n, err
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
2018-09-13 15:39:08 +00:00
|
|
|
|
|
|
|
pr, err := req.ProxyRequest()
|
|
|
|
if err != nil {
|
2019-04-10 21:08:22 +00:00
|
|
|
return nil, n, err
|
2018-09-13 15:39:08 +00:00
|
|
|
}
|
|
|
|
|
2019-03-07 21:32:48 +00:00
|
|
|
var token *influxdb.Authorization
|
|
|
|
switch a := auth.(type) {
|
|
|
|
case *influxdb.Authorization:
|
|
|
|
token = a
|
|
|
|
case *influxdb.Session:
|
|
|
|
token = a.EphemeralAuth(req.Org.ID)
|
2019-11-22 09:12:36 +00:00
|
|
|
case *jsonweb.Token:
|
|
|
|
token = a.EphemeralAuth(req.Org.ID)
|
2019-03-07 21:32:48 +00:00
|
|
|
default:
|
2019-04-10 21:08:22 +00:00
|
|
|
return pr, n, influxdb.ErrAuthorizerNotSupported
|
2018-11-20 23:20:51 +00:00
|
|
|
}
|
|
|
|
|
2019-03-07 21:32:48 +00:00
|
|
|
pr.Request.Authorization = token
|
2019-04-10 21:08:22 +00:00
|
|
|
return pr, n, nil
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|