Add the `QueryAsChunk` method to v2 client

pull/10258/head
miya-masa 2018-09-05 14:35:55 +09:00 committed by Jonathan A. Sternberg
parent 7d9830ab24
commit c19788030d
No known key found for this signature in database
GPG Key ID: 4A0C1200CB8B9D2E
4 changed files with 148 additions and 57 deletions

View File

@ -10,6 +10,7 @@ v1.7.0 [unreleased]
- [#9964](https://github.com/influxdata/influxdb/pull/9964): Enable the storage service by default.
- [#9996](https://github.com/influxdata/influxdb/pull/9996): Ensure read service regexes get optimised.
- [#10408](https://github.com/influxdata/influxdb/pull/10408): Add Flux support to the influx CLI command.
- [#10257](https://github.com/influxdata/influxdb/issues/10257): Add chunked query into the Go client v2.
### Bugfixes

View File

@ -78,6 +78,10 @@ type Client interface {
// the UDP client.
Query(q Query) (*Response, error)
// QueryAsChunk makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
QueryAsChunk(q Query) (*ChunkedResponse, error)
// Close releases any resources a Client may be using.
Close() error
}
@ -496,75 +500,26 @@ type Result struct {
// Query sends a command to the server and returns the Response.
func (c *client) Query(q Query) (*Response, error) {
u := c.url
u.Path = path.Join(u.Path, "query")
jsonParameters, err := json.Marshal(q.Parameters)
req, err := c.createDefaultRequest(q)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
if q.RetentionPolicy != "" {
params.Set("rp", q.RetentionPolicy)
}
params.Set("params", string(jsonParameters))
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
req.URL.RawQuery = params.Encode()
}
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
// but instead some other service. If the error code is also a 500+ code, then some
// downstream loadbalancer/proxy/etc had an issue and we should report that.
if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
body, err := ioutil.ReadAll(resp.Body)
if err != nil || len(body) == 0 {
return nil, fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
}
return nil, fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
}
// If we get an unexpected content type, then it is also not from influx direct and therefore
// we want to know what we received and what status code was returned for debugging purposes.
if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
// like downstream serving a large file
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
if err != nil || len(body) == 0 {
return nil, fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
}
return nil, fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
if err := checkResponse(resp); err != nil {
return nil, err
}
var response Response
@ -573,6 +528,9 @@ func (c *client) Query(q Query) (*Response, error) {
for {
r, err := cr.NextResponse()
if err != nil {
if err == io.EOF {
break
}
// If we got an error while decoding the response, send that back.
return nil, err
}
@ -610,10 +568,99 @@ func (c *client) Query(q Query) (*Response, error) {
return &response, nil
}
// QueryAsChunk sends a command to the server and returns the Response.
func (c *client) QueryAsChunk(q Query) (*ChunkedResponse, error) {
req, err := c.createDefaultRequest(q)
if err != nil {
return nil, err
}
params := req.URL.Query()
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
if err := checkResponse(resp); err != nil {
return nil, err
}
return NewChunkedResponse(resp.Body), nil
}
func checkResponse(resp *http.Response) error {
// If we lack a X-Influxdb-Version header, then we didn't get a response from influxdb
// but instead some other service. If the error code is also a 500+ code, then some
// downstream loadbalancer/proxy/etc had an issue and we should report that.
if resp.Header.Get("X-Influxdb-Version") == "" && resp.StatusCode >= http.StatusInternalServerError {
body, err := ioutil.ReadAll(resp.Body)
if err != nil || len(body) == 0 {
return fmt.Errorf("received status code %d from downstream server", resp.StatusCode)
}
return fmt.Errorf("received status code %d from downstream server, with response body: %q", resp.StatusCode, body)
}
// If we get an unexpected content type, then it is also not from influx direct and therefore
// we want to know what we received and what status code was returned for debugging purposes.
if cType, _, _ := mime.ParseMediaType(resp.Header.Get("Content-Type")); cType != "application/json" {
// Read up to 1kb of the body to help identify downstream errors and limit the impact of things
// like downstream serving a large file
body, err := ioutil.ReadAll(io.LimitReader(resp.Body, 1024))
if err != nil || len(body) == 0 {
return fmt.Errorf("expected json response, got empty body, with status: %v", resp.StatusCode)
}
return fmt.Errorf("expected json response, got %q, with status: %v and response body: %q", cType, resp.StatusCode, body)
}
return nil
}
func (c *client) createDefaultRequest(q Query) (*http.Request, error) {
u := c.url
u.Path = path.Join(u.Path, "query")
jsonParameters, err := json.Marshal(q.Parameters)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
if q.RetentionPolicy != "" {
params.Set("rp", q.RetentionPolicy)
}
params.Set("params", string(jsonParameters))
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
return req, nil
}
// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
r io.ReadCloser
w io.Writer
}
@ -625,6 +672,11 @@ func (r *duplexReader) Read(p []byte) (n int, err error) {
return n, err
}
// Close closes the response.
func (r *duplexReader) Close() error {
return r.r.Close()
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
@ -635,8 +687,12 @@ type ChunkedResponse struct {
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
rc, ok := r.(io.ReadCloser)
if !ok {
rc = ioutil.NopCloser(r)
}
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.duplex = &duplexReader{r: rc, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
@ -645,10 +701,9 @@ func NewChunkedResponse(r io.Reader) *ChunkedResponse {
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
return nil, err
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
@ -660,3 +715,8 @@ func (r *ChunkedResponse) NextResponse() (*Response, error) {
r.buf.Reset()
return &response, nil
}
// Close closes the response.
func (r *ChunkedResponse) Close() error {
return r.duplex.Close()
}

View File

@ -911,3 +911,29 @@ func TestClientProxy(t *testing.T) {
t.Fatalf("no http request was received")
}
}
func TestClient_QueryAsChunk(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data Response
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Influxdb-Version", "1.3.1")
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
_ = enc.Encode(data)
_ = enc.Encode(data)
}))
defer ts.Close()
config := HTTPConfig{Addr: ts.URL}
c, err := NewHTTPClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
query := Query{Chunked: true}
resp, err := c.QueryAsChunk(query)
defer resp.Close()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}

View File

@ -107,6 +107,10 @@ func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) QueryAsChunk(q Query) (*ChunkedResponse, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}