feat(http): add flux endpoint /v2/query influx

pull/10616/head
Chris Goller 2018-09-04 16:08:00 -05:00
parent 41a0ad5364
commit 37bd273acd
8 changed files with 388 additions and 166 deletions

View File

@ -33,6 +33,7 @@ import (
taskbolt "github.com/influxdata/platform/task/backend/bolt"
"github.com/influxdata/platform/task/backend/coordinator"
taskexecutor "github.com/influxdata/platform/task/backend/executor"
pzap "github.com/influxdata/platform/zap"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
@ -283,6 +284,12 @@ func platformF(cmd *cobra.Command, args []string) {
writeHandler.BucketService = bucketSvc
writeHandler.Logger = logger.With(zap.String("handler", "write"))
queryHandler := http.NewFluxHandler()
queryHandler.AuthorizationService = authSvc
queryHandler.OrganizationService = orgSvc
queryHandler.Logger = logger.With(zap.String("handler", "query"))
queryHandler.ProxyQueryService = pzap.NewProxyQueryService(queryHandler.Logger)
// TODO(desa): what to do about idpe.
chronografHandler := http.NewChronografHandler(chronografSvc)
@ -298,6 +305,7 @@ func platformF(cmd *cobra.Command, args []string) {
SourceHandler: sourceHandler,
TaskHandler: taskHandler,
ViewHandler: cellHandler,
QueryHandler: queryHandler,
WriteHandler: writeHandler,
}
reg.MustRegister(platformHandler.PrometheusCollectors()...)

View File

@ -1,16 +1,11 @@
package http
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"unicode/utf8"
"github.com/influxdata/platform"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/csv"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@ -31,157 +26,17 @@ type ExternalQueryHandler struct {
func NewExternalQueryHandler() *ExternalQueryHandler {
h := &ExternalQueryHandler{
Router: httprouter.New(),
Logger: zap.NewNop(),
}
h.HandlerFunc("POST", "/query", h.handlePostQuery)
return h
}
func decodeQueryRequest(r *http.Request, req *query.ProxyRequest, orgSvc platform.OrganizationService) error {
request := struct {
Spec *query.Spec `json:"spec"`
Query string `json:"query"`
Type string `json:"type"`
Dialect struct {
Header *bool `json:"header"`
Delimiter string `json:"delimiter"`
CommentPrefix string `json:"commentPrefix"`
DateTimeFormat string `json:"dateTimeFormat"`
Annotations []string `json:"annotations"`
}
}{}
switch r.Header.Get("Content-Type") {
case "application/json":
orgName := r.URL.Query().Get("organization")
orgID := r.URL.Query().Get("organizationID")
filter := platform.OrganizationFilter{}
if orgID != "" {
var id platform.ID
err := id.DecodeFromString(orgID)
if err != nil {
return err
}
filter.ID = &id
}
if orgName != "" {
filter.Name = &orgName
}
o, err := orgSvc.FindOrganization(r.Context(), filter)
if err != nil {
return err
}
req.Request.OrganizationID = o.ID
err = json.NewDecoder(r.Body).Decode(&request)
if err != nil {
return err
}
// Set defaults
if request.Type == "" {
request.Type = "flux"
}
if request.Dialect.Header == nil {
header := true
request.Dialect.Header = &header
}
if request.Dialect.Delimiter == "" {
request.Dialect.Delimiter = ","
}
if request.Dialect.DateTimeFormat == "" {
request.Dialect.DateTimeFormat = "RFC3339"
}
if request.Type != "flux" {
return fmt.Errorf(`unknown query type: %s`, request.Type)
}
if len(request.Dialect.CommentPrefix) > 1 {
return fmt.Errorf("invalid dialect comment prefix: must be length 0 or 1")
}
if len(request.Dialect.Delimiter) != 1 {
return fmt.Errorf("invalid dialect delimeter: must be length 1")
}
for _, a := range request.Dialect.Annotations {
switch a {
case "group", "datatype", "default":
default:
return fmt.Errorf(`unknown dialect annotation type: %s`, a)
}
}
switch request.Dialect.DateTimeFormat {
case "RFC3339", "RFC3339Nano":
default:
return fmt.Errorf(`unknown dialect date time format: %s`, request.Dialect.DateTimeFormat)
}
if request.Query != "" {
req.Request.Compiler = query.FluxCompiler{Query: request.Query}
} else if request.Spec != nil {
req.Request.Compiler = query.SpecCompiler{
Spec: request.Spec,
}
} else {
return errors.New(`request body requires either spec or query`)
}
default:
orgName := r.FormValue("organization")
if orgName == "" {
return errors.New(`missing the "organization" parameter`)
}
o, err := orgSvc.FindOrganization(r.Context(), platform.OrganizationFilter{Name: &orgName})
if err != nil {
return err
}
req.Request.OrganizationID = o.ID
q := r.FormValue("query")
if q == "" {
data, err := ioutil.ReadAll(r.Body)
if err != nil {
return err
}
q = string(data)
}
req.Request.Compiler = query.FluxCompiler{
Query: q,
}
}
switch r.Header.Get("Accept") {
case "text/csv":
fallthrough
default:
var delimiter rune
dialect := request.Dialect
if dialect.Delimiter != "" {
delimiter, _ = utf8.DecodeRuneInString(dialect.Delimiter)
}
noHeader := false
if dialect.Header != nil {
noHeader = !*dialect.Header
}
// TODO(nathanielc): Use commentPrefix and dateTimeFormat
// once they are supported.
config := csv.ResultEncoderConfig{
NoHeader: noHeader,
Delimiter: delimiter,
Annotations: dialect.Annotations,
}
req.Dialect = csv.Dialect{
ResultEncoderConfig: config,
}
}
return nil
}
func (h *ExternalQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
var req query.ProxyRequest
if err := decodeQueryRequest(r, &req, h.OrganizationService); err != nil {
req, err := decodeProxyQueryRequest(ctx, r, h.OrganizationService)
if err != nil {
EncodeError(ctx, err, w)
return
}
@ -193,7 +48,7 @@ func (h *ExternalQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Re
}
hd.SetHeaders(w)
n, err := h.ProxyQueryService.Query(ctx, w, &req)
n, err := h.ProxyQueryService.Query(ctx, w, req)
if err != nil {
if n == 0 {
// Only record the error headers IFF nothing has been written to w.

View File

@ -22,6 +22,7 @@ type PlatformHandler struct {
SourceHandler *SourceHandler
TaskHandler *TaskHandler
FluxLangHandler *FluxLangHandler
QueryHandler *FluxHandler
WriteHandler *WriteHandler
}
@ -36,6 +37,7 @@ func setCORSResponseHeaders(w nethttp.ResponseWriter, r *nethttp.Request) {
var platformLinks = map[string]interface{}{
"sources": "/v2/sources",
"dashboards": "/v2/dashboards",
"query": "/v2/query",
"flux": map[string]string{
"self": "/v2/flux",
"ast": "/v2/flux/ast",
@ -76,16 +78,6 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request
return
}
if strings.HasPrefix(r.URL.Path, "/v2/flux") {
h.FluxLangHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/chronograf/") {
h.ChronografHandler.ServeHTTP(w, r)
return
}
ctx := r.Context()
var err error
if ctx, err = extractAuthorization(ctx, r); err != nil {
@ -94,6 +86,16 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request
}
r = r.WithContext(ctx)
if strings.HasPrefix(r.URL.Path, "/v2/write") {
h.WriteHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/v2/query") {
h.QueryHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/v1/buckets") {
h.BucketHandler.ServeHTTP(w, r)
return
@ -129,13 +131,18 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request
return
}
if strings.HasSuffix(r.URL.Path, "/write") {
h.WriteHandler.ServeHTTP(w, r)
if strings.HasPrefix(r.URL.Path, "/v2/views") {
h.ViewHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/v2/views") {
h.ViewHandler.ServeHTTP(w, r)
if strings.HasPrefix(r.URL.Path, "/v2/flux") {
h.FluxLangHandler.ServeHTTP(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/chronograf/") {
h.ChronografHandler.ServeHTTP(w, r)
return
}

153
http/query.go Normal file
View File

@ -0,0 +1,153 @@
package http
import (
"context"
"encoding/json"
"fmt"
"net/http"
"unicode/utf8"
"github.com/influxdata/platform"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/query"
"github.com/influxdata/platform/query/csv"
)
// QueryRequest is a flux query request.
type QueryRequest struct {
Spec *query.Spec `json:"spec,omitempty"`
Query string `json:"query"`
Type string `json:"type"`
Dialect QueryDialect `json:"dialect"`
org *platform.Organization
}
// QueryDialect is the formatting options for the query response.
type QueryDialect struct {
Header *bool `json:"header"`
Delimiter string `json:"delimiter"`
CommentPrefix string `json:"commentPrefix"`
DateTimeFormat string `json:"dateTimeFormat"`
Annotations []string `json:"annotations"`
}
// WithDefaults adds default values to the request.
func (r QueryRequest) WithDefaults() QueryRequest {
if r.Type == "" {
r.Type = "flux"
}
if r.Dialect.Delimiter == "" {
r.Dialect.Delimiter = ","
}
if r.Dialect.DateTimeFormat == "" {
r.Dialect.DateTimeFormat = "RFC3339"
}
if r.Dialect.Header == nil {
header := true
r.Dialect.Header = &header
}
return r
}
// Validate checks the query request and returns an error if the request is invalid.
func (r QueryRequest) Validate() error {
if r.Query == "" && r.Spec == nil {
return errors.New(`request body requires either spec or query`)
}
if r.Type != "flux" {
return fmt.Errorf(`unknown query type: %s`, r.Type)
}
if len(r.Dialect.CommentPrefix) > 1 {
return fmt.Errorf("invalid dialect comment prefix: must be length 0 or 1")
}
if len(r.Dialect.Delimiter) != 1 {
return fmt.Errorf("invalid dialect delimeter: must be length 1")
}
rune, size := utf8.DecodeRuneInString(r.Dialect.Delimiter)
if rune == utf8.RuneError && size == 1 {
return fmt.Errorf("invalid dialect delimeter character")
}
for _, a := range r.Dialect.Annotations {
switch a {
case "group", "datatype", "default":
default:
return fmt.Errorf(`unknown dialect annotation type: %s`, a)
}
}
switch r.Dialect.DateTimeFormat {
case "RFC3339", "RFC3339Nano":
default:
return fmt.Errorf(`unknown dialect date time format: %s`, r.Dialect.DateTimeFormat)
}
return nil
}
// ProxyRequest returns a request to proxy from the query.
func (r QueryRequest) ProxyRequest() *query.ProxyRequest {
// Query is preferred over spec
var compiler query.Compiler
if r.Query != "" {
compiler = query.FluxCompiler{
Query: r.Query,
}
} else if r.Spec != nil {
compiler = query.SpecCompiler{
Spec: r.Spec,
}
}
delimiter, _ := utf8.DecodeRuneInString(r.Dialect.Delimiter)
noHeader := false
if r.Dialect.Header != nil {
noHeader = !*r.Dialect.Header
}
// TODO(nathanielc): Use commentPrefix and dateTimeFormat
// once they are supported.
return &query.ProxyRequest{
Request: query.Request{
OrganizationID: r.org.ID,
Compiler: compiler,
},
Dialect: csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: noHeader,
Delimiter: delimiter,
Annotations: r.Dialect.Annotations,
},
},
}
}
func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*QueryRequest, error) {
var req QueryRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
return nil, err
}
req = req.WithDefaults()
err := req.Validate()
if err != nil {
return nil, err
}
req.org, err = queryOrganization(ctx, r, svc)
return &req, err
}
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*query.ProxyRequest, error) {
req, err := decodeQueryRequest(ctx, r, svc)
if err != nil {
return nil, err
}
return req.ProxyRequest(), nil
}

134
http/query_handler.go Normal file
View File

@ -0,0 +1,134 @@
package http
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"github.com/influxdata/platform"
pcontext "github.com/influxdata/platform/context"
"github.com/influxdata/platform/kit/errors"
"github.com/influxdata/platform/query"
"github.com/julienschmidt/httprouter"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
const (
fluxPath = "/v2/query"
)
// FluxHandler implements handling flux queries.
type FluxHandler struct {
*httprouter.Router
Logger *zap.Logger
AuthorizationService platform.AuthorizationService
OrganizationService platform.OrganizationService
ProxyQueryService query.ProxyQueryService
}
// NewFluxHandler returns a new handler at /v2/query for flux queries.
func NewFluxHandler() *FluxHandler {
h := &FluxHandler{
Router: httprouter.New(),
Logger: zap.NewNop(),
}
h.HandlerFunc("POST", "/v2/query", h.handlePostQuery)
return h
}
func (h *FluxHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
tok, err := pcontext.GetToken(ctx)
if err != nil {
EncodeError(ctx, err, w)
return
}
auth, err := h.AuthorizationService.FindAuthorizationByToken(ctx, tok)
if err != nil {
EncodeError(ctx, errors.Wrap(err, "invalid token", errors.InvalidData), w)
return
}
if !platform.IsActive(auth) {
EncodeError(ctx, errors.Forbiddenf("insufficient permissions for write"), w)
return
}
req, err := decodeProxyQueryRequest(ctx, r, h.OrganizationService)
if err != nil {
EncodeError(ctx, err, w)
return
}
hd, ok := req.Dialect.(HTTPDialect)
if !ok {
EncodeError(ctx, fmt.Errorf("unsupported dialect over HTTP %T", req.Dialect), w)
return
}
hd.SetHeaders(w)
n, err := h.ProxyQueryService.Query(ctx, w, req)
if err != nil {
if n == 0 {
// Only record the error headers IFF nothing has been written to w.
EncodeError(ctx, err, w)
return
}
h.Logger.Info("Error writing response to client",
zap.String("handler", "flux"),
zap.Error(err),
)
}
}
// PrometheusCollectors satisifies the prom.PrometheusCollector interface.
func (h *FluxHandler) PrometheusCollectors() []prometheus.Collector {
// TODO: gather and return relevant metrics.
return nil
}
// FluxService connects to Influx via HTTP using tokens to run queries.
type FluxService struct {
URL string
Token string
InsecureSkipVerify bool
}
// Query runs a flux query against a influx server and sends the results to the io.Writer.
func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
u, err := newURL(s.URL, fluxPath)
if err != nil {
return 0, err
}
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(req); err != nil {
return 0, err
}
hreq, err := http.NewRequest("POST", u.String(), &body)
if err != nil {
return 0, err
}
SetToken(s.Token, hreq)
hreq.Header.Set("Content-Type", "application/json")
hreq = hreq.WithContext(ctx)
hc := newClient(u.Scheme, s.InsecureSkipVerify)
resp, err := hc.Do(hreq)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if err := CheckError(resp); err != nil {
return 0, err
}
return io.Copy(w, resp.Body)
}

32
http/requests.go Normal file
View File

@ -0,0 +1,32 @@
package http
import (
"context"
"net/http"
"github.com/influxdata/platform"
)
const (
// OrgName is the http query parameter to specify an organization by name.
OrgName = "organization"
// OrgID is the http query parameter to specify an organization by ID.
OrgID = "organizationID"
)
// queryOrganization returns the organization for any http request.
func queryOrganization(ctx context.Context, r *http.Request, svc platform.OrganizationService) (o *platform.Organization, err error) {
filter := platform.OrganizationFilter{}
if reqID := r.URL.Query().Get(OrgID); reqID != "" {
filter.ID, err = platform.IDFromString(reqID)
if err != nil {
return nil, err
}
}
if name := r.URL.Query().Get(OrgName); name != "" {
filter.Name = &name
}
return svc.FindOrganization(ctx, filter)
}

View File

@ -14,9 +14,7 @@ import (
"go.uber.org/zap"
)
const NatsServerID = "nats"
const NatsClientID = "nats-client"
// WriteHandler receives line protocol and sends to a publish function.
type WriteHandler struct {
*httprouter.Router
@ -29,6 +27,7 @@ type WriteHandler struct {
Publish func(io.Reader) error
}
// NewWriteHandler creates a new handler at /v2/write to receive line protocol.
func NewWriteHandler(publishFn func(io.Reader) error) *WriteHandler {
h := &WriteHandler{
Router: httprouter.New(),

View File

@ -0,0 +1,34 @@
package zap
import (
"context"
"io"
"github.com/influxdata/platform/query"
"go.uber.org/zap"
)
// ProxyQueryService logs the request but does not write to the writer.
type ProxyQueryService struct {
Logger *zap.Logger
}
// NewProxyQueryService creates a new proxy query service with a logger.
// If the logger is nil, then it will use a noop logger.
func NewProxyQueryService(l *zap.Logger) *ProxyQueryService {
if l == nil {
l = zap.NewNop()
}
return &ProxyQueryService{
Logger: l,
}
}
// Query logs the query request.
func (s *ProxyQueryService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
if req != nil {
s.Logger.Info("query", zap.Any("request", req))
}
n, err := w.Write([]byte{})
return int64(n), err
}