From 43117a94d6b7ce546832963f19c375ed57215d49 Mon Sep 17 00:00:00 2001 From: jgeiger Date: Fri, 10 Feb 2017 10:03:04 -0700 Subject: [PATCH] Add chunked processing back into v2 client - Moving the to v2 client removed this functionality. This copies code back into the client. The tests were also added back into the test suite. --- CHANGELOG.md | 1 + client/v2/client.go | 103 +++++++++++++++++++++++++++++++++++---- client/v2/client_test.go | 23 +++++++++ 3 files changed, 117 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9beb25ca5e..386ce381cf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ - [#7776](https://github.com/influxdata/influxdb/issues/7776): Add system information to /debug/vars. - [#7948](https://github.com/influxdata/influxdb/pull/7948): Reduce memory allocations by reusing gzip.Writers across requests - [#7553](https://github.com/influxdata/influxdb/issues/7553): Add modulo operator to the query language. +- [#7977](https://github.com/influxdata/influxdb/issues/7977): Add chunked request processing back into the Go client v2 ## v1.2.1 [unreleased] diff --git a/client/v2/client.go b/client/v2/client.go index 4d0ce60f0d..6a72e8d15d 100644 --- a/client/v2/client.go +++ b/client/v2/client.go @@ -7,9 +7,12 @@ import ( "encoding/json" "errors" "fmt" + "io" "io/ioutil" "net/http" "net/url" + "strconv" + "strings" "time" "github.com/influxdata/influxdb/models" @@ -405,6 +408,8 @@ type Query struct { Command string Database string Precision string + Chunked bool + ChunkSize int Parameters map[string]interface{} } @@ -491,6 +496,12 @@ func (c *client) Query(q Query) (*Response, error) { params.Set("q", q.Command) params.Set("db", q.Database) params.Set("params", string(jsonParameters)) + if q.Chunked { + params.Set("chunked", "true") + if q.ChunkSize > 0 { + params.Set("chunk_size", strconv.Itoa(q.ChunkSize)) + } + } if q.Precision != "" { params.Set("epoch", q.Precision) @@ -504,17 +515,38 @@ func (c *client) Query(q Query) (*Response, error) { defer resp.Body.Close() var response Response - dec := json.NewDecoder(resp.Body) - dec.UseNumber() - decErr := dec.Decode(&response) + if q.Chunked { + cr := NewChunkedResponse(resp.Body) + for { + r, err := cr.NextResponse() + if err != nil { + // If we got an error while decoding the response, send that back. + return nil, err + } - // ignore this error if we got an invalid status code - if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { - decErr = nil - } - // If we got a valid decode error, send that back - if decErr != nil { - return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr) + if r == nil { + break + } + + response.Results = append(response.Results, r.Results...) + if r.Err != "" { + response.Err = r.Err + break + } + } + } else { + dec := json.NewDecoder(resp.Body) + dec.UseNumber() + decErr := dec.Decode(&response) + + // ignore this error if we got an invalid status code + if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK { + decErr = nil + } + // If we got a valid decode error, send that back + if decErr != nil { + return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr) + } } // If we don't have an error in our json response, and didn't get statusOK // then send back an error @@ -524,3 +556,54 @@ func (c *client) Query(q Query) (*Response, error) { } return &response, nil } + +// duplexReader reads responses and writes it to another writer while +// satisfying the reader interface. +type duplexReader struct { + r io.Reader + w io.Writer +} + +func (r *duplexReader) Read(p []byte) (n int, err error) { + n, err = r.r.Read(p) + if err == nil { + r.w.Write(p[:n]) + } + return n, err +} + +// ChunkedResponse represents a response from the server that +// uses chunking to stream the output. +type ChunkedResponse struct { + dec *json.Decoder + duplex *duplexReader + buf bytes.Buffer +} + +// NewChunkedResponse reads a stream and produces responses from the stream. +func NewChunkedResponse(r io.Reader) *ChunkedResponse { + resp := &ChunkedResponse{} + resp.duplex = &duplexReader{r: r, w: &resp.buf} + resp.dec = json.NewDecoder(resp.duplex) + resp.dec.UseNumber() + return resp +} + +// NextResponse reads the next line of the stream and returns a response. +func (r *ChunkedResponse) NextResponse() (*Response, error) { + var response Response + + if err := r.dec.Decode(&response); err != nil { + if err == io.EOF { + return nil, nil + } + // A decoding error happened. This probably means the server crashed + // and sent a last-ditch error message to us. Ensure we have read the + // entirety of the connection to get any remaining error text. + io.Copy(ioutil.Discard, r.duplex) + return nil, errors.New(strings.TrimSpace(r.buf.String())) + } + + r.buf.Reset() + return &response, nil +} diff --git a/client/v2/client_test.go b/client/v2/client_test.go index 42d8dda05e..dfc9d1a94c 100644 --- a/client/v2/client_test.go +++ b/client/v2/client_test.go @@ -154,6 +154,29 @@ func TestClient_Query(t *testing.T) { } } +func TestClient_ChunkedQuery(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var data Response + w.WriteHeader(http.StatusOK) + enc := json.NewEncoder(w) + _ = enc.Encode(data) + _ = enc.Encode(data) + })) + defer ts.Close() + + config := HTTPConfig{Addr: ts.URL} + c, err := NewHTTPClient(config) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } + + query := Query{Chunked: true} + _, err = c.Query(query) + if err != nil { + t.Fatalf("unexpected error. expected %v, actual %v", nil, err) + } +} + func TestClient_BoundParameters(t *testing.T) { var parameterString string ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {