feat(http): add flux AST and Spec endpoints to fluxd
parent
5c75610e3c
commit
9673a425ec
|
@ -24,19 +24,19 @@ export const getSuggestions = async (url: string) => {
|
||||||
|
|
||||||
interface ASTRequest {
|
interface ASTRequest {
|
||||||
url: string
|
url: string
|
||||||
body: string
|
query: string
|
||||||
}
|
}
|
||||||
|
|
||||||
export const getAST = async (request: ASTRequest) => {
|
export const getAST = async (request: ASTRequest) => {
|
||||||
const {url, body} = request
|
const {url, query} = request
|
||||||
try {
|
try {
|
||||||
const {data} = await AJAX({
|
const {data} = await AJAX({
|
||||||
method: 'POST',
|
method: 'POST',
|
||||||
url,
|
url,
|
||||||
data: {body},
|
data: {query},
|
||||||
})
|
})
|
||||||
|
|
||||||
return data
|
return data.ast
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
console.error('Could not parse query', error)
|
console.error('Could not parse query', error)
|
||||||
throw error
|
throw error
|
||||||
|
|
|
@ -52,7 +52,7 @@ class FilterArgs extends PureComponent<Props, State> {
|
||||||
public async convertStringToNodes() {
|
public async convertStringToNodes() {
|
||||||
const {links, value} = this.props
|
const {links, value} = this.props
|
||||||
|
|
||||||
const ast = await getAST({url: links.ast, body: value})
|
const ast = await getAST({url: links.ast, query: value})
|
||||||
const nodes = new Walker(ast).inOrderExpression
|
const nodes = new Walker(ast).inOrderExpression
|
||||||
this.setState({nodes, ast})
|
this.setState({nodes, ast})
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,7 +41,7 @@ export class FilterPreview extends PureComponent<Props, State> {
|
||||||
public async convertStringToNodes() {
|
public async convertStringToNodes() {
|
||||||
const {links, filterString} = this.props
|
const {links, filterString} = this.props
|
||||||
|
|
||||||
const ast = await getAST({url: links.ast, body: filterString})
|
const ast = await getAST({url: links.ast, query: filterString})
|
||||||
const nodes = new Walker(ast).inOrderExpression
|
const nodes = new Walker(ast).inOrderExpression
|
||||||
this.setState({nodes, ast})
|
this.setState({nodes, ast})
|
||||||
}
|
}
|
||||||
|
|
|
@ -599,7 +599,7 @@ export class FluxPage extends PureComponent<Props, State> {
|
||||||
const {links, notify, script} = this.props
|
const {links, notify, script} = this.props
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const ast = await getAST({url: links.ast, body: script})
|
const ast = await getAST({url: links.ast, query: script})
|
||||||
const body = bodyNodes(ast, this.state.suggestions)
|
const body = bodyNodes(ast, this.state.suggestions)
|
||||||
const status = {type: 'success', text: ''}
|
const status = {type: 'success', text: ''}
|
||||||
notify(validateSuccess())
|
notify(validateSuccess())
|
||||||
|
@ -620,7 +620,7 @@ export class FluxPage extends PureComponent<Props, State> {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const ast = await getAST({url: links.ast, body: script})
|
const ast = await getAST({url: links.ast, query: script})
|
||||||
|
|
||||||
if (update) {
|
if (update) {
|
||||||
this.props.updateScript(script)
|
this.props.updateScript(script)
|
||||||
|
@ -643,7 +643,7 @@ export class FluxPage extends PureComponent<Props, State> {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await getAST({url: links.ast, body: script})
|
await getAST({url: links.ast, query: script})
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.setState({status: this.parseError(error)})
|
this.setState({status: this.parseError(error)})
|
||||||
return console.error('Could not parse AST', error)
|
return console.error('Could not parse AST', error)
|
||||||
|
|
|
@ -124,7 +124,10 @@ func fluxF(cmd *cobra.Command, args []string) {
|
||||||
queryHandler.Logger = logger.With(zap.String("handler", "query"))
|
queryHandler.Logger = logger.With(zap.String("handler", "query"))
|
||||||
|
|
||||||
handler := http.NewHandlerFromRegistry("query", reg)
|
handler := http.NewHandlerFromRegistry("query", reg)
|
||||||
handler.Handler = queryHandler
|
handler.Handler = &Handler{
|
||||||
|
QueryHandler: queryHandler,
|
||||||
|
FluxLangHandler: http.NewFluxLangHandler(),
|
||||||
|
}
|
||||||
handler.Logger = logger
|
handler.Logger = logger
|
||||||
handler.Tracer = tracer
|
handler.Tracer = tracer
|
||||||
|
|
||||||
|
@ -235,3 +238,27 @@ func (l bucketLookup) Lookup(orgID platform.ID, name string) (platform.ID, bool)
|
||||||
// The deps.Reader will interpret this as the db/rp for the RPC call
|
// The deps.Reader will interpret this as the db/rp for the RPC call
|
||||||
return platform.ID(name), true
|
return platform.ID(name), true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handler handles the incoming http requests for fluxd.
|
||||||
|
type Handler struct {
|
||||||
|
QueryHandler *http.ExternalQueryHandler
|
||||||
|
FluxLangHandler *http.FluxLangHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
// ServeHTTP delegates a request to the appropriate subhandler.
|
||||||
|
func (h *Handler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) {
|
||||||
|
if strings.HasPrefix(r.URL.Path, "/ping") {
|
||||||
|
h.QueryHandler.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if strings.HasPrefix(r.URL.Path, "/query") {
|
||||||
|
h.QueryHandler.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if strings.HasPrefix(r.URL.Path, "/v2/flux") {
|
||||||
|
h.FluxLangHandler.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
nethttp.NotFound(w, r)
|
||||||
|
}
|
||||||
|
|
|
@ -3,29 +3,36 @@ package http
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/flux"
|
||||||
|
"github.com/influxdata/flux/ast"
|
||||||
"github.com/influxdata/flux/complete"
|
"github.com/influxdata/flux/complete"
|
||||||
"github.com/influxdata/flux/parser"
|
"github.com/influxdata/flux/parser"
|
||||||
|
"github.com/influxdata/platform/kit/errors"
|
||||||
"github.com/julienschmidt/httprouter"
|
"github.com/julienschmidt/httprouter"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FluxLangHandler represents an HTTP API handler for buckets.
|
// FluxLangHandler represents an HTTP API handler for buckets.
|
||||||
type FluxLangHandler struct {
|
type FluxLangHandler struct {
|
||||||
*httprouter.Router
|
*httprouter.Router
|
||||||
|
Now func() time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
type astRequest struct {
|
type langRequest struct {
|
||||||
Body string `json:"body"`
|
Query string `json:"query"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewFluxLangHandler returns a new instance of FluxLangHandler.
|
// NewFluxLangHandler returns a new instance of FluxLangHandler.
|
||||||
func NewFluxLangHandler() *FluxLangHandler {
|
func NewFluxLangHandler() *FluxLangHandler {
|
||||||
h := &FluxLangHandler{
|
h := &FluxLangHandler{
|
||||||
Router: httprouter.New(),
|
Router: httprouter.New(),
|
||||||
|
Now: time.Now,
|
||||||
}
|
}
|
||||||
|
|
||||||
h.HandlerFunc("GET", "/v2/flux", h.getFlux)
|
h.HandlerFunc("GET", "/v2/flux", h.getFlux)
|
||||||
h.HandlerFunc("POST", "/v2/flux/ast", h.postFluxAST)
|
h.HandlerFunc("POST", "/v2/flux/ast", h.postFluxAST)
|
||||||
|
h.HandlerFunc("POST", "/v2/flux/spec", h.postFluxSpec)
|
||||||
h.HandlerFunc("GET", "/v2/flux/suggestions", h.getFluxSuggestions)
|
h.HandlerFunc("GET", "/v2/flux/suggestions", h.getFluxSuggestions)
|
||||||
h.HandlerFunc("GET", "/v2/flux/suggestions/:name", h.getFluxSuggestion)
|
h.HandlerFunc("GET", "/v2/flux/suggestions/:name", h.getFluxSuggestion)
|
||||||
return h
|
return h
|
||||||
|
@ -72,24 +79,63 @@ func (h *FluxLangHandler) getFlux(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type postFluxASTResponse struct {
|
||||||
|
AST *ast.Program `json:"ast"`
|
||||||
|
}
|
||||||
|
|
||||||
// postFluxAST returns a flux AST for provided flux string
|
// postFluxAST returns a flux AST for provided flux string
|
||||||
func (h *FluxLangHandler) postFluxAST(w http.ResponseWriter, r *http.Request) {
|
func (h *FluxLangHandler) postFluxAST(w http.ResponseWriter, r *http.Request) {
|
||||||
var request astRequest
|
var request langRequest
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
err := json.NewDecoder(r.Body).Decode(&request)
|
err := json.NewDecoder(r.Body).Decode(&request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, errors.MalformedDataf("invalid json: %v", err), w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ast, err := parser.NewAST(request.Body)
|
ast, err := parser.NewAST(request.Query)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, errors.InvalidDataf("invalid json: %v", err), w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := encodeResponse(ctx, w, http.StatusOK, ast); err != nil {
|
res := postFluxASTResponse{
|
||||||
|
AST: ast,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
||||||
|
EncodeError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type postFluxSpecResponse struct {
|
||||||
|
Spec *flux.Spec `json:"spec"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// postFluxSpe returns a flux Spec for provided flux string
|
||||||
|
func (h *FluxLangHandler) postFluxSpec(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var req langRequest
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
err := json.NewDecoder(r.Body).Decode(&req)
|
||||||
|
if err != nil {
|
||||||
|
EncodeError(ctx, errors.MalformedDataf("invalid json: %v", err), w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
spec, err := flux.Compile(ctx, req.Query, h.Now())
|
||||||
|
if err != nil {
|
||||||
|
EncodeError(ctx, errors.InvalidDataf("invalid json: %v", err), w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res := postFluxSpecResponse{
|
||||||
|
Spec: spec,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,121 @@
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFluxLangHandler_getFlux(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
w *httptest.ResponseRecorder
|
||||||
|
r *http.Request
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "get links",
|
||||||
|
w: httptest.NewRecorder(),
|
||||||
|
r: httptest.NewRequest("GET", "/v2/flux", nil),
|
||||||
|
want: `{"links":{"self":"/v2/flux","suggestions":"/v2/flux/suggestions","ast":"/v2/flux/ast"}}
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
h := &FluxLangHandler{}
|
||||||
|
h.getFlux(tt.w, tt.r)
|
||||||
|
if got := tt.w.Body.String(); got != tt.want {
|
||||||
|
t.Errorf("http.getFlux = got %s\nwant %s", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFluxLangHandler_postFluxAST(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
w *httptest.ResponseRecorder
|
||||||
|
r *http.Request
|
||||||
|
want string
|
||||||
|
status int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "get ast from()",
|
||||||
|
w: httptest.NewRecorder(),
|
||||||
|
r: httptest.NewRequest("GET", "/v2/flux/ast", bytes.NewBufferString(`{"query": "from()"}`)),
|
||||||
|
want: `{"ast":{"type":"Program","location":{"start":{"line":1,"column":1},"end":{"line":1,"column":7},"source":"from()"},"body":[{"type":"ExpressionStatement","location":{"start":{"line":1,"column":1},"end":{"line":1,"column":7},"source":"from()"},"expression":{"type":"CallExpression","location":{"start":{"line":1,"column":1},"end":{"line":1,"column":7},"source":"from()"},"callee":{"type":"Identifier","location":{"start":{"line":1,"column":1},"end":{"line":1,"column":5},"source":"from"},"name":"from"}}}]}}
|
||||||
|
`,
|
||||||
|
status: http.StatusOK,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error from bad json",
|
||||||
|
w: httptest.NewRecorder(),
|
||||||
|
r: httptest.NewRequest("GET", "/v2/flux/ast", bytes.NewBufferString(`error!`)),
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
h := &FluxLangHandler{}
|
||||||
|
h.postFluxAST(tt.w, tt.r)
|
||||||
|
if got := tt.w.Body.String(); got != tt.want {
|
||||||
|
t.Errorf("http.postFluxAST = got\n%vwant\n%v", got, tt.want)
|
||||||
|
}
|
||||||
|
if got := tt.w.Code; got != tt.status {
|
||||||
|
t.Errorf("http.postFluxAST = got %d\nwant %d", got, tt.status)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFluxLangHandler_postFluxSpec(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
w *httptest.ResponseRecorder
|
||||||
|
r *http.Request
|
||||||
|
now func() time.Time
|
||||||
|
want string
|
||||||
|
status int
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "get spec from()",
|
||||||
|
w: httptest.NewRecorder(),
|
||||||
|
r: httptest.NewRequest("GET", "/v2/flux/spec", bytes.NewBufferString(`{"query": "from(bucket: \"telegraf\")"}`)),
|
||||||
|
now: func() time.Time { return time.Unix(0, 0).UTC() },
|
||||||
|
want: `{"spec":{"operations":[{"kind":"from","id":"from0","spec":{"bucket":"telegraf"}}],"edges":null,"resources":{"priority":"high","concurrency_quota":0,"memory_bytes_quota":0},"now":"1970-01-01T00:00:00Z"}}
|
||||||
|
`,
|
||||||
|
status: http.StatusOK,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error from bad json",
|
||||||
|
w: httptest.NewRecorder(),
|
||||||
|
r: httptest.NewRequest("GET", "/v2/flux/spec", bytes.NewBufferString(`error!`)),
|
||||||
|
status: http.StatusBadRequest,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error from incomplete spec",
|
||||||
|
w: httptest.NewRecorder(),
|
||||||
|
r: httptest.NewRequest("GET", "/v2/flux/spec", bytes.NewBufferString(`{"query": "from()"}`)),
|
||||||
|
now: func() time.Time { return time.Unix(0, 0).UTC() },
|
||||||
|
status: http.StatusUnprocessableEntity,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
h := &FluxLangHandler{
|
||||||
|
Now: tt.now,
|
||||||
|
}
|
||||||
|
h.postFluxSpec(tt.w, tt.r)
|
||||||
|
if got := tt.w.Body.String(); got != tt.want {
|
||||||
|
t.Errorf("http.postFluxSpec = got %s\nwant %s", got, tt.want)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := tt.w.Code; got != tt.status {
|
||||||
|
t.Errorf("http.postFluxSpec = got %d\nwant %d", got, tt.status)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -5,11 +5,15 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
"unicode/utf8"
|
"unicode/utf8"
|
||||||
|
|
||||||
"github.com/influxdata/flux"
|
"github.com/influxdata/flux"
|
||||||
|
"github.com/influxdata/flux/ast"
|
||||||
"github.com/influxdata/flux/csv"
|
"github.com/influxdata/flux/csv"
|
||||||
"github.com/influxdata/flux/lang"
|
"github.com/influxdata/flux/lang"
|
||||||
|
"github.com/influxdata/flux/semantic"
|
||||||
|
"github.com/influxdata/flux/values"
|
||||||
"github.com/influxdata/platform"
|
"github.com/influxdata/platform"
|
||||||
"github.com/influxdata/platform/kit/errors"
|
"github.com/influxdata/platform/kit/errors"
|
||||||
"github.com/influxdata/platform/query"
|
"github.com/influxdata/platform/query"
|
||||||
|
@ -18,6 +22,7 @@ import (
|
||||||
// QueryRequest is a flux query request.
|
// QueryRequest is a flux query request.
|
||||||
type QueryRequest struct {
|
type QueryRequest struct {
|
||||||
Spec *flux.Spec `json:"spec,omitempty"`
|
Spec *flux.Spec `json:"spec,omitempty"`
|
||||||
|
AST *ast.Program `json:"ast,omitempty"`
|
||||||
Query string `json:"query"`
|
Query string `json:"query"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Dialect QueryDialect `json:"dialect"`
|
Dialect QueryDialect `json:"dialect"`
|
||||||
|
@ -54,8 +59,8 @@ func (r QueryRequest) WithDefaults() QueryRequest {
|
||||||
|
|
||||||
// Validate checks the query request and returns an error if the request is invalid.
|
// Validate checks the query request and returns an error if the request is invalid.
|
||||||
func (r QueryRequest) Validate() error {
|
func (r QueryRequest) Validate() error {
|
||||||
if r.Query == "" && r.Spec == nil {
|
if r.Query == "" && r.Spec == nil && r.AST == nil {
|
||||||
return errors.New(`request body requires either spec or query`)
|
return errors.New(`request body requires either query, spec, or AST`)
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Type != "flux" {
|
if r.Type != "flux" {
|
||||||
|
@ -92,8 +97,39 @@ func (r QueryRequest) Validate() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func nowFunc(now time.Time) values.Function {
|
||||||
|
timeVal := values.NewTimeValue(values.ConvertTime(now))
|
||||||
|
ftype := semantic.NewFunctionType(semantic.FunctionSignature{
|
||||||
|
ReturnType: semantic.Time,
|
||||||
|
})
|
||||||
|
call := func(args values.Object) (values.Value, error) {
|
||||||
|
return timeVal, nil
|
||||||
|
}
|
||||||
|
sideEffect := false
|
||||||
|
return values.NewFunction("now", ftype, call, sideEffect)
|
||||||
|
}
|
||||||
|
|
||||||
|
func toSpec(p *ast.Program, now func() time.Time) (*flux.Spec, error) {
|
||||||
|
itrp := flux.NewInterpreter()
|
||||||
|
itrp.SetOption("now", nowFunc(now()))
|
||||||
|
_, decl := flux.BuiltIns()
|
||||||
|
semProg, err := semantic.New(p, decl)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := itrp.Eval(semProg); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return flux.ToSpec(itrp, itrp.SideEffects()...), nil
|
||||||
|
}
|
||||||
|
|
||||||
// ProxyRequest returns a request to proxy from the flux.
|
// ProxyRequest returns a request to proxy from the flux.
|
||||||
func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) {
|
func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) {
|
||||||
|
return r.proxyRequest(time.Now)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, error) {
|
||||||
if err := r.Validate(); err != nil {
|
if err := r.Validate(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -103,6 +139,15 @@ func (r QueryRequest) ProxyRequest() (*query.ProxyRequest, error) {
|
||||||
compiler = lang.FluxCompiler{
|
compiler = lang.FluxCompiler{
|
||||||
Query: r.Query,
|
Query: r.Query,
|
||||||
}
|
}
|
||||||
|
} else if r.AST != nil {
|
||||||
|
var err error
|
||||||
|
r.Spec, err = toSpec(r.AST, now)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
compiler = lang.SpecCompiler{
|
||||||
|
Spec: r.Spec,
|
||||||
|
}
|
||||||
} else if r.Spec != nil {
|
} else if r.Spec != nil {
|
||||||
compiler = lang.SpecCompiler{
|
compiler = lang.SpecCompiler{
|
||||||
Spec: r.Spec,
|
Spec: r.Spec,
|
||||||
|
|
|
@ -0,0 +1,476 @@
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"github.com/influxdata/flux/csv"
|
||||||
|
"github.com/influxdata/flux/lang"
|
||||||
|
"github.com/influxdata/platform/mock"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/influxdata/flux"
|
||||||
|
"github.com/influxdata/flux/ast"
|
||||||
|
"github.com/influxdata/platform"
|
||||||
|
"github.com/influxdata/platform/query"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestQueryRequest_WithDefaults(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
Spec *flux.Spec
|
||||||
|
AST *ast.Program
|
||||||
|
Query string
|
||||||
|
Type string
|
||||||
|
Dialect QueryDialect
|
||||||
|
org *platform.Organization
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
want QueryRequest
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty query has defaults set",
|
||||||
|
want: QueryRequest{
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "RFC3339",
|
||||||
|
Header: func(x bool) *bool { return &x }(true),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
r := QueryRequest{
|
||||||
|
Spec: tt.fields.Spec,
|
||||||
|
AST: tt.fields.AST,
|
||||||
|
Query: tt.fields.Query,
|
||||||
|
Type: tt.fields.Type,
|
||||||
|
Dialect: tt.fields.Dialect,
|
||||||
|
org: tt.fields.org,
|
||||||
|
}
|
||||||
|
if got := r.WithDefaults(); !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("QueryRequest.WithDefaults() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQueryRequest_Validate(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
Spec *flux.Spec
|
||||||
|
AST *ast.Program
|
||||||
|
Query string
|
||||||
|
Type string
|
||||||
|
Dialect QueryDialect
|
||||||
|
org *platform.Organization
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "requires query, spec, or ast",
|
||||||
|
fields: fields{
|
||||||
|
Type: "flux",
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "requires flux type",
|
||||||
|
fields: fields{
|
||||||
|
Query: "howdy",
|
||||||
|
Type: "doody",
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "comment must be a single character",
|
||||||
|
fields: fields{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
CommentPrefix: "error!",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "delimiter must be a single character",
|
||||||
|
fields: fields{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "characters must be unicode runes",
|
||||||
|
fields: fields{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: string([]byte{0x80}),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unknown annotations",
|
||||||
|
fields: fields{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
Annotations: []string{"error"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "unknown date time format",
|
||||||
|
fields: fields{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "error",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid query",
|
||||||
|
fields: fields{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "RFC3339",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
r := QueryRequest{
|
||||||
|
Spec: tt.fields.Spec,
|
||||||
|
AST: tt.fields.AST,
|
||||||
|
Query: tt.fields.Query,
|
||||||
|
Type: tt.fields.Type,
|
||||||
|
Dialect: tt.fields.Dialect,
|
||||||
|
org: tt.fields.org,
|
||||||
|
}
|
||||||
|
if err := r.Validate(); (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("QueryRequest.Validate() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_toSpec(t *testing.T) {
|
||||||
|
flux.FinalizeBuiltIns()
|
||||||
|
type args struct {
|
||||||
|
p *ast.Program
|
||||||
|
now func() time.Time
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want *flux.Spec
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "ast converts to spec",
|
||||||
|
args: args{
|
||||||
|
p: &ast.Program{},
|
||||||
|
now: func() time.Time { return time.Unix(0, 0) },
|
||||||
|
},
|
||||||
|
want: &flux.Spec{
|
||||||
|
Now: time.Unix(0, 0).UTC(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "bad semantics error",
|
||||||
|
args: args{
|
||||||
|
p: &ast.Program{
|
||||||
|
Body: []ast.Statement{
|
||||||
|
&ast.ReturnStatement{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
now: func() time.Time { return time.Unix(0, 0) },
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
|
||||||
|
got, err := toSpec(tt.args.p, tt.args.now)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("toSpec() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("toSpec() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestQueryRequest_proxyRequest(t *testing.T) {
|
||||||
|
type fields struct {
|
||||||
|
Spec *flux.Spec
|
||||||
|
AST *ast.Program
|
||||||
|
Query string
|
||||||
|
Type string
|
||||||
|
Dialect QueryDialect
|
||||||
|
org *platform.Organization
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
fields fields
|
||||||
|
now func() time.Time
|
||||||
|
want *query.ProxyRequest
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "requires query, spec, or ast",
|
||||||
|
fields: fields{
|
||||||
|
Type: "flux",
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid query",
|
||||||
|
fields: fields{
|
||||||
|
Query: "howdy",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "RFC3339",
|
||||||
|
},
|
||||||
|
org: &platform.Organization{},
|
||||||
|
},
|
||||||
|
want: &query.ProxyRequest{
|
||||||
|
Request: query.Request{
|
||||||
|
Compiler: lang.FluxCompiler{
|
||||||
|
Query: "howdy",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dialect: csv.Dialect{
|
||||||
|
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||||
|
NoHeader: false,
|
||||||
|
Delimiter: ',',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid AST",
|
||||||
|
fields: fields{
|
||||||
|
AST: &ast.Program{},
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "RFC3339",
|
||||||
|
},
|
||||||
|
org: &platform.Organization{},
|
||||||
|
},
|
||||||
|
now: func() time.Time { return time.Unix(0, 0).UTC() },
|
||||||
|
want: &query.ProxyRequest{
|
||||||
|
Request: query.Request{
|
||||||
|
Compiler: lang.SpecCompiler{
|
||||||
|
Spec: &flux.Spec{
|
||||||
|
Now: time.Unix(0, 0).UTC(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dialect: csv.Dialect{
|
||||||
|
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||||
|
NoHeader: false,
|
||||||
|
Delimiter: ',',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "valid spec",
|
||||||
|
fields: fields{
|
||||||
|
Type: "flux",
|
||||||
|
Spec: &flux.Spec{
|
||||||
|
Now: time.Unix(0, 0).UTC(),
|
||||||
|
},
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "RFC3339",
|
||||||
|
},
|
||||||
|
org: &platform.Organization{},
|
||||||
|
},
|
||||||
|
want: &query.ProxyRequest{
|
||||||
|
Request: query.Request{
|
||||||
|
Compiler: lang.SpecCompiler{
|
||||||
|
Spec: &flux.Spec{
|
||||||
|
Now: time.Unix(0, 0).UTC(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dialect: csv.Dialect{
|
||||||
|
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||||
|
NoHeader: false,
|
||||||
|
Delimiter: ',',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
r := QueryRequest{
|
||||||
|
Spec: tt.fields.Spec,
|
||||||
|
AST: tt.fields.AST,
|
||||||
|
Query: tt.fields.Query,
|
||||||
|
Type: tt.fields.Type,
|
||||||
|
Dialect: tt.fields.Dialect,
|
||||||
|
org: tt.fields.org,
|
||||||
|
}
|
||||||
|
got, err := r.proxyRequest(tt.now)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("QueryRequest.ProxyRequest() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("QueryRequest.ProxyRequest() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_decodeQueryRequest(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
r *http.Request
|
||||||
|
svc platform.OrganizationService
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want *QueryRequest
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "valid query request",
|
||||||
|
args: args{
|
||||||
|
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"query": "from()"}`)),
|
||||||
|
svc: &mock.OrganizationService{
|
||||||
|
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
|
||||||
|
return &platform.Organization{
|
||||||
|
ID: func() platform.ID { s, _ := platform.IDFromString("deadbeef"); return *s }(),
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &QueryRequest{
|
||||||
|
Query: "from()",
|
||||||
|
Type: "flux",
|
||||||
|
Dialect: QueryDialect{
|
||||||
|
Delimiter: ",",
|
||||||
|
DateTimeFormat: "RFC3339",
|
||||||
|
Header: func(x bool) *bool { return &x }(true),
|
||||||
|
},
|
||||||
|
org: &platform.Organization{
|
||||||
|
ID: func() platform.ID { s, _ := platform.IDFromString("deadbeef"); return *s }(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error decoding json",
|
||||||
|
args: args{
|
||||||
|
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`error`)),
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error validating query",
|
||||||
|
args: args{
|
||||||
|
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`{}`)),
|
||||||
|
},
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := decodeQueryRequest(tt.args.ctx, tt.args.r, tt.args.svc)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("decodeQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("decodeQueryRequest() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_decodeProxyQueryRequest(t *testing.T) {
|
||||||
|
type args struct {
|
||||||
|
ctx context.Context
|
||||||
|
r *http.Request
|
||||||
|
auth *platform.Authorization
|
||||||
|
svc platform.OrganizationService
|
||||||
|
}
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
args args
|
||||||
|
want *query.ProxyRequest
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "valid query request",
|
||||||
|
args: args{
|
||||||
|
r: httptest.NewRequest("POST", "/", bytes.NewBufferString(`{"query": "from()"}`)),
|
||||||
|
svc: &mock.OrganizationService{
|
||||||
|
FindOrganizationF: func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
|
||||||
|
return &platform.Organization{
|
||||||
|
ID: func() platform.ID { s, _ := platform.IDFromString("deadbeef"); return *s }(),
|
||||||
|
}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
want: &query.ProxyRequest{
|
||||||
|
Request: query.Request{
|
||||||
|
OrganizationID: func() platform.ID { s, _ := platform.IDFromString("deadbeef"); return *s }(),
|
||||||
|
Compiler: lang.FluxCompiler{
|
||||||
|
Query: "from()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dialect: csv.Dialect{
|
||||||
|
ResultEncoderConfig: csv.ResultEncoderConfig{
|
||||||
|
NoHeader: false,
|
||||||
|
Delimiter: ',',
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
got, err := decodeProxyQueryRequest(tt.args.ctx, tt.args.r, tt.args.auth, tt.args.svc)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("decodeProxyQueryRequest() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("decodeProxyQueryRequest() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,49 @@
|
||||||
|
package mock
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/influxdata/platform"
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ platform.OrganizationService = &OrganizationService{}
|
||||||
|
|
||||||
|
// OrganizationService is a mock organization server.
|
||||||
|
type OrganizationService struct {
|
||||||
|
FindOrganizationByIDF func(ctx context.Context, id platform.ID) (*platform.Organization, error)
|
||||||
|
FindOrganizationF func(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error)
|
||||||
|
FindOrganizationsF func(ctx context.Context, filter platform.OrganizationFilter, opt ...platform.FindOptions) ([]*platform.Organization, int, error)
|
||||||
|
CreateOrganizationF func(ctx context.Context, b *platform.Organization) error
|
||||||
|
UpdateOrganizationF func(ctx context.Context, id platform.ID, upd platform.OrganizationUpdate) (*platform.Organization, error)
|
||||||
|
DeleteOrganizationF func(ctx context.Context, id platform.ID) error
|
||||||
|
}
|
||||||
|
|
||||||
|
//FindOrganizationByID calls FindOrganizationByIDF.
|
||||||
|
func (s *OrganizationService) FindOrganizationByID(ctx context.Context, id platform.ID) (*platform.Organization, error) {
|
||||||
|
return s.FindOrganizationByIDF(ctx, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
//FindOrganization calls FindOrganizationF.
|
||||||
|
func (s *OrganizationService) FindOrganization(ctx context.Context, filter platform.OrganizationFilter) (*platform.Organization, error) {
|
||||||
|
return s.FindOrganizationF(ctx, filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
//FindOrganizations calls FindOrganizationsF.
|
||||||
|
func (s *OrganizationService) FindOrganizations(ctx context.Context, filter platform.OrganizationFilter, opt ...platform.FindOptions) ([]*platform.Organization, int, error) {
|
||||||
|
return s.FindOrganizationsF(ctx, filter, opt...)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateOrganization calls CreateOrganizationF.
|
||||||
|
func (s *OrganizationService) CreateOrganization(ctx context.Context, b *platform.Organization) error {
|
||||||
|
return s.CreateOrganizationF(ctx, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateOrganization calls UpdateOrganizationF.
|
||||||
|
func (s *OrganizationService) UpdateOrganization(ctx context.Context, id platform.ID, upd platform.OrganizationUpdate) (*platform.Organization, error) {
|
||||||
|
return s.UpdateOrganizationF(ctx, id, upd)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteOrganization calls DeleteOrganizationF.
|
||||||
|
func (s *OrganizationService) DeleteOrganization(ctx context.Context, id platform.ID) error {
|
||||||
|
return s.DeleteOrganizationF(ctx, id)
|
||||||
|
}
|
Loading…
Reference in New Issue