Add retention policy query parameter to queries

pull/9753/head
Jonathan A. Sternberg 2018-04-24 11:21:12 -05:00
parent 1bf0d16dbf
commit 373506e335
4 changed files with 89 additions and 6 deletions

View File

@ -38,6 +38,10 @@ type Query struct {
Command string Command string
Database string Database string
// RetentionPolicy tells the server which retention policy to use by default.
// This option is only effective when querying a server of version 1.6.0 or later.
RetentionPolicy string
// Chunked tells the server to send back chunked responses. This places // Chunked tells the server to send back chunked responses. This places
// less load on the server by sending back chunks of the response rather // less load on the server by sending back chunks of the response rather
// than waiting for the entire response all at once. // than waiting for the entire response all at once.
@ -208,6 +212,9 @@ func (c *Client) QueryContext(ctx context.Context, q Query) (*Response, error) {
values := u.Query() values := u.Query()
values.Set("q", q.Command) values.Set("q", q.Command)
values.Set("db", q.Database) values.Set("db", q.Database)
if q.RetentionPolicy != "" {
values.Set("rp", q.RetentionPolicy)
}
if q.Chunked { if q.Chunked {
values.Set("chunked", "true") values.Set("chunked", "true")
if q.ChunkSize > 0 { if q.ChunkSize > 0 {

View File

@ -170,6 +170,38 @@ func TestClient_Query(t *testing.T) {
} }
} }
func TestClient_Query_RP(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
if got, exp := params.Get("db"), "db0"; got != exp {
t.Errorf("unexpected db query parameter: %s != %s", exp, got)
}
if got, exp := params.Get("rp"), "rp0"; got != exp {
t.Errorf("unexpected rp query parameter: %s != %s", exp, got)
}
var data client.Response
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(data)
}))
defer ts.Close()
u, _ := url.Parse(ts.URL)
config := client.Config{URL: *u}
c, err := client.NewClient(config)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
query := client.Query{
Database: "db0",
RetentionPolicy: "rp0",
}
_, err = c.Query(query)
if err != nil {
t.Fatalf("unexpected error. expected %v, actual %v", nil, err)
}
}
func TestClient_ChunkedQuery(t *testing.T) { func TestClient_ChunkedQuery(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var data client.Response var data client.Response

View File

@ -415,6 +415,7 @@ func (c *client) Write(bp BatchPoints) error {
type Query struct { type Query struct {
Command string Command string
Database string Database string
RetentionPolicy string
Precision string Precision string
Chunked bool Chunked bool
ChunkSize int ChunkSize int
@ -432,6 +433,19 @@ func NewQuery(command, database, precision string) Query {
} }
} }
// NewQueryWithRP returns a query object.
// The database, retention policy, and precision arguments can be empty strings if they are not needed
// for the query. Setting the retention policy only works on InfluxDB versions 1.6 or greater.
func NewQueryWithRP(command, database, retentionPolicy, precision string) Query {
return Query{
Command: command,
Database: database,
RetentionPolicy: retentionPolicy,
Precision: precision,
Parameters: make(map[string]interface{}),
}
}
// NewQueryWithParameters returns a query object. // NewQueryWithParameters returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query. // The database and precision arguments can be empty strings if they are not needed for the query.
// parameters is a map of the parameter names used in the command to their values. // parameters is a map of the parameter names used in the command to their values.
@ -503,6 +517,9 @@ func (c *client) Query(q Query) (*Response, error) {
params := req.URL.Query() params := req.URL.Query()
params.Set("q", q.Command) params.Set("q", q.Command)
params.Set("db", q.Database) params.Set("db", q.Database)
if q.RetentionPolicy != "" {
params.Set("rp", q.RetentionPolicy)
}
params.Set("params", string(jsonParameters)) params.Set("params", string(jsonParameters))
if q.Chunked { if q.Chunked {
params.Set("chunked", "true") params.Set("chunked", "true")

View File

@ -158,6 +158,33 @@ func TestClient_Query(t *testing.T) {
} }
} }
func TestClient_QueryWithRP(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
params := r.URL.Query()
if got, exp := params.Get("db"), "db0"; got != exp {
t.Errorf("unexpected db query parameter: %s != %s", exp, got)
}
if got, exp := params.Get("rp"), "rp0"; got != exp {
t.Errorf("unexpected rp query parameter: %s != %s", exp, got)
}
var data Response
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_ = json.NewEncoder(w).Encode(data)
}))
defer ts.Close()
config := HTTPConfig{Addr: ts.URL}
c, _ := NewHTTPClient(config)
defer c.Close()
query := NewQueryWithRP("SELECT * FROM m0", "db0", "rp0", "")
_, err := c.Query(query)
if err != nil {
t.Errorf("unexpected error. expected %v, actual %v", nil, err)
}
}
func TestClientDownstream500WithBody_Query(t *testing.T) { func TestClientDownstream500WithBody_Query(t *testing.T) {
const err500page = `<html> const err500page = `<html>
<head> <head>