feat(http): add flux query service client
parent
310a64fc97
commit
052c896fa4
|
|
@ -8,6 +8,8 @@ import (
|
|||
"io"
|
||||
"net/http"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/platform"
|
||||
pcontext "github.com/influxdata/platform/context"
|
||||
"github.com/influxdata/platform/kit/errors"
|
||||
|
|
@ -138,8 +140,64 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequ
|
|||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err := CheckError(resp); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return io.Copy(w, resp.Body)
|
||||
}
|
||||
|
||||
var _ query.QueryService = (*FluxQueryService)(nil)
|
||||
|
||||
// FluxQueryService implements query.QueryService by making HTTP requests to the /v2/query API endpoint.
|
||||
type FluxQueryService struct {
|
||||
URL string
|
||||
Token string
|
||||
InsecureSkipVerify bool
|
||||
}
|
||||
|
||||
// Query runs a flux query against a influx server and decodes the result
|
||||
func (s *FluxQueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) {
|
||||
u, err := newURL(s.URL, fluxPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
preq := &query.ProxyRequest{
|
||||
Request: *req,
|
||||
Dialect: csv.DefaultDialect(),
|
||||
}
|
||||
var body bytes.Buffer
|
||||
if err := json.NewEncoder(&body).Encode(preq); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hreq, err := http.NewRequest("POST", u.String(), &body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tok, err := pcontext.GetToken(ctx)
|
||||
if err != nil {
|
||||
tok = s.Token
|
||||
}
|
||||
SetToken(tok, hreq)
|
||||
|
||||
hreq.Header.Set("Content-Type", "application/json")
|
||||
hreq.Header.Set("Accept", "text/csv")
|
||||
hreq = hreq.WithContext(ctx)
|
||||
|
||||
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
||||
resp, err := hc.Do(hreq)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err := CheckError(resp); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
|
||||
return decoder.Decode(resp.Body)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue