From 61d60a3c6113707067c3eb94dac2752e0db71823 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Thu, 13 Sep 2018 10:39:08 -0500 Subject: [PATCH 1/5] feat(http): update /v2/query client to send query.Request with auth --- http/external_query_handler.go | 4 +++- http/query.go | 11 +++++++++-- http/query_handler.go | 22 ++++++++++++++++++---- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/http/external_query_handler.go b/http/external_query_handler.go index 91b5c9bc96..b89f89fe7e 100644 --- a/http/external_query_handler.go +++ b/http/external_query_handler.go @@ -35,7 +35,9 @@ func NewExternalQueryHandler() *ExternalQueryHandler { func (h *ExternalQueryHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { ctx := r.Context() - req, err := decodeProxyQueryRequest(ctx, r, h.OrganizationService) + // this handler doesn't support authorization checks. + var nilauth *platform.Authorization + req, err := decodeProxyQueryRequest(ctx, r, nilauth, h.OrganizationService) if err != nil { EncodeError(ctx, err, w) return diff --git a/http/query.go b/http/query.go index 0ddae83399..4295c8e35f 100644 --- a/http/query.go +++ b/http/query.go @@ -149,10 +149,17 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc platform.Organ return &req, err } -func decodeProxyQueryRequest(ctx context.Context, r *http.Request, svc platform.OrganizationService) (*query.ProxyRequest, error) { +func decodeProxyQueryRequest(ctx context.Context, r *http.Request, auth *platform.Authorization, svc platform.OrganizationService) (*query.ProxyRequest, error) { req, err := decodeQueryRequest(ctx, r, svc) if err != nil { return nil, err } - return req.ProxyRequest() + + pr, err := req.ProxyRequest() + if err != nil { + return nil, err + } + + pr.Request.Authorization = auth + return pr, nil } diff --git a/http/query_handler.go b/http/query_handler.go index 08e3bdd42d..e1d2aad951 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -62,7 +62,7 @@ func (h *FluxHandler) handlePostQuery(w http.ResponseWriter, r *http.Request) { return } - req, err := decodeProxyQueryRequest(ctx, r, h.OrganizationService) + req, err := decodeProxyQueryRequest(ctx, r, auth, h.OrganizationService) if err != nil { EncodeError(ctx, err, w) return @@ -95,6 +95,8 @@ func (h *FluxHandler) PrometheusCollectors() []prometheus.Collector { return nil } +var _ query.ProxyQueryService = (*FluxService)(nil) + // FluxService connects to Influx via HTTP using tokens to run queries. type FluxService struct { URL string @@ -103,13 +105,18 @@ type FluxService struct { } // Query runs a flux query against a influx server and sends the results to the io.Writer. -func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRequest) (int64, error) { +// Will use the token from the context over the token within the service struct. +func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequest) (int64, error) { u, err := newURL(s.URL, fluxPath) if err != nil { return 0, err } + + // TODO(goller): No way to send dialect information to the flux query handler at the moment. + // the query handler needs some way of taking the dialect encoding information. + var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(req); err != nil { + if err := json.NewEncoder(&body).Encode(r.Request); err != nil { return 0, err } @@ -117,8 +124,15 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, req *query.ProxyRe if err != nil { return 0, err } - SetToken(s.Token, hreq) + + tok, err := pcontext.GetToken(ctx) + if err != nil { + tok = s.Token + } + SetToken(tok, hreq) + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("Accept", "text/csv") hreq = hreq.WithContext(ctx) hc := newClient(u.Scheme, s.InsecureSkipVerify) From 02ac64fc35bc3d0606831e9324828d97cbde1de4 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Thu, 13 Sep 2018 11:56:49 -0500 Subject: [PATCH 2/5] test(http): add test for querying flux service --- http/query_handler_test.go | 82 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 http/query_handler_test.go diff --git a/http/query_handler_test.go b/http/query_handler_test.go new file mode 100644 index 0000000000..e981b3f007 --- /dev/null +++ b/http/query_handler_test.go @@ -0,0 +1,82 @@ +package http + +import ( + "bytes" + "context" + "fmt" + "github.com/influxdata/flux/lang" + "net/http" + "net/http/httptest" + "testing" + + "github.com/influxdata/platform/query" +) + +func TestFluxService_Query(t *testing.T) { + tests := []struct { + name string + token string + ctx context.Context + r *query.ProxyRequest + status int + want int64 + wantW string + wantErr bool + }{ + { + name: "query", + ctx: context.Background(), + token: "mytoken", + r: &query.ProxyRequest{ + Request: query.Request{ + Compiler: lang.FluxCompiler{ + Query: "from()", + }, + }, + }, + status: http.StatusOK, + want: 6, + wantW: "howdy\n", + }, + { + name: "error status", + token: "mytoken", + ctx: context.Background(), + r: &query.ProxyRequest{ + Request: query.Request{ + Compiler: lang.FluxCompiler{ + Query: "from()", + }, + }, + }, + status: http.StatusUnauthorized, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.status) + fmt.Fprintln(w, "howdy") + })) + defer ts.Close() + s := &FluxService{ + URL: ts.URL, + Token: tt.token, + } + + w := &bytes.Buffer{} + got, err := s.Query(tt.ctx, w, tt.r) + if (err != nil) != tt.wantErr { + t.Errorf("FluxService.Query() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("FluxService.Query() = %v, want %v", got, tt.want) + } + if gotW := w.String(); gotW != tt.wantW { + t.Errorf("FluxService.Query() = %v, want %v", gotW, tt.wantW) + } + }) + } +} From 310a64fc9736eb1c6b8c9351780422f7a0cbf632 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Thu, 13 Sep 2018 13:00:27 -0500 Subject: [PATCH 3/5] feat(http): use proxy request in flux service --- http/query_handler.go | 5 +---- http/query_handler_test.go | 6 ++++-- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/http/query_handler.go b/http/query_handler.go index e1d2aad951..61686b47bd 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -112,11 +112,8 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequ return 0, err } - // TODO(goller): No way to send dialect information to the flux query handler at the moment. - // the query handler needs some way of taking the dialect encoding information. - var body bytes.Buffer - if err := json.NewEncoder(&body).Encode(r.Request); err != nil { + if err := json.NewEncoder(&body).Encode(r); err != nil { return 0, err } diff --git a/http/query_handler_test.go b/http/query_handler_test.go index e981b3f007..27047028c3 100644 --- a/http/query_handler_test.go +++ b/http/query_handler_test.go @@ -4,12 +4,12 @@ import ( "bytes" "context" "fmt" + "github.com/influxdata/flux/csv" "github.com/influxdata/flux/lang" + "github.com/influxdata/platform/query" "net/http" "net/http/httptest" "testing" - - "github.com/influxdata/platform/query" ) func TestFluxService_Query(t *testing.T) { @@ -33,6 +33,7 @@ func TestFluxService_Query(t *testing.T) { Query: "from()", }, }, + Dialect: csv.DefaultDialect(), }, status: http.StatusOK, want: 6, @@ -48,6 +49,7 @@ func TestFluxService_Query(t *testing.T) { Query: "from()", }, }, + Dialect: csv.DefaultDialect(), }, status: http.StatusUnauthorized, wantErr: true, From 052c896fa48ebfca2ebc19b7538011659fc26137 Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Thu, 13 Sep 2018 13:21:19 -0500 Subject: [PATCH 4/5] feat(http): add flux query service client --- http/query_handler.go | 58 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/http/query_handler.go b/http/query_handler.go index 61686b47bd..197d471923 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -8,6 +8,8 @@ import ( "io" "net/http" + "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" "github.com/influxdata/platform" pcontext "github.com/influxdata/platform/context" "github.com/influxdata/platform/kit/errors" @@ -138,8 +140,64 @@ func (s *FluxService) Query(ctx context.Context, w io.Writer, r *query.ProxyRequ return 0, err } defer resp.Body.Close() + if err := CheckError(resp); err != nil { return 0, err } return io.Copy(w, resp.Body) } + +var _ query.QueryService = (*FluxQueryService)(nil) + +// FluxQueryService implements query.QueryService by making HTTP requests to the /v2/query API endpoint. +type FluxQueryService struct { + URL string + Token string + InsecureSkipVerify bool +} + +// Query runs a flux query against a influx server and decodes the result +func (s *FluxQueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) { + u, err := newURL(s.URL, fluxPath) + if err != nil { + return nil, err + } + + preq := &query.ProxyRequest{ + Request: *req, + Dialect: csv.DefaultDialect(), + } + var body bytes.Buffer + if err := json.NewEncoder(&body).Encode(preq); err != nil { + return nil, err + } + + hreq, err := http.NewRequest("POST", u.String(), &body) + if err != nil { + return nil, err + } + + tok, err := pcontext.GetToken(ctx) + if err != nil { + tok = s.Token + } + SetToken(tok, hreq) + + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("Accept", "text/csv") + hreq = hreq.WithContext(ctx) + + hc := newClient(u.Scheme, s.InsecureSkipVerify) + resp, err := hc.Do(hreq) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if err := CheckError(resp); err != nil { + return nil, err + } + + decoder := csv.NewMultiResultDecoder(csv.ResultDecoderConfig{}) + return decoder.Decode(resp.Body) +} From 65fa08abca4cb9d3c9d16a819e90d67872d4164f Mon Sep 17 00:00:00 2001 From: Chris Goller Date: Thu, 13 Sep 2018 15:26:36 -0500 Subject: [PATCH 5/5] test(http): add flux query service test --- http/query_handler.go | 5 +- http/query_handler_test.go | 105 +++++++++++++++++++++++++++++++++++-- 2 files changed, 104 insertions(+), 6 deletions(-) diff --git a/http/query_handler.go b/http/query_handler.go index 197d471923..9df796bd48 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -157,14 +157,14 @@ type FluxQueryService struct { } // Query runs a flux query against a influx server and decodes the result -func (s *FluxQueryService) Query(ctx context.Context, req *query.Request) (flux.ResultIterator, error) { +func (s *FluxQueryService) Query(ctx context.Context, r *query.Request) (flux.ResultIterator, error) { u, err := newURL(s.URL, fluxPath) if err != nil { return nil, err } preq := &query.ProxyRequest{ - Request: *req, + Request: *r, Dialect: csv.DefaultDialect(), } var body bytes.Buffer @@ -192,7 +192,6 @@ func (s *FluxQueryService) Query(ctx context.Context, req *query.Request) (flux. if err != nil { return nil, err } - defer resp.Body.Close() if err := CheckError(resp); err != nil { return nil, err diff --git a/http/query_handler_test.go b/http/query_handler_test.go index 27047028c3..2ad51dc4ba 100644 --- a/http/query_handler_test.go +++ b/http/query_handler_test.go @@ -4,12 +4,15 @@ import ( "bytes" "context" "fmt" + "net/http" + "net/http/httptest" + "reflect" + "regexp" + "testing" + "github.com/influxdata/flux/csv" "github.com/influxdata/flux/lang" "github.com/influxdata/platform/query" - "net/http" - "net/http/httptest" - "testing" ) func TestFluxService_Query(t *testing.T) { @@ -82,3 +85,99 @@ func TestFluxService_Query(t *testing.T) { }) } } + +func TestFluxQueryService_Query(t *testing.T) { + tests := []struct { + name string + token string + ctx context.Context + r *query.Request + csv string + status int + want string + wantErr bool + }{ + { + name: "error status", + token: "mytoken", + ctx: context.Background(), + r: &query.Request{ + Compiler: lang.FluxCompiler{ + Query: "from()", + }, + }, + status: http.StatusUnauthorized, + wantErr: true, + }, + { + name: "returns csv", + token: "mytoken", + ctx: context.Background(), + r: &query.Request{ + Compiler: lang.FluxCompiler{ + Query: "from()", + }, + }, + status: http.StatusOK, + csv: `#datatype,string,long,dateTime:RFC3339,double,long,string,boolean,string,string,string +#group,false,false,false,false,false,false,false,true,true,true +#default,0,,,,,,,,, +,result,table,_time,usage_user,test,mystr,this,cpu,host,_measurement +,,0,2018-08-29T13:08:47Z,10.2,10,yay,true,cpu-total,a,cpui +`, + want: toCRLF(`,,,2018-08-29T13:08:47Z,10.2,10,yay,true,cpu-total,a,cpui + +`), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(tt.status) + fmt.Fprintln(w, tt.csv) + })) + s := &FluxQueryService{ + URL: ts.URL, + Token: tt.token, + } + res, err := s.Query(tt.ctx, tt.r) + if (err != nil) != tt.wantErr { + t.Errorf("FluxQueryService.Query() error = %v, wantErr %v", err, tt.wantErr) + return + } + if res != nil && res.Err() != nil { + t.Errorf("FluxQueryService.Query() result error = %v", res.Err()) + return + } + if tt.wantErr { + return + } + defer res.Cancel() + + enc := csv.NewMultiResultEncoder(csv.ResultEncoderConfig{ + NoHeader: true, + Delimiter: ',', + }) + b := bytes.Buffer{} + n, err := enc.Encode(&b, res) + if err != nil { + t.Errorf("FluxQueryService.Query() encode error = %v", err) + return + } + if n != int64(len(tt.want)) { + t.Errorf("FluxQueryService.Query() encode result = %d, want %d", n, len(tt.want)) + } + + got := b.String() + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("FluxQueryService.Query() =\n%s\n%s", got, tt.want) + } + }) + } +} + +var crlfPattern = regexp.MustCompile(`\r?\n`) + +func toCRLF(data string) string { + return crlfPattern.ReplaceAllString(data, "\r\n") +}