diff --git a/influx/influx.go b/influx/influx.go index 24838b8e2f..1e68d2bec5 100644 --- a/influx/influx.go +++ b/influx/influx.go @@ -29,9 +29,22 @@ func NewClient(host string) (*Client, error) { // Query issues a request to a configured InfluxDB instance for time series // information specified by query. Queries must be "fully-qualified," and -// include both the database and retention policy. +// include both the database and retention policy. In-flight requests can be +// cancelled using the provided context. func (c *Client) Query(ctx context.Context, query mrfusion.Query) (mrfusion.Response, error) { q := ixClient.NewQuery(string(query), "", "") - resp, err := c.ix.Query(q) - return response{resp}, err + resps := make(chan (response)) + go func() { + resp, err := c.ix.Query(q) + resps <- response{resp, err} + }() + + select { + case resp := <-resps: + return resp, resp.err + case <-ctx.Done(): + return nil, nil + } + + return nil, nil } diff --git a/influx/response.go b/influx/response.go index 5fc9078b6d..099c697729 100644 --- a/influx/response.go +++ b/influx/response.go @@ -9,6 +9,7 @@ import ( type response struct { *client.Response + err error } func (r response) MarshalJSON() ([]byte, error) {