Merge pull request #16548 from influxdata/feat/ret-no-cont-with-err

feat(query): add 'Prefer: return-no-content-with-error' behavior
pull/16559/head
Lorenzo Affetti 2020-01-17 16:24:55 +01:00 committed by GitHub
commit ceb6598cbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 251 additions and 30 deletions

View File

@ -27,8 +27,9 @@ import (
)
const (
PreferHeaderKey = "Prefer"
PreferNoContentHeaderValue = "return-no-content"
PreferHeaderKey = "Prefer"
PreferNoContentHeaderValue = "return-no-content"
PreferNoContentWErrHeaderValue = "return-no-content-with-error"
)
// QueryRequest is a flux query request.
@ -52,6 +53,12 @@ type QueryRequest struct {
// To obtain a QueryRequest with no result, add the header
// `Prefer: return-no-content` to the HTTP request.
PreferNoContent bool
// PreferNoContentWithError is the same as above, but it forces the
// Response to contain an error if that is a Flux runtime error encoded
// in the response body.
// To obtain a QueryRequest with no result but runtime errors,
// add the header `Prefer: return-no-content-with-error` to the HTTP request.
PreferNoContentWithError bool
}
// QueryDialect is the formatting options for the query response.
@ -277,12 +284,19 @@ func (r QueryRequest) proxyRequest(now func() time.Time) (*query.ProxyRequest, e
} else {
// TODO(nathanielc): Use commentPrefix and dateTimeFormat
// once they are supported.
dialect = &csv.Dialect{
ResultEncoderConfig: csv.ResultEncoderConfig{
NoHeader: noHeader,
Delimiter: delimiter,
Annotations: r.Dialect.Annotations,
},
encConfig := csv.ResultEncoderConfig{
NoHeader: noHeader,
Delimiter: delimiter,
Annotations: r.Dialect.Annotations,
}
if r.PreferNoContentWithError {
dialect = &query.NoContentWithErrorDialect{
ResultEncoderConfig: encConfig,
}
} else {
dialect = &csv.Dialect{
ResultEncoderConfig: encConfig,
}
}
}
@ -323,6 +337,8 @@ func QueryRequestFromProxyRequest(req *query.ProxyRequest) (*QueryRequest, error
qr.Dialect.Annotations = d.ResultEncoderConfig.Annotations
case *query.NoContentDialect:
qr.PreferNoContent = true
case *query.NoContentWithErrorDialect:
qr.PreferNoContentWithError = true
default:
return nil, fmt.Errorf("unsupported dialect %T", d)
}
@ -356,8 +372,11 @@ func decodeQueryRequest(ctx context.Context, r *http.Request, svc influxdb.Organ
}
}
if r.Header.Get(PreferHeaderKey) == PreferNoContentHeaderValue {
switch hv := r.Header.Get(PreferHeaderKey); hv {
case PreferNoContentHeaderValue:
req.PreferNoContent = true
case PreferNoContentWErrHeaderValue:
req.PreferNoContentWithError = true
}
req = req.WithDefaults()

View File

@ -491,7 +491,7 @@ func (s FluxQueryService) Check(ctx context.Context) check.Response {
}
// GetQueryResponse runs a flux query with common parameters and returns the response from the query service.
func GetQueryResponse(addr, flux, org, token string, headers ...string) (*http.Response, error) {
func GetQueryResponse(qr *QueryRequest, addr, org, token string, headers ...string) (*http.Response, error) {
if len(headers)%2 != 0 {
return nil, fmt.Errorf("headers must be key value pairs")
}
@ -503,18 +503,6 @@ func GetQueryResponse(addr, flux, org, token string, headers ...string) (*http.R
params.Set(Org, org)
u.RawQuery = params.Encode()
header := true
qr := &QueryRequest{
Type: "flux",
Query: flux,
Dialect: QueryDialect{
Header: &header,
Delimiter: ",",
CommentPrefix: "#",
DateTimeFormat: "RFC3339",
},
}
var body bytes.Buffer
if err := json.NewEncoder(&body).Encode(qr); err != nil {
return nil, err
@ -552,7 +540,18 @@ func GetQueryResponseBody(res *http.Response) ([]byte, error) {
// SimpleQuery runs a flux query with common parameters and returns CSV results.
func SimpleQuery(addr, flux, org, token string, headers ...string) ([]byte, error) {
res, err := GetQueryResponse(addr, flux, org, token, headers...)
header := true
qr := &QueryRequest{
Type: "flux",
Query: flux,
Dialect: QueryDialect{
Header: &header,
Delimiter: ",",
CommentPrefix: "#",
DateTimeFormat: "RFC3339",
},
}
res, err := GetQueryResponse(qr, addr, org, token, headers...)
if err != nil {
return nil, err
}

View File

@ -5,14 +5,23 @@ import (
"net/http"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
)
const DialectType = "no-content"
const (
NoContentDialectType = "no-content"
NoContentWErrDialectType = "no-content-with-error"
)
// AddDialectMappings adds the no-content dialect mapping.
// AddDialectMappings adds the no-content dialects mappings.
func AddDialectMappings(mappings flux.DialectMappings) error {
return mappings.Add(DialectType, func() flux.Dialect {
if err := mappings.Add(NoContentDialectType, func() flux.Dialect {
return NewNoContentDialect()
}); err != nil {
return err
}
return mappings.Add(NoContentWErrDialectType, func() flux.Dialect {
return NewNoContentWithErrorDialect()
})
}
@ -31,16 +40,14 @@ func (d *NoContentDialect) Encoder() flux.MultiResultEncoder {
}
func (d *NoContentDialect) DialectType() flux.DialectType {
return DialectType
return NoContentDialectType
}
func (d *NoContentDialect) SetHeaders(w http.ResponseWriter) {
w.WriteHeader(http.StatusNoContent)
}
type NoContentEncoder struct {
flux.MultiResultEncoder
}
type NoContentEncoder struct{}
func (e *NoContentEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error) {
defer results.Release()
@ -58,3 +65,70 @@ func (e *NoContentEncoder) Encode(w io.Writer, results flux.ResultIterator) (int
// Do not write anything.
return 0, nil
}
// NoContentWithErrorDialect is a dialect that provides an Encoder that discards query results,
// but it encodes runtime errors from the Flux query in CSV format.
// To discover if there was any runtime error in the query, one should check the response size.
// If it is equal to zero, then no error was present.
// Otherwise one can decode the response body to get the error. For example:
// ```
// _, err = csv.NewResultDecoder(csv.ResultDecoderConfig{}).Decode(bytes.NewReader(res))
// if err != nil {
// // we got some runtime error
// }
// ```
type NoContentWithErrorDialect struct {
csv.ResultEncoderConfig
}
func NewNoContentWithErrorDialect() *NoContentWithErrorDialect {
return &NoContentWithErrorDialect{
ResultEncoderConfig: csv.DefaultEncoderConfig(),
}
}
func (d *NoContentWithErrorDialect) Encoder() flux.MultiResultEncoder {
return &NoContentWithErrorEncoder{
errorEncoder: csv.NewResultEncoder(d.ResultEncoderConfig),
}
}
func (d *NoContentWithErrorDialect) DialectType() flux.DialectType {
return NoContentWErrDialectType
}
func (d *NoContentWithErrorDialect) SetHeaders(w http.ResponseWriter) {
w.Header().Set("Content-Type", "text/csv; charset=utf-8")
w.Header().Set("Transfer-Encoding", "chunked")
}
type NoContentWithErrorEncoder struct {
errorEncoder *csv.ResultEncoder
}
func (e *NoContentWithErrorEncoder) Encode(w io.Writer, results flux.ResultIterator) (int64, error) {
// Make sure we release results.
// Remember, it is safe to call `Release` multiple times.
defer results.Release()
// Consume and discard results, but keep an eye on errors.
for results.More() {
if err := results.Next().Tables().Do(func(tbl flux.Table) error {
return tbl.Do(func(cr flux.ColReader) error {
cr.Release()
return nil
})
}); err != nil {
// If there is an error, then encode it in the response.
if encErr := e.errorEncoder.EncodeError(w, err); encErr != nil {
return 0, encErr
}
}
}
// Now Release in order to populate the error, if present.
results.Release()
err := results.Err()
if err != nil {
return 0, e.errorEncoder.EncodeError(w, err)
}
return 0, nil
}

129
query/encode_test.go Normal file
View File

@ -0,0 +1,129 @@
package query_test
import (
"bytes"
"context"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/influxdata/flux"
"github.com/influxdata/flux/csv"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/execute/executetest"
"github.com/influxdata/influxdb/query"
"github.com/influxdata/influxdb/query/mock"
)
func TestReturnNoContent(t *testing.T) {
getMockResult := func() flux.Result {
// Some random data.
r := executetest.NewResult([]*executetest.Table{{
KeyCols: []string{"t1"},
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TFloat},
{Label: "t1", Type: flux.TString},
{Label: "t2", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(0), 1.0, "a", "y"},
{execute.Time(10), 2.0, "a", "x"},
{execute.Time(20), 3.0, "a", "y"},
{execute.Time(30), 4.0, "a", "x"},
{execute.Time(40), 5.0, "a", "y"},
},
}})
r.Nm = "foo"
return r
}
assertNoContent := func(t *testing.T, respBody []byte, stats flux.Statistics, reqErr error) {
if reqErr != nil {
t.Fatalf("unexpected error on query: %v", reqErr)
}
if body := string(respBody); len(body) > 0 {
t.Fatalf("response body should be empty, but was: %s", body)
}
}
testCases := []struct {
name string
queryFn func(ctx context.Context, req *query.Request) (flux.Query, error)
dialect flux.Dialect
assertFn func(t *testing.T, respBody []byte, stats flux.Statistics, reqErr error)
}{
{
name: "no-content - no error",
queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) {
q := mock.NewQuery()
q.SetResults(getMockResult())
return q, nil
},
dialect: query.NewNoContentDialect(),
assertFn: assertNoContent,
},
{
name: "no-content - error",
queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) {
q := mock.NewQuery()
q.SetResults(getMockResult())
q.SetErr(fmt.Errorf("I am a runtime error"))
return q, nil
},
dialect: query.NewNoContentDialect(),
assertFn: assertNoContent,
},
{
name: "no-content-with-error - no error",
queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) {
q := mock.NewQuery()
q.SetResults(getMockResult())
return q, nil
},
dialect: query.NewNoContentWithErrorDialect(),
assertFn: assertNoContent,
},
{
name: "no-content-with-error - error",
queryFn: func(ctx context.Context, req *query.Request) (flux.Query, error) {
q := mock.NewQuery()
q.SetResults(getMockResult())
q.SetErr(fmt.Errorf("I am a runtime error"))
return q, nil
},
dialect: query.NewNoContentWithErrorDialect(),
assertFn: func(t *testing.T, respBody []byte, stats flux.Statistics, reqErr error) {
if reqErr != nil {
t.Fatalf("unexpected error on query: %v", reqErr)
}
if len(respBody) == 0 {
t.Fatalf("response body should not be empty, but it was")
}
_, err := csv.NewResultDecoder(csv.ResultDecoderConfig{}).Decode(bytes.NewReader(respBody))
if err == nil {
t.Fatalf("expected error got none")
} else if diff := cmp.Diff(err.Error(), "I am a runtime error"); diff != "" {
t.Fatalf("unexpected error, -want/+got:\n\t%s", diff)
}
},
},
}
for _, tc := range testCases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
mockAsyncSvc := &mock.AsyncQueryService{
QueryF: tc.queryFn,
}
w := bytes.NewBuffer([]byte{})
bridge := query.ProxyQueryServiceAsyncBridge{
AsyncQueryService: mockAsyncSvc,
}
stats, err := bridge.Query(context.Background(), w, &query.ProxyRequest{
Request: query.Request{},
Dialect: tc.dialect,
})
tc.assertFn(t, w.Bytes(), stats, err)
})
}
}