2018-09-04 21:08:00 +00:00
|
|
|
package http
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
"encoding/json"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"net/http"
|
2018-09-20 17:29:22 +00:00
|
|
|
"net/url"
|
2018-10-04 18:21:53 +00:00
|
|
|
"time"
|
2018-09-04 21:08:00 +00:00
|
|
|
|
2018-09-13 18:21:19 +00:00
|
|
|
"github.com/influxdata/flux"
|
2018-10-04 18:21:53 +00:00
|
|
|
"github.com/influxdata/flux/ast"
|
|
|
|
"github.com/influxdata/flux/complete"
|
2018-09-13 18:21:19 +00:00
|
|
|
"github.com/influxdata/flux/csv"
|
2018-10-04 18:21:53 +00:00
|
|
|
"github.com/influxdata/flux/parser"
|
2018-11-02 00:09:01 +00:00
|
|
|
"github.com/influxdata/flux/plan"
|
2018-09-04 21:08:00 +00:00
|
|
|
"github.com/influxdata/platform"
|
|
|
|
pcontext "github.com/influxdata/platform/context"
|
|
|
|
"github.com/influxdata/platform/kit/errors"
|
2018-09-11 22:56:51 +00:00
|
|
|
"github.com/influxdata/platform/query"
|
2018-09-04 21:08:00 +00:00
|
|
|
"github.com/julienschmidt/httprouter"
|
|
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
2018-09-26 08:49:19 +00:00
|
|
|
fluxPath = "/api/v2/query"
|
2018-09-04 21:08:00 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// FluxHandler implements handling flux queries.
|
|
|
|
type FluxHandler struct {
|
|
|
|
*httprouter.Router
|
|
|
|
|
|
|
|
Logger *zap.Logger
|
|
|
|
|
2018-11-20 18:56:58 +00:00
|
|
|
Now func() time.Time
|
|
|
|
OrganizationService platform.OrganizationService
|
|
|
|
ProxyQueryService query.ProxyQueryService
|
2018-09-04 21:08:00 +00:00
|
|
|
}
|
|
|
|
|
2018-09-26 08:49:19 +00:00
|
|
|
// NewFluxHandler returns a new handler at /api/v2/query for flux queries.
|
2018-09-04 21:08:00 +00:00
|
|
|
func NewFluxHandler() *FluxHandler {
|
|
|
|
h := &FluxHandler{
|
|
|
|
Router: httprouter.New(),
|
2018-10-04 18:21:53 +00:00
|
|
|
Now: time.Now,
|
2018-09-04 21:08:00 +00:00
|
|
|
Logger: zap.NewNop(),
|
|
|
|
}
|
|
|
|
|
2018-09-20 15:43:43 +00:00
|
|
|
h.HandlerFunc("POST", fluxPath, h.handlePostQuery)
|
2018-10-04 18:21:53 +00:00
|
|
|
h.HandlerFunc("POST", "/api/v2/query/ast", h.postFluxAST)
|
2018-11-02 00:09:01 +00:00
|
|
|
h.HandlerFunc("POST", "/api/v2/query/plan", h.postFluxPlan)
|
2018-10-04 18:21:53 +00:00
|
|
|
h.HandlerFunc("POST", "/api/v2/query/spec", h.postFluxSpec)
|
|
|
|
h.HandlerFunc("GET", "/api/v2/query/suggestions", h.getFluxSuggestions)
|
|
|
|
h.HandlerFunc("GET", "/api/v2/query/suggestions/:name", h.getFluxSuggestion)
|
2018-09-04 21:08:00 +00:00
|
|
|
return h
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *FluxHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
|
|
|
|
ctx := r.Context()
|
|
|
|
|
2018-11-20 18:56:58 +00:00
|
|
|
a, err := pcontext.GetAuthorizer(ctx)
|
2018-09-04 21:08:00 +00:00
|
|
|
if err != nil {
|
2018-10-03 19:13:27 +00:00
|
|
|
EncodeError(ctx, err, w)
|
2018-09-04 21:08:00 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2018-11-20 18:56:58 +00:00
|
|
|
req, err := decodeProxyQueryRequest(ctx, r, a, h.OrganizationService)
|
2018-11-20 23:20:51 +00:00
|
|
|
if err != nil && err != platform.ErrAuthorizerNotSupported {
|
2018-09-04 21:08:00 +00:00
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
hd, ok := req.Dialect.(HTTPDialect)
|
|
|
|
if !ok {
|
|
|
|
EncodeError(ctx, fmt.Errorf("unsupported dialect over HTTP %T", req.Dialect), w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
hd.SetHeaders(w)
|
|
|
|
|
|
|
|
n, err := h.ProxyQueryService.Query(ctx, w, req)
|
|
|
|
if err != nil {
|
|
|
|
if n == 0 {
|
|
|
|
// Only record the error headers IFF nothing has been written to w.
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
h.Logger.Info("Error writing response to client",
|
|
|
|
zap.String("handler", "flux"),
|
|
|
|
zap.Error(err),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-10-04 18:21:53 +00:00
|
|
|
type langRequest struct {
|
|
|
|
Query string `json:"query"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type postFluxASTResponse struct {
|
|
|
|
AST *ast.Program `json:"ast"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// postFluxAST returns a flux AST for provided flux string
|
|
|
|
func (h *FluxHandler) postFluxAST(w http.ResponseWriter, r *http.Request) {
|
|
|
|
var request langRequest
|
|
|
|
ctx := r.Context()
|
|
|
|
|
|
|
|
err := json.NewDecoder(r.Body).Decode(&request)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, errors.MalformedDataf("invalid json: %v", err), w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
ast, err := parser.NewAST(request.Query)
|
|
|
|
if err != nil {
|
2018-11-02 00:09:01 +00:00
|
|
|
EncodeError(ctx, errors.InvalidDataf("invalid AST: %v", err), w)
|
2018-10-04 18:21:53 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
res := postFluxASTResponse{
|
|
|
|
AST: ast,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
type postFluxSpecResponse struct {
|
|
|
|
Spec *flux.Spec `json:"spec"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// postFluxSpec returns a flux Spec for provided flux string
|
|
|
|
func (h *FluxHandler) postFluxSpec(w http.ResponseWriter, r *http.Request) {
|
|
|
|
var req langRequest
|
|
|
|
ctx := r.Context()
|
|
|
|
|
|
|
|
err := json.NewDecoder(r.Body).Decode(&req)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, errors.MalformedDataf("invalid json: %v", err), w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
spec, err := flux.Compile(ctx, req.Query, h.Now())
|
|
|
|
if err != nil {
|
2018-11-02 00:09:01 +00:00
|
|
|
EncodeError(ctx, errors.InvalidDataf("invalid spec: %v", err), w)
|
2018-10-04 18:21:53 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
res := postFluxSpecResponse{
|
|
|
|
Spec: spec,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-11-02 00:09:01 +00:00
|
|
|
type fluxPlan struct {
|
|
|
|
Roots []plan.PlanNode `json:"roots"`
|
|
|
|
Resources flux.ResourceManagement `json:"resources"`
|
|
|
|
Now time.Time `json:"now"`
|
|
|
|
}
|
|
|
|
|
|
|
|
func newFluxPlan(p *plan.PlanSpec) *fluxPlan {
|
|
|
|
res := &fluxPlan{
|
2018-11-02 17:03:45 +00:00
|
|
|
Roots: make([]plan.PlanNode, 0, len(p.Roots)),
|
2018-11-02 00:09:01 +00:00
|
|
|
Resources: p.Resources,
|
|
|
|
Now: p.Now,
|
|
|
|
}
|
|
|
|
|
|
|
|
for node := range p.Roots {
|
|
|
|
res.Roots = append(res.Roots, node)
|
|
|
|
}
|
|
|
|
|
|
|
|
return res
|
|
|
|
}
|
|
|
|
|
|
|
|
type postFluxPlanResponse struct {
|
|
|
|
Spec *flux.Spec `json:"spec"`
|
|
|
|
Logical *fluxPlan `json:"logical"`
|
|
|
|
Physical *fluxPlan `json:"physical"`
|
|
|
|
}
|
|
|
|
|
|
|
|
type postPlanRequest struct {
|
|
|
|
Query string `json:"query,omitempty"`
|
|
|
|
Spec *flux.Spec `json:"spec,omityempty"`
|
|
|
|
}
|
|
|
|
|
2018-11-02 14:11:57 +00:00
|
|
|
// Valid check if the plan request has a query or spec defined, but not both.
|
|
|
|
func (p *postPlanRequest) Valid() error {
|
|
|
|
if p.Query == "" && p.Spec == nil {
|
|
|
|
return errors.MalformedDataf("query or spec required")
|
|
|
|
}
|
|
|
|
|
|
|
|
if p.Query != "" && p.Spec != nil {
|
|
|
|
return errors.MalformedDataf("cannot request both query and spec")
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-11-02 00:09:01 +00:00
|
|
|
// postFluxPlan returns a flux plan for provided flux string
|
|
|
|
func (h *FluxHandler) postFluxPlan(w http.ResponseWriter, r *http.Request) {
|
|
|
|
ctx := r.Context()
|
|
|
|
req, err := decodePostPlanRequest(ctx, r)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
var spec *flux.Spec = req.Spec
|
|
|
|
if req.Query != "" {
|
|
|
|
spec, err = flux.Compile(ctx, req.Query, h.Now())
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, errors.InvalidDataf("invalid spec: %v", err), w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
logical := plan.NewLogicalPlanner()
|
|
|
|
log, err := logical.Plan(spec)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, errors.InvalidDataf("invalid logical plan: %v", err), w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
physical := plan.NewPhysicalPlanner()
|
|
|
|
phys, err := physical.Plan(log)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, errors.InvalidDataf("invalid physical plan: %v", err), w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
res := postFluxPlanResponse{
|
|
|
|
Logical: newFluxPlan(log),
|
|
|
|
Physical: newFluxPlan(phys),
|
|
|
|
Spec: spec,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func decodePostPlanRequest(ctx context.Context, r *http.Request) (*postPlanRequest, error) {
|
|
|
|
req := &postPlanRequest{}
|
|
|
|
err := json.NewDecoder(r.Body).Decode(req)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.MalformedDataf("invalid json: %v", err)
|
|
|
|
}
|
|
|
|
|
2018-11-02 14:11:57 +00:00
|
|
|
return req, req.Valid()
|
2018-11-02 00:09:01 +00:00
|
|
|
}
|
|
|
|
|
2018-10-04 18:21:53 +00:00
|
|
|
// fluxParams contain flux funciton parameters as defined by the semantic graph
|
|
|
|
type fluxParams map[string]string
|
|
|
|
|
|
|
|
// suggestionResponse provides the parameters available for a given Flux function
|
|
|
|
type suggestionResponse struct {
|
|
|
|
Name string `json:"name"`
|
|
|
|
Params fluxParams `json:"params"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// suggestionsResponse provides a list of available Flux functions
|
|
|
|
type suggestionsResponse struct {
|
|
|
|
Functions []suggestionResponse `json:"funcs"`
|
|
|
|
}
|
|
|
|
|
|
|
|
// getFluxSuggestions returns a list of available Flux functions for the Flux Builder
|
|
|
|
func (h *FluxHandler) getFluxSuggestions(w http.ResponseWriter, r *http.Request) {
|
|
|
|
ctx := r.Context()
|
|
|
|
completer := complete.DefaultCompleter()
|
|
|
|
names := completer.FunctionNames()
|
|
|
|
var functions []suggestionResponse
|
|
|
|
for _, name := range names {
|
|
|
|
suggestion, err := completer.FunctionSuggestion(name)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
filteredParams := make(fluxParams)
|
|
|
|
for key, value := range suggestion.Params {
|
|
|
|
if key == "table" {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
filteredParams[key] = value
|
|
|
|
}
|
|
|
|
|
|
|
|
functions = append(functions, suggestionResponse{
|
|
|
|
Name: name,
|
|
|
|
Params: filteredParams,
|
|
|
|
})
|
|
|
|
}
|
|
|
|
res := suggestionsResponse{Functions: functions}
|
|
|
|
|
|
|
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// getFluxSuggestion returns the function parameters for the requested function
|
|
|
|
func (h *FluxHandler) getFluxSuggestion(w http.ResponseWriter, r *http.Request) {
|
|
|
|
ctx := r.Context()
|
|
|
|
name := httprouter.ParamsFromContext(ctx).ByName("name")
|
|
|
|
completer := complete.DefaultCompleter()
|
|
|
|
|
|
|
|
suggestion, err := completer.FunctionSuggestion(name)
|
|
|
|
if err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
res := suggestionResponse{Name: name, Params: suggestion.Params}
|
|
|
|
if err := encodeResponse(ctx, w, http.StatusOK, res); err != nil {
|
|
|
|
EncodeError(ctx, err, w)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2018-09-04 21:08:00 +00:00
|
|
|
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
|
|
|
|
func (h *FluxHandler) PrometheusCollectors() []prometheus.Collector {
|
|
|
|
// TODO: gather and return relevant metrics.
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2018-09-13 15:39:08 +00:00
|
|
|
var _ query.ProxyQueryService = (*FluxService)(nil)
|
|
|
|
|
2018-09-04 21:08:00 +00:00
|
|
|
// FluxService connects to Influx via HTTP using tokens to run queries.
|
|
|
|
type FluxService struct {
|
2018-10-04 19:11:45 +00:00
|
|
|
Addr string
|
2018-09-04 21:08:00 +00:00
|
|
|
Token string
|
|
|
|
InsecureSkipVerify bool
|
|
|
|
}
|
|
|
|
|
|
|
|
// Query runs a flux query against a influx server and sends the results to the io.Writer.
|
2018-09-13 15:39:08 +00:00
|
|
|
// Will use the token from the context over the token within the service struct.
|
|
|
|
func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequest) (int64, error) {
|
2018-10-04 19:11:45 +00:00
|
|
|
u, err := newURL(s.Addr, fluxPath)
|
2018-09-04 21:08:00 +00:00
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2018-09-13 15:39:08 +00:00
|
|
|
|
2018-09-12 21:10:09 +00:00
|
|
|
qreq, err := QueryRequestFromProxyRequest(r)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2018-09-04 21:08:00 +00:00
|
|
|
var body bytes.Buffer
|
2018-09-12 21:10:09 +00:00
|
|
|
if err := json.NewEncoder(&body).Encode(qreq); err != nil {
|
2018-09-04 21:08:00 +00:00
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
hreq, err := http.NewRequest("POST", u.String(), &body)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
2018-09-13 15:39:08 +00:00
|
|
|
|
2018-10-19 16:39:54 +00:00
|
|
|
SetToken(s.Token, hreq)
|
2018-09-13 15:39:08 +00:00
|
|
|
|
2018-09-04 21:08:00 +00:00
|
|
|
hreq.Header.Set("Content-Type", "application/json")
|
2018-09-13 15:39:08 +00:00
|
|
|
hreq.Header.Set("Accept", "text/csv")
|
2018-09-04 21:08:00 +00:00
|
|
|
hreq = hreq.WithContext(ctx)
|
|
|
|
|
|
|
|
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
|
|
|
resp, err := hc.Do(hreq)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
defer resp.Body.Close()
|
2018-09-13 18:21:19 +00:00
|
|
|
|
2018-09-04 21:08:00 +00:00
|
|
|
if err := CheckError(resp); err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
return io.Copy(w, resp.Body)
|
|
|
|
}
|
2018-09-13 18:21:19 +00:00
|
|
|
|
|
|
|
var _ query.QueryService = (*FluxQueryService)(nil)
|
|
|
|
|
2018-09-26 08:49:19 +00:00
|
|
|
// FluxQueryService implements query.QueryService by making HTTP requests to the /api/v2/query API endpoint.
|
2018-09-13 18:21:19 +00:00
|
|
|
type FluxQueryService struct {
|
2018-10-04 19:11:45 +00:00
|
|
|
Addr string
|
2018-09-13 18:21:19 +00:00
|
|
|
Token string
|
|
|
|
InsecureSkipVerify bool
|
|
|
|
}
|
|
|
|
|
|
|
|
// Query runs a flux query against a influx server and decodes the result
|
2018-09-13 20:26:36 +00:00
|
|
|
func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.ResultIterator, error) {
|
2018-10-04 19:11:45 +00:00
|
|
|
u, err := newURL(s.Addr, fluxPath)
|
2018-09-13 18:21:19 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-09-20 17:29:22 +00:00
|
|
|
params := url.Values{}
|
|
|
|
params.Set(OrgID, r.OrganizationID.String())
|
|
|
|
u.RawQuery = params.Encode()
|
2018-09-13 18:21:19 +00:00
|
|
|
|
|
|
|
preq := &query.ProxyRequest{
|
2018-09-13 20:26:36 +00:00
|
|
|
Request: *r,
|
2018-09-13 18:21:19 +00:00
|
|
|
Dialect: csv.DefaultDialect(),
|
|
|
|
}
|
2018-09-12 21:10:09 +00:00
|
|
|
qreq, err := QueryRequestFromProxyRequest(preq)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2018-09-13 18:21:19 +00:00
|
|
|
var body bytes.Buffer
|
2018-09-12 21:10:09 +00:00
|
|
|
if err := json.NewEncoder(&body).Encode(qreq); err != nil {
|
2018-09-13 18:21:19 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
hreq, err := http.NewRequest("POST", u.String(), &body)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-10-19 16:39:54 +00:00
|
|
|
SetToken(s.Token, hreq)
|
2018-09-13 18:21:19 +00:00
|
|
|
|
|
|
|
hreq.Header.Set("Content-Type", "application/json")
|
|
|
|
hreq.Header.Set("Accept", "text/csv")
|
|
|
|
hreq = hreq.WithContext(ctx)
|
|
|
|
|
|
|
|
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
|
|
|
resp, err := hc.Do(hreq)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2018-12-13 22:11:17 +00:00
|
|
|
if err := CheckError(resp, true); err != nil {
|
2018-09-13 18:21:19 +00:00
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
|
|
|
|
return decoder.Decode(resp.Body)
|
|
|
|
}
|