Add template variable substitution to influx queries
parent
c2cd7887e3
commit
0902f23f96
|
@ -68,17 +68,20 @@ func (c *Client) query(u *url.URL, q chronograf.Query) (chronograf.Response, err
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
command := q.Command
|
||||||
c.Logger.
|
if len(q.TemplateVars) > 0 {
|
||||||
|
command = TemplateReplace(q.Command, q.TemplateVars)
|
||||||
|
}
|
||||||
|
logs := c.Logger.
|
||||||
WithField("component", "proxy").
|
WithField("component", "proxy").
|
||||||
WithField("host", req.Host).
|
WithField("host", req.Host).
|
||||||
WithField("command", q.Command).
|
WithField("command", command).
|
||||||
WithField("db", q.DB).
|
WithField("db", q.DB).
|
||||||
WithField("rp", q.RP).
|
WithField("rp", q.RP)
|
||||||
Debug("query")
|
logs.Debug("query")
|
||||||
|
|
||||||
params := req.URL.Query()
|
params := req.URL.Query()
|
||||||
params.Set("q", q.Command)
|
params.Set("q", command)
|
||||||
params.Set("db", q.DB)
|
params.Set("db", q.DB)
|
||||||
params.Set("rp", q.RP)
|
params.Set("rp", q.RP)
|
||||||
params.Set("epoch", "ms")
|
params.Set("epoch", "ms")
|
||||||
|
@ -111,13 +114,7 @@ func (c *Client) query(u *url.URL, q chronograf.Query) (chronograf.Response, err
|
||||||
|
|
||||||
// If we got a valid decode error, send that back
|
// If we got a valid decode error, send that back
|
||||||
if decErr != nil {
|
if decErr != nil {
|
||||||
c.Logger.
|
logs.WithField("influx_status", resp.StatusCode).
|
||||||
WithField("component", "proxy").
|
|
||||||
WithField("host", req.Host).
|
|
||||||
WithField("command", q.Command).
|
|
||||||
WithField("db", q.DB).
|
|
||||||
WithField("rp", q.RP).
|
|
||||||
WithField("influx_status", resp.StatusCode).
|
|
||||||
Error("Error parsing results from influxdb: err:", decErr)
|
Error("Error parsing results from influxdb: err:", decErr)
|
||||||
return nil, decErr
|
return nil, decErr
|
||||||
}
|
}
|
||||||
|
@ -125,12 +122,7 @@ func (c *Client) query(u *url.URL, q chronograf.Query) (chronograf.Response, err
|
||||||
// If we don't have an error in our json response, and didn't get statusOK
|
// If we don't have an error in our json response, and didn't get statusOK
|
||||||
// then send back an error
|
// then send back an error
|
||||||
if resp.StatusCode != http.StatusOK && response.Err != "" {
|
if resp.StatusCode != http.StatusOK && response.Err != "" {
|
||||||
c.Logger.
|
logs.
|
||||||
WithField("component", "proxy").
|
|
||||||
WithField("host", req.Host).
|
|
||||||
WithField("command", q.Command).
|
|
||||||
WithField("db", q.DB).
|
|
||||||
WithField("rp", q.RP).
|
|
||||||
WithField("influx_status", resp.StatusCode).
|
WithField("influx_status", resp.StatusCode).
|
||||||
Error("Received non-200 response from influxdb")
|
Error("Received non-200 response from influxdb")
|
||||||
|
|
||||||
|
|
|
@ -82,6 +82,7 @@ func Test_Influx_HTTPS_Failure(t *testing.T) {
|
||||||
func Test_Influx_HTTPS_InsecureSkipVerify(t *testing.T) {
|
func Test_Influx_HTTPS_InsecureSkipVerify(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
called := false
|
called := false
|
||||||
|
q := ""
|
||||||
ts := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
ts := httptest.NewTLSServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||||
rw.WriteHeader(http.StatusOK)
|
rw.WriteHeader(http.StatusOK)
|
||||||
rw.Write([]byte(`{}`))
|
rw.Write([]byte(`{}`))
|
||||||
|
@ -89,6 +90,8 @@ func Test_Influx_HTTPS_InsecureSkipVerify(t *testing.T) {
|
||||||
if path := r.URL.Path; path != "/query" {
|
if path := r.URL.Path; path != "/query" {
|
||||||
t.Error("Expected the path to contain `/query` but was", path)
|
t.Error("Expected the path to contain `/query` but was", path)
|
||||||
}
|
}
|
||||||
|
values := r.URL.Query()
|
||||||
|
q = values.Get("q")
|
||||||
}))
|
}))
|
||||||
defer ts.Close()
|
defer ts.Close()
|
||||||
|
|
||||||
|
@ -118,6 +121,34 @@ func Test_Influx_HTTPS_InsecureSkipVerify(t *testing.T) {
|
||||||
if called == false {
|
if called == false {
|
||||||
t.Error("Expected http request to Influx but there was none")
|
t.Error("Expected http request to Influx but there was none")
|
||||||
}
|
}
|
||||||
|
called = false
|
||||||
|
q = ""
|
||||||
|
query = chronograf.Query{
|
||||||
|
Command: "select $field from cpu",
|
||||||
|
TemplateVars: []chronograf.TemplateVar{
|
||||||
|
{
|
||||||
|
Var: "$field",
|
||||||
|
Values: []chronograf.TemplateValue{
|
||||||
|
{
|
||||||
|
Value: "usage_user",
|
||||||
|
Type: "fieldKey",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
_, err = series.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal("Expected no error but was", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if called == false {
|
||||||
|
t.Error("Expected http request to Influx but there was none")
|
||||||
|
}
|
||||||
|
|
||||||
|
if q != `select "usage_user" from cpu` {
|
||||||
|
t.Errorf("Unexpected query: %s", q)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_Influx_CancelsInFlightRequests(t *testing.T) {
|
func Test_Influx_CancelsInFlightRequests(t *testing.T) {
|
||||||
|
|
Loading…
Reference in New Issue