diff --git a/query/bridges.go b/query/bridges.go index 7af58182b9..635a036e91 100644 --- a/query/bridges.go +++ b/query/bridges.go @@ -100,12 +100,13 @@ func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, re defer results.Release() encoder := req.Dialect.Encoder() - if _, err := encoder.Encode(w, results); err != nil { - return flux.Statistics{}, tracing.LogError(span, err) - } + _, err = encoder.Encode(w, results) + // Release the results and collect the statistics regardless of the error. results.Release() - stats := results.Statistics() + if err != nil { + return stats, tracing.LogError(span, err) + } return stats, nil } diff --git a/query/bridges_test.go b/query/bridges_test.go new file mode 100644 index 0000000000..514c50d7c1 --- /dev/null +++ b/query/bridges_test.go @@ -0,0 +1,67 @@ +package query_test + +import ( + "context" + "errors" + "fmt" + "strings" + "testing" + + "github.com/influxdata/flux" + "github.com/influxdata/flux/csv" + "github.com/influxdata/flux/execute/executetest" + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxdb/query/mock" +) + +type failWriter struct { + Err error +} + +// Write returns len(p)/2, w.Err, +// simulating a partial write with some error. +func (w failWriter) Write(p []byte) (int, error) { + return len(p) / 2, w.Err +} + +func TestProxyQueryServiceAsyncBridge_StatsOnClientDisconnect(t *testing.T) { + q := mock.NewQuery(nil) + q.Metadata = flux.Metadata{ + "foo": []interface{}{"bar"}, + } + q.SetResults(map[string]flux.Result{ + "a": executetest.NewResult([]*executetest.Table{ + {}, + }), + }) + + expReq := &query.Request{OrganizationID: 0x1234} + mockAsyncSvc := &mock.AsyncQueryService{ + QueryF: func(ctx context.Context, req *query.Request) (flux.Query, error) { + if req.OrganizationID != 0x1234 { + panic(fmt.Errorf("unexpected request: %v", req)) + } + return q, nil + }, + } + + // Use an io.Writer that returns a specific error on Write. + w := failWriter{Err: errors.New("something went wrong with the write!")} + + bridge := query.ProxyQueryServiceAsyncBridge{ + AsyncQueryService: mockAsyncSvc, + } + stats, err := bridge.Query(context.Background(), w, &query.ProxyRequest{ + Request: *expReq, + Dialect: csv.DefaultDialect(), + }) + if !strings.Contains(err.Error(), w.Err.Error()) { + t.Fatalf("Query should have failed with an error wrapping failWriter.Err, got %v", err) + } + + // Even though there was an error, the statistics should be from the mock query. + md := stats.Metadata + if md["foo"] == nil || len(md["foo"]) != 1 || md["foo"][0] != "bar" { + t.Fatalf("stats were missing or had wrong metadata: exp metadata[foo]=[bar], got %v", md) + } +} diff --git a/query/mock/service.go b/query/mock/service.go index d840f0a5b2..6cc7e4b40c 100644 --- a/query/mock/service.go +++ b/query/mock/service.go @@ -61,7 +61,7 @@ type Query struct { done bool } -var _ flux.Query = &Query{} +var _ flux.Query = (*Query)(nil) // NewQuery constructs a new asynchronous query. func NewQuery(spec *flux.Spec) *Query {