diff --git a/http/query_handler.go b/http/query_handler.go index 61686b47bd..197d471923 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -8,6 +8,8 @@ import ( "io" "net/http" + "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" "github.com/influxdata/platform" pcontext "github.com/influxdata/platform/context" "github.com/influxdata/platform/kit/errors" @@ -138,8 +140,64 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequ return 0, err } defer resp.Body.Close() + if err := CheckError(resp); err != nil { return 0, err } return io.Copy(w, resp.Body) } + +var _ query.QueryService = (*FluxQueryService)(nil) + +// FluxQueryService implements query.QueryService by making HTTP requests to the /v2/query API endpoint. +type FluxQueryService struct { + URL string + Token string + InsecureSkipVerify bool +} + +// Query runs a flux query against a influx server and decodes the result +func (s *FluxQueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) { + u, err := newURL(s.URL, fluxPath) + if err != nil { + return nil, err + } + + preq := &query.ProxyRequest{ + Request: *req, + Dialect: csv.DefaultDialect(), + } + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(preq); err != nil { + return nil, err + } + + hreq, err := http.NewRequest("POST", u.String(), &body) + if err != nil { + return nil, err + } + + tok, err := pcontext.GetToken(ctx) + if err != nil { + tok = s.Token + } + SetToken(tok, hreq) + + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("Accept", "text/csv") + hreq = hreq.WithContext(ctx) + + hc := newClient(u.Scheme, s.InsecureSkipVerify) + resp, err := hc.Do(hreq) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return nil, err + } + + decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{}) + return decoder.Decode(resp.Body) +}