fix(http): keep the http response body open until the result has been read
This modifies the `MultiResultDecoder` interface to accept an `io.ReadCloser` so the `ResultIterator` can close the `io.Reader` instead of doing it through a defer call. It then makes it so the `Cancel()` method will close the reader or the reader will be automatically closed when `More()` returns false.pull/10616/head
parent
65c510ec3b
commit
911f053dd6
|
@ -156,7 +156,6 @@ func (s *QueryService) Query(ctx context.Context, orgID platform.ID, query *ifql
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
return s.processResponse(resp)
|
||||
}
|
||||
|
||||
|
@ -182,7 +181,6 @@ func (s *QueryService) QueryWithCompile(ctx context.Context, orgID platform.ID,
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
return s.processResponse(resp)
|
||||
}
|
||||
|
||||
|
|
|
@ -75,7 +75,7 @@ func (d *ResultDecoder) Decode(r io.Reader) (query.Result, error) {
|
|||
// Results are delimited by an empty line.
|
||||
type MultiResultDecoder struct {
|
||||
c ResultDecoderConfig
|
||||
r io.Reader
|
||||
r io.ReadCloser
|
||||
}
|
||||
|
||||
// NewMultiResultDecoder creates a new MultiResultDecoder.
|
||||
|
@ -88,7 +88,7 @@ func NewMultiResultDecoder(c ResultDecoderConfig) *MultiResultDecoder {
|
|||
}
|
||||
}
|
||||
|
||||
func (d *MultiResultDecoder) Decode(r io.Reader) (query.ResultIterator, error) {
|
||||
func (d *MultiResultDecoder) Decode(r io.ReadCloser) (query.ResultIterator, error) {
|
||||
return &resultIterator{
|
||||
c: d.c,
|
||||
r: r,
|
||||
|
@ -98,9 +98,11 @@ func (d *MultiResultDecoder) Decode(r io.Reader) (query.ResultIterator, error) {
|
|||
// resultIterator iterates through the results encoded in r.
|
||||
type resultIterator struct {
|
||||
c ResultDecoderConfig
|
||||
r io.Reader
|
||||
r io.ReadCloser
|
||||
next *resultDecoder
|
||||
err error
|
||||
|
||||
canceled bool
|
||||
}
|
||||
|
||||
func (r *resultIterator) More() bool {
|
||||
|
@ -110,8 +112,13 @@ func (r *resultIterator) More() bool {
|
|||
extraMeta = r.next.extraMeta
|
||||
}
|
||||
r.next, r.err = newResultDecoder(r.r, r.c, extraMeta)
|
||||
return r.err == nil
|
||||
if r.err == nil {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Release the resources for this query.
|
||||
r.Cancel()
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -120,6 +127,14 @@ func (r *resultIterator) Next() query.Result {
|
|||
}
|
||||
|
||||
func (r *resultIterator) Cancel() {
|
||||
if r.canceled {
|
||||
return
|
||||
}
|
||||
|
||||
if err := r.r.Close(); err != nil && r.err == nil {
|
||||
r.err = err
|
||||
}
|
||||
r.canceled = true
|
||||
}
|
||||
|
||||
func (r *resultIterator) Err() error {
|
||||
|
|
|
@ -139,7 +139,7 @@ func (e *MultiResultEncoder) Encode(w io.Writer, results query.ResultIterator) e
|
|||
resp.Results = append(resp.Results, result)
|
||||
}
|
||||
|
||||
if err := results.Err(); err != nil {
|
||||
if err := results.Err(); err != nil && resp.Err == "" {
|
||||
resp.error(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -198,6 +198,7 @@ func (r *SliceResultIterator) Next() Result {
|
|||
}
|
||||
|
||||
func (r *SliceResultIterator) Cancel() {
|
||||
r.results = nil
|
||||
}
|
||||
|
||||
func (r *SliceResultIterator) Err() error {
|
||||
|
|
|
@ -124,7 +124,7 @@ type ResultEncoder interface {
|
|||
// MultiResultDecoder can decode multiple results from a reader.
|
||||
type MultiResultDecoder interface {
|
||||
// Decode decodes multiple results from r.
|
||||
Decode(r io.Reader) (ResultIterator, error)
|
||||
Decode(r io.ReadCloser) (ResultIterator, error)
|
||||
}
|
||||
|
||||
// MultiResultEncoder can encode multiple results into a writer.
|
||||
|
|
Loading…
Reference in New Issue