Support chunked queries in the Go InfluxDB client

Modify the CLI to always use chunked queries.
pull/6166/head
Jonathan A. Sternberg 2016-03-30 14:03:03 -04:00
parent d9b32ea160
commit 8752d1b1e3
4 changed files with 137 additions and 14 deletions

View File

@ -21,6 +21,7 @@
- [#5372](https://github.com/influxdata/influxdb/pull/5372): Faster shard loading
- [#6148](https://github.com/influxdata/influxdb/pull/6148): Build script is now compatible with Python 3. Added ability to create detached signatures for packages. Build script now uses Python logging facility for messages.
- [#6115](https://github.com/influxdata/influxdb/issues/6115): Support chunking query results mid-series. Limit non-chunked output.
- [#6166](https://github.com/influxdata/influxdb/pull/6166): Teach influxdb client how to use chunked queries and use in the CLI.
### Bugfixes

View File

@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
@ -32,6 +33,18 @@ const (
type Query struct {
Command string
Database string
// Chunked tells the server to send back chunked responses. This places
// less load on the server by sending back chunks of the response rather
// than waiting for the entire response all at once.
Chunked bool
// ChunkSize sets the maximum number of rows that will be returned per
// chunk. Chunks are either divided based on their series or if they hit
// the chunk size limit.
//
// Chunked must be set to true for this option to be used.
ChunkSize int
}
// ParseConnectionString will parse a string to create a valid connection URL
@ -157,6 +170,12 @@ func (c *Client) Query(q Query) (*Response, error) {
values := u.Query()
values.Set("q", q.Command)
values.Set("db", q.Database)
if q.Chunked {
values.Set("chunked", "true")
if q.ChunkSize > 0 {
values.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if c.precision != "" {
values.Set("epoch", c.precision)
}
@ -178,19 +197,38 @@ func (c *Client) Query(q Query) (*Response, error) {
defer resp.Body.Close()
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
if r == nil {
break
}
response.Results = append(response.Results, r.Results...)
if r.Err != nil {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
if err := dec.Decode(&response); err != nil {
// Ignore EOF errors if we got an invalid status code.
if !(err == io.EOF && resp.StatusCode != http.StatusOK) {
return nil, err
}
}
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, decErr
}
// If we don't have an error in our json response, and didn't get StatusOK, then send back an error
// If we don't have an error in our json response, and didn't get StatusOK,
// then send back an error.
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server", resp.StatusCode)
}
@ -437,7 +475,7 @@ func (r *Response) UnmarshalJSON(b []byte) error {
// Error returns the first error from any statement.
// Returns nil if no errors occurred on any statements.
func (r Response) Error() error {
func (r *Response) Error() error {
if r.Err != nil {
return r.Err
}
@ -449,6 +487,31 @@ func (r Response) Error() error {
return nil
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
dec := json.NewDecoder(r)
dec.UseNumber()
return &ChunkedResponse{dec: dec}
}
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
return nil, err
}
return &response, nil
}
// Point defines the fields that will be written to the database
// Measurement, Time, and Fields are required
// Precision can be specified if the time is in epoch format (integer).

View File

@ -169,6 +169,30 @@ func TestClient_Query(t *testing.T) {
}
}
func TestClient_ChunkedQuery(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data client.Response
w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
_ = enc.Encode(data)
_ = enc.Encode(data)
}))
defer ts.Close()
u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
query := client.Query{Chunked: true}
_, err = c.Query(query)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}
func TestClient_BasicAuth(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
u, p, ok := r.BasicAuth()
@ -741,3 +765,28 @@ war3JNM1mGB3o2iAtuOJlFIKLpI1x+1e8pI=
}
}
}
func TestChunkedResponse(t *testing.T) {
s := `{"results":[{},{}]}{"results":[{}]}`
r := client.NewChunkedResponse(strings.NewReader(s))
resp, err := r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if actual := len(resp.Results); actual != 2 {
t.Fatalf("unexpected number of results. expected %v, actual %v", 2, actual)
}
resp, err = r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if actual := len(resp.Results); actual != 1 {
t.Fatalf("unexpected number of results. expected %v, actual %v", 1, actual)
}
resp, err = r.NextResponse()
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
} else if resp != nil {
t.Fatalf("unexpected response. expected %v, actual %v", nil, resp)
}
}

View File

@ -56,6 +56,7 @@ type CommandLine struct {
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
Chunked bool
Quit chan struct{}
IgnoreSignals bool // Ignore signals normally caught by this process (used primarily for testing)
osSignals chan os.Signal
@ -518,9 +519,18 @@ func (c *CommandLine) Insert(stmt string) error {
return nil
}
// query creates a query struct to be used with the client.
func (c *CommandLine) query(query string, database string) client.Query {
return client.Query{
Command: query,
Database: database,
Chunked: true,
}
}
// ExecuteQuery runs any query statement
func (c *CommandLine) ExecuteQuery(query string) error {
response, err := c.Client.Query(client.Query{Command: query, Database: c.Database})
response, err := c.Client.Query(c.query(query, c.Database))
if err != nil {
fmt.Printf("ERR: %s\n", err)
return err
@ -539,7 +549,7 @@ func (c *CommandLine) ExecuteQuery(query string) error {
// DatabaseToken retrieves database token
func (c *CommandLine) DatabaseToken() (string, error) {
response, err := c.Client.Query(client.Query{Command: "SHOW DIAGNOSTICS for 'registration'"})
response, err := c.Client.Query(c.query("SHOW DIAGNOSTICS for 'registration'", ""))
if err != nil {
return "", err
}