fix(query): return statistics even in encoder error case
The ProxyQueryServiceAsyncBridge was not returning statistics when there was an encoder error. Because the encoder was just writing to an io.Writer, it was possible that a remote disconnect could happen and statistics could not be reported.pull/13444/head
parent
9d0c91f5f0
commit
d0517f288a
|
@ -100,12 +100,13 @@ func (b ProxyQueryServiceAsyncBridge) Query(ctx context.Context, w io.Writer, re
|
|||
defer results.Release()
|
||||
|
||||
encoder := req.Dialect.Encoder()
|
||||
if _, err := encoder.Encode(w, results); err != nil {
|
||||
return flux.Statistics{}, tracing.LogError(span, err)
|
||||
}
|
||||
_, err = encoder.Encode(w, results)
|
||||
// Release the results and collect the statistics regardless of the error.
|
||||
results.Release()
|
||||
|
||||
stats := results.Statistics()
|
||||
if err != nil {
|
||||
return stats, tracing.LogError(span, err)
|
||||
}
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package query_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/influxdata/flux"
|
||||
"github.com/influxdata/flux/csv"
|
||||
"github.com/influxdata/flux/execute/executetest"
|
||||
"github.com/influxdata/influxdb/query"
|
||||
"github.com/influxdata/influxdb/query/mock"
|
||||
)
|
||||
|
||||
type failWriter struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
// Write returns len(p)/2, w.Err,
|
||||
// simulating a partial write with some error.
|
||||
func (w failWriter) Write(p []byte) (int, error) {
|
||||
return len(p) / 2, w.Err
|
||||
}
|
||||
|
||||
func TestProxyQueryServiceAsyncBridge_StatsOnClientDisconnect(t *testing.T) {
|
||||
q := mock.NewQuery(nil)
|
||||
q.Metadata = flux.Metadata{
|
||||
"foo": []interface{}{"bar"},
|
||||
}
|
||||
q.SetResults(map[string]flux.Result{
|
||||
"a": executetest.NewResult([]*executetest.Table{
|
||||
{},
|
||||
}),
|
||||
})
|
||||
|
||||
expReq := &query.Request{OrganizationID: 0x1234}
|
||||
mockAsyncSvc := &mock.AsyncQueryService{
|
||||
QueryF: func(ctx context.Context, req *query.Request) (flux.Query, error) {
|
||||
if req.OrganizationID != 0x1234 {
|
||||
panic(fmt.Errorf("unexpected request: %v", req))
|
||||
}
|
||||
return q, nil
|
||||
},
|
||||
}
|
||||
|
||||
// Use an io.Writer that returns a specific error on Write.
|
||||
w := failWriter{Err: errors.New("something went wrong with the write!")}
|
||||
|
||||
bridge := query.ProxyQueryServiceAsyncBridge{
|
||||
AsyncQueryService: mockAsyncSvc,
|
||||
}
|
||||
stats, err := bridge.Query(context.Background(), w, &query.ProxyRequest{
|
||||
Request: *expReq,
|
||||
Dialect: csv.DefaultDialect(),
|
||||
})
|
||||
if !strings.Contains(err.Error(), w.Err.Error()) {
|
||||
t.Fatalf("Query should have failed with an error wrapping failWriter.Err, got %v", err)
|
||||
}
|
||||
|
||||
// Even though there was an error, the statistics should be from the mock query.
|
||||
md := stats.Metadata
|
||||
if md["foo"] == nil || len(md["foo"]) != 1 || md["foo"][0] != "bar" {
|
||||
t.Fatalf("stats were missing or had wrong metadata: exp metadata[foo]=[bar], got %v", md)
|
||||
}
|
||||
}
|
|
@ -61,7 +61,7 @@ type Query struct {
|
|||
done bool
|
||||
}
|
||||
|
||||
var _ flux.Query = &Query{}
|
||||
var _ flux.Query = (*Query)(nil)
|
||||
|
||||
// NewQuery constructs a new asynchronous query.
|
||||
func NewQuery(spec *flux.Spec) *Query {
|
||||
|
|
Loading…
Reference in New Issue