From 1d0ea8ab640e6b87e0a32a80f3ac9b909c93476b Mon Sep 17 00:00:00 2001 From: Lorenzo Affetti Date: Wed, 15 Jan 2020 16:03:40 +0100 Subject: [PATCH] feat(query): add 'Prefer: return-no-content-with-error' behavior --- http/query.go | 37 +++++++++--- http/query_handler.go | 27 +++++---- query/encode.go | 88 +++++++++++++++++++++++++--- query/encode_test.go | 129 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 251 insertions(+), 30 deletions(-) create mode 100644 query/encode_test.go diff --git a/http/query.go b/http/query.go index 0ea0e1461f..0f1712406a 100644 --- a/http/query.go +++ b/http/query.go @@ -27,8 +27,9 @@ import ( ) const ( - PreferHeaderKey = "Prefer" - PreferNoContentHeaderValue = "return-no-content" + PreferHeaderKey = "Prefer" + PreferNoContentHeaderValue = "return-no-content" + PreferNoContentWErrHeaderValue = "return-no-content-with-error" ) // QueryRequest is a flux query request. @@ -52,6 +53,12 @@ type QueryRequest struct { // To obtain a QueryRequest with no result, add the header // `Prefer: return-no-content` to the HTTP request. PreferNoContent bool + // PreferNoContentWithError is the same as above, but it forces the + // Response to contain an error if that is a Flux runtime error encoded + // in the response body. + // To obtain a QueryRequest with no result but runtime errors, + // add the header `Prefer: return-no-content-with-error` to the HTTP request. + PreferNoContentWithError bool } // QueryDialect is the formatting options for the query response. @@ -277,12 +284,19 @@ func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, e } else { // TODO(nathanielc): Use commentPrefix and dateTimeFormat // once they are supported. - dialect = &csv.Dialect{ - ResultEncoderConfig: csv.ResultEncoderConfig{ - NoHeader: noHeader, - Delimiter: delimiter, - Annotations: r.Dialect.Annotations, - }, + encConfig := csv.ResultEncoderConfig{ + NoHeader: noHeader, + Delimiter: delimiter, + Annotations: r.Dialect.Annotations, + } + if r.PreferNoContentWithError { + dialect = &query.NoContentWithErrorDialect{ + ResultEncoderConfig: encConfig, + } + } else { + dialect = &csv.Dialect{ + ResultEncoderConfig: encConfig, + } } } @@ -323,6 +337,8 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error qr.Dialect.Annotations = d.ResultEncoderConfig.Annotations case *query.NoContentDialect: qr.PreferNoContent = true + case *query.NoContentWithErrorDialect: + qr.PreferNoContentWithError = true default: return nil, fmt.Errorf("unsupported dialect %T", d) } @@ -356,8 +372,11 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.Organ } } - if r.Header.Get(PreferHeaderKey) == PreferNoContentHeaderValue { + switch hv := r.Header.Get(PreferHeaderKey); hv { + case PreferNoContentHeaderValue: req.PreferNoContent = true + case PreferNoContentWErrHeaderValue: + req.PreferNoContentWithError = true } req = req.WithDefaults() diff --git a/http/query_handler.go b/http/query_handler.go index f1a52c2a78..b0e41ccd7b 100644 --- a/http/query_handler.go +++ b/http/query_handler.go @@ -491,7 +491,7 @@ func (s FluxQueryService) Check(ctx context.Context) check.Response { } // GetQueryResponse runs a flux query with common parameters and returns the response from the query service. -func GetQueryResponse(addr, flux, org, token string, headers ...string) (*http.Response, error) { +func GetQueryResponse(qr *QueryRequest, addr, org, token string, headers ...string) (*http.Response, error) { if len(headers)%2 != 0 { return nil, fmt.Errorf("headers must be key value pairs") } @@ -503,18 +503,6 @@ func GetQueryResponse(addr, flux, org, token string, headers ...string) (*http.R params.Set(Org, org) u.RawQuery = params.Encode() - header := true - qr := &QueryRequest{ - Type: "flux", - Query: flux, - Dialect: QueryDialect{ - Header: &header, - Delimiter: ",", - CommentPrefix: "#", - DateTimeFormat: "RFC3339", - }, - } - var body bytes.Buffer if err := json.NewEncoder(&body).Encode(qr); err != nil { return nil, err @@ -552,7 +540,18 @@ func GetQueryResponseBody(res *http.Response) ([]byte, error) { // SimpleQuery runs a flux query with common parameters and returns CSV results. func SimpleQuery(addr, flux, org, token string, headers ...string) ([]byte, error) { - res, err := GetQueryResponse(addr, flux, org, token, headers...) + header := true + qr := &QueryRequest{ + Type: "flux", + Query: flux, + Dialect: QueryDialect{ + Header: &header, + Delimiter: ",", + CommentPrefix: "#", + DateTimeFormat: "RFC3339", + }, + } + res, err := GetQueryResponse(qr, addr, org, token, headers...) if err != nil { return nil, err } diff --git a/query/encode.go b/query/encode.go index c4fd2328e9..da9a8b0e0d 100644 --- a/query/encode.go +++ b/query/encode.go @@ -5,14 +5,23 @@ import ( "net/http" "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" ) -const DialectType = "no-content" +const ( + NoContentDialectType = "no-content" + NoContentWErrDialectType = "no-content-with-error" +) -// AddDialectMappings adds the no-content dialect mapping. +// AddDialectMappings adds the no-content dialects mappings. func AddDialectMappings(mappings flux.DialectMappings) error { - return mappings.Add(DialectType, func() flux.Dialect { + if err := mappings.Add(NoContentDialectType, func() flux.Dialect { return NewNoContentDialect() + }); err != nil { + return err + } + return mappings.Add(NoContentWErrDialectType, func() flux.Dialect { + return NewNoContentWithErrorDialect() }) } @@ -31,16 +40,14 @@ func (d *NoContentDialect) Encoder() flux.MultiResultEncoder { } func (d *NoContentDialect) DialectType() flux.DialectType { - return DialectType + return NoContentDialectType } func (d *NoContentDialect) SetHeaders(w http.ResponseWriter) { w.WriteHeader(http.StatusNoContent) } -type NoContentEncoder struct { - flux.MultiResultEncoder -} +type NoContentEncoder struct{} func (e *NoContentEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error) { defer results.Release() @@ -58,3 +65,70 @@ func (e *NoContentEncoder) Encode(w io.Writer, results flux.ResultIterator) (int // Do not write anything. return 0, nil } + +// NoContentWithErrorDialect is a dialect that provides an Encoder that discards query results, +// but it encodes runtime errors from the Flux query in CSV format. +// To discover if there was any runtime error in the query, one should check the response size. +// If it is equal to zero, then no error was present. +// Otherwise one can decode the response body to get the error. For example: +// ``` +// _, err = csv.NewResultDecoder(csv.ResultDecoderConfig{}).Decode(bytes.NewReader(res)) +// if err != nil { +// // we got some runtime error +// } +// ``` +type NoContentWithErrorDialect struct { + csv.ResultEncoderConfig +} + +func NewNoContentWithErrorDialect() *NoContentWithErrorDialect { + return &NoContentWithErrorDialect{ + ResultEncoderConfig: csv.DefaultEncoderConfig(), + } +} + +func (d *NoContentWithErrorDialect) Encoder() flux.MultiResultEncoder { + return &NoContentWithErrorEncoder{ + errorEncoder: csv.NewResultEncoder(d.ResultEncoderConfig), + } +} + +func (d *NoContentWithErrorDialect) DialectType() flux.DialectType { + return NoContentWErrDialectType +} + +func (d *NoContentWithErrorDialect) SetHeaders(w http.ResponseWriter) { + w.Header().Set("Content-Type", "text/csv; charset=utf-8") + w.Header().Set("Transfer-Encoding", "chunked") +} + +type NoContentWithErrorEncoder struct { + errorEncoder *csv.ResultEncoder +} + +func (e *NoContentWithErrorEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error) { + // Make sure we release results. + // Remember, it is safe to call `Release` multiple times. + defer results.Release() + // Consume and discard results, but keep an eye on errors. + for results.More() { + if err := results.Next().Tables().Do(func(tbl flux.Table) error { + return tbl.Do(func(cr flux.ColReader) error { + cr.Release() + return nil + }) + }); err != nil { + // If there is an error, then encode it in the response. + if encErr := e.errorEncoder.EncodeError(w, err); encErr != nil { + return 0, encErr + } + } + } + // Now Release in order to populate the error, if present. + results.Release() + err := results.Err() + if err != nil { + return 0, e.errorEncoder.EncodeError(w, err) + } + return 0, nil +} diff --git a/query/encode_test.go b/query/encode_test.go new file mode 100644 index 0000000000..74e7019147 --- /dev/null +++ b/query/encode_test.go @@ -0,0 +1,129 @@ +package query_test + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/execute" + "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/query/mock" +) + +func TestReturnNoContent(t *testing.T) { + getMockResult := func() flux.Result { + // Some random data. + r := executetest.NewResult([]*executetest.Table{{ + KeyCols: []string{"t1"}, + ColMeta: []flux.ColMeta{ + {Label: "_time", Type: flux.TTime}, + {Label: "_value", Type: flux.TFloat}, + {Label: "t1", Type: flux.TString}, + {Label: "t2", Type: flux.TString}, + }, + Data: [][]interface{}{ + {execute.Time(0), 1.0, "a", "y"}, + {execute.Time(10), 2.0, "a", "x"}, + {execute.Time(20), 3.0, "a", "y"}, + {execute.Time(30), 4.0, "a", "x"}, + {execute.Time(40), 5.0, "a", "y"}, + }, + }}) + r.Nm = "foo" + return r + } + assertNoContent := func(t *testing.T, respBody []byte, stats flux.Statistics, reqErr error) { + if reqErr != nil { + t.Fatalf("unexpected error on query: %v", reqErr) + } + if body := string(respBody); len(body) > 0 { + t.Fatalf("response body should be empty, but was: %s", body) + } + } + + testCases := []struct { + name string + queryFn func(ctx context.Context, req *query.Request) (flux.Query, error) + dialect flux.Dialect + assertFn func(t *testing.T, respBody []byte, stats flux.Statistics, reqErr error) + }{ + { + name: "no-content - no error", + queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) { + q := mock.NewQuery() + q.SetResults(getMockResult()) + return q, nil + }, + dialect: query.NewNoContentDialect(), + assertFn: assertNoContent, + }, + { + name: "no-content - error", + queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) { + q := mock.NewQuery() + q.SetResults(getMockResult()) + q.SetErr(fmt.Errorf("I am a runtime error")) + return q, nil + }, + dialect: query.NewNoContentDialect(), + assertFn: assertNoContent, + }, + { + name: "no-content-with-error - no error", + queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) { + q := mock.NewQuery() + q.SetResults(getMockResult()) + return q, nil + }, + dialect: query.NewNoContentWithErrorDialect(), + assertFn: assertNoContent, + }, + { + name: "no-content-with-error - error", + queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) { + q := mock.NewQuery() + q.SetResults(getMockResult()) + q.SetErr(fmt.Errorf("I am a runtime error")) + return q, nil + }, + dialect: query.NewNoContentWithErrorDialect(), + assertFn: func(t *testing.T, respBody []byte, stats flux.Statistics, reqErr error) { + if reqErr != nil { + t.Fatalf("unexpected error on query: %v", reqErr) + } + if len(respBody) == 0 { + t.Fatalf("response body should not be empty, but it was") + } + _, err := csv.NewResultDecoder(csv.ResultDecoderConfig{}).Decode(bytes.NewReader(respBody)) + if err == nil { + t.Fatalf("expected error got none") + } else if diff := cmp.Diff(err.Error(), "I am a runtime error"); diff != "" { + t.Fatalf("unexpected error, -want/+got:\n\t%s", diff) + } + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + mockAsyncSvc := &mock.AsyncQueryService{ + QueryF: tc.queryFn, + } + w := bytes.NewBuffer([]byte{}) + bridge := query.ProxyQueryServiceAsyncBridge{ + AsyncQueryService: mockAsyncSvc, + } + stats, err := bridge.Query(context.Background(), w, &query.ProxyRequest{ + Request: query.Request{}, + Dialect: tc.dialect, + }) + tc.assertFn(t, w.Bytes(), stats, err) + }) + } +}