commit
5c75610e3c
|
@ -35,7 +35,9 @@ func NewExternalQueryHandler() *ExternalQueryHandler {
|
||||||
|
|
||||||
func (h *ExternalQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
|
func (h *ExternalQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
req, err := decodeProxyQueryRequest(ctx, r, h.OrganizationService)
|
// this handler doesn't support authorization checks.
|
||||||
|
var nilauth *platform.Authorization
|
||||||
|
req, err := decodeProxyQueryRequest(ctx, r, nilauth, h.OrganizationService)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
return
|
return
|
||||||
|
|
|
@ -149,10 +149,17 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ
|
||||||
return &req, err
|
return &req, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*query.ProxyRequest, error) {
|
func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth *platform.Authorization, svc platform.OrganizationService) (*query.ProxyRequest, error) {
|
||||||
req, err := decodeQueryRequest(ctx, r, svc)
|
req, err := decodeQueryRequest(ctx, r, svc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return req.ProxyRequest()
|
|
||||||
|
pr, err := req.ProxyRequest()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pr.Request.Authorization = auth
|
||||||
|
return pr, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,8 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/influxdata/flux"
|
||||||
|
"github.com/influxdata/flux/csv"
|
||||||
"github.com/influxdata/platform"
|
"github.com/influxdata/platform"
|
||||||
pcontext "github.com/influxdata/platform/context"
|
pcontext "github.com/influxdata/platform/context"
|
||||||
"github.com/influxdata/platform/kit/errors"
|
"github.com/influxdata/platform/kit/errors"
|
||||||
|
@ -62,7 +64,7 @@ func (h *FluxHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := decodeProxyQueryRequest(ctx, r, h.OrganizationService)
|
req, err := decodeProxyQueryRequest(ctx, r, auth, h.OrganizationService)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
EncodeError(ctx, err, w)
|
EncodeError(ctx, err, w)
|
||||||
return
|
return
|
||||||
|
@ -95,6 +97,8 @@ func (h *FluxHandler) PrometheusCollectors() []prometheus.Collector {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ query.ProxyQueryService = (*FluxService)(nil)
|
||||||
|
|
||||||
// FluxService connects to Influx via HTTP using tokens to run queries.
|
// FluxService connects to Influx via HTTP using tokens to run queries.
|
||||||
type FluxService struct {
|
type FluxService struct {
|
||||||
URL string
|
URL string
|
||||||
|
@ -103,13 +107,15 @@ type FluxService struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query runs a flux query against a influx server and sends the results to the io.Writer.
|
// Query runs a flux query against a influx server and sends the results to the io.Writer.
|
||||||
func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) {
|
// Will use the token from the context over the token within the service struct.
|
||||||
|
func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequest) (int64, error) {
|
||||||
u, err := newURL(s.URL, fluxPath)
|
u, err := newURL(s.URL, fluxPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var body bytes.Buffer
|
var body bytes.Buffer
|
||||||
if err := json.NewEncoder(&body).Encode(req); err != nil {
|
if err := json.NewEncoder(&body).Encode(r); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,8 +123,15 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRe
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
SetToken(s.Token, hreq)
|
|
||||||
|
tok, err := pcontext.GetToken(ctx)
|
||||||
|
if err != nil {
|
||||||
|
tok = s.Token
|
||||||
|
}
|
||||||
|
SetToken(tok, hreq)
|
||||||
|
|
||||||
hreq.Header.Set("Content-Type", "application/json")
|
hreq.Header.Set("Content-Type", "application/json")
|
||||||
|
hreq.Header.Set("Accept", "text/csv")
|
||||||
hreq = hreq.WithContext(ctx)
|
hreq = hreq.WithContext(ctx)
|
||||||
|
|
||||||
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
||||||
|
@ -127,8 +140,63 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRe
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if err := CheckError(resp); err != nil {
|
if err := CheckError(resp); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
return io.Copy(w, resp.Body)
|
return io.Copy(w, resp.Body)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ query.QueryService = (*FluxQueryService)(nil)
|
||||||
|
|
||||||
|
// FluxQueryService implements query.QueryService by making HTTP requests to the /v2/query API endpoint.
|
||||||
|
type FluxQueryService struct {
|
||||||
|
URL string
|
||||||
|
Token string
|
||||||
|
InsecureSkipVerify bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query runs a flux query against a influx server and decodes the result
|
||||||
|
func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.ResultIterator, error) {
|
||||||
|
u, err := newURL(s.URL, fluxPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
preq := &query.ProxyRequest{
|
||||||
|
Request: *r,
|
||||||
|
Dialect: csv.DefaultDialect(),
|
||||||
|
}
|
||||||
|
var body bytes.Buffer
|
||||||
|
if err := json.NewEncoder(&body).Encode(preq); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
hreq, err := http.NewRequest("POST", u.String(), &body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
tok, err := pcontext.GetToken(ctx)
|
||||||
|
if err != nil {
|
||||||
|
tok = s.Token
|
||||||
|
}
|
||||||
|
SetToken(tok, hreq)
|
||||||
|
|
||||||
|
hreq.Header.Set("Content-Type", "application/json")
|
||||||
|
hreq.Header.Set("Accept", "text/csv")
|
||||||
|
hreq = hreq.WithContext(ctx)
|
||||||
|
|
||||||
|
hc := newClient(u.Scheme, s.InsecureSkipVerify)
|
||||||
|
resp, err := hc.Do(hreq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := CheckError(resp); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{})
|
||||||
|
return decoder.Decode(resp.Body)
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,183 @@
|
||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"reflect"
|
||||||
|
"regexp"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/influxdata/flux/csv"
|
||||||
|
"github.com/influxdata/flux/lang"
|
||||||
|
"github.com/influxdata/platform/query"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestFluxService_Query(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
token string
|
||||||
|
ctx context.Context
|
||||||
|
r *query.ProxyRequest
|
||||||
|
status int
|
||||||
|
want int64
|
||||||
|
wantW string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "query",
|
||||||
|
ctx: context.Background(),
|
||||||
|
token: "mytoken",
|
||||||
|
r: &query.ProxyRequest{
|
||||||
|
Request: query.Request{
|
||||||
|
Compiler: lang.FluxCompiler{
|
||||||
|
Query: "from()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dialect: csv.DefaultDialect(),
|
||||||
|
},
|
||||||
|
status: http.StatusOK,
|
||||||
|
want: 6,
|
||||||
|
wantW: "howdy\n",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "error status",
|
||||||
|
token: "mytoken",
|
||||||
|
ctx: context.Background(),
|
||||||
|
r: &query.ProxyRequest{
|
||||||
|
Request: query.Request{
|
||||||
|
Compiler: lang.FluxCompiler{
|
||||||
|
Query: "from()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Dialect: csv.DefaultDialect(),
|
||||||
|
},
|
||||||
|
status: http.StatusUnauthorized,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(tt.status)
|
||||||
|
fmt.Fprintln(w, "howdy")
|
||||||
|
}))
|
||||||
|
defer ts.Close()
|
||||||
|
s := &FluxService{
|
||||||
|
URL: ts.URL,
|
||||||
|
Token: tt.token,
|
||||||
|
}
|
||||||
|
|
||||||
|
w := &bytes.Buffer{}
|
||||||
|
got, err := s.Query(tt.ctx, w, tt.r)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("FluxService.Query() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if got != tt.want {
|
||||||
|
t.Errorf("FluxService.Query() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
if gotW := w.String(); gotW != tt.wantW {
|
||||||
|
t.Errorf("FluxService.Query() = %v, want %v", gotW, tt.wantW)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestFluxQueryService_Query(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
token string
|
||||||
|
ctx context.Context
|
||||||
|
r *query.Request
|
||||||
|
csv string
|
||||||
|
status int
|
||||||
|
want string
|
||||||
|
wantErr bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "error status",
|
||||||
|
token: "mytoken",
|
||||||
|
ctx: context.Background(),
|
||||||
|
r: &query.Request{
|
||||||
|
Compiler: lang.FluxCompiler{
|
||||||
|
Query: "from()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
status: http.StatusUnauthorized,
|
||||||
|
wantErr: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "returns csv",
|
||||||
|
token: "mytoken",
|
||||||
|
ctx: context.Background(),
|
||||||
|
r: &query.Request{
|
||||||
|
Compiler: lang.FluxCompiler{
|
||||||
|
Query: "from()",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
status: http.StatusOK,
|
||||||
|
csv: `#datatype,string,long,dateTime:RFC3339,double,long,string,boolean,string,string,string
|
||||||
|
#group,false,false,false,false,false,false,false,true,true,true
|
||||||
|
#default,0,,,,,,,,,
|
||||||
|
,result,table,_time,usage_user,test,mystr,this,cpu,host,_measurement
|
||||||
|
,,0,2018-08-29T13:08:47Z,10.2,10,yay,true,cpu-total,a,cpui
|
||||||
|
`,
|
||||||
|
want: toCRLF(`,,,2018-08-29T13:08:47Z,10.2,10,yay,true,cpu-total,a,cpui
|
||||||
|
|
||||||
|
`),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.WriteHeader(tt.status)
|
||||||
|
fmt.Fprintln(w, tt.csv)
|
||||||
|
}))
|
||||||
|
s := &FluxQueryService{
|
||||||
|
URL: ts.URL,
|
||||||
|
Token: tt.token,
|
||||||
|
}
|
||||||
|
res, err := s.Query(tt.ctx, tt.r)
|
||||||
|
if (err != nil) != tt.wantErr {
|
||||||
|
t.Errorf("FluxQueryService.Query() error = %v, wantErr %v", err, tt.wantErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if res != nil && res.Err() != nil {
|
||||||
|
t.Errorf("FluxQueryService.Query() result error = %v", res.Err())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if tt.wantErr {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer res.Cancel()
|
||||||
|
|
||||||
|
enc := csv.NewMultiResultEncoder(csv.ResultEncoderConfig{
|
||||||
|
NoHeader: true,
|
||||||
|
Delimiter: ',',
|
||||||
|
})
|
||||||
|
b := bytes.Buffer{}
|
||||||
|
n, err := enc.Encode(&b, res)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("FluxQueryService.Query() encode error = %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if n != int64(len(tt.want)) {
|
||||||
|
t.Errorf("FluxQueryService.Query() encode result = %d, want %d", n, len(tt.want))
|
||||||
|
}
|
||||||
|
|
||||||
|
got := b.String()
|
||||||
|
if !reflect.DeepEqual(got, tt.want) {
|
||||||
|
t.Errorf("FluxQueryService.Query() =\n%s\n%s", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var crlfPattern = regexp.MustCompile(`\r?\n`)
|
||||||
|
|
||||||
|
func toCRLF(data string) string {
|
||||||
|
return crlfPattern.ReplaceAllString(data, "\r\n")
|
||||||
|
}
|
Loading…
Reference in New Issue