Supprt for cancelling in-flight influx requests
If a request upstream is cancelled, we can propagate the cancellation down to our queries to Influx using context's cancellation functionality. This patch adds support for this.pull/10616/head
parent
3116731abe
commit
f6ac18b692
|
@ -29,9 +29,22 @@ func NewClient(host string) (*Client, error) {
|
||||||
|
|
||||||
// Query issues a request to a configured InfluxDB instance for time series
|
// Query issues a request to a configured InfluxDB instance for time series
|
||||||
// information specified by query. Queries must be "fully-qualified," and
|
// information specified by query. Queries must be "fully-qualified," and
|
||||||
// include both the database and retention policy.
|
// include both the database and retention policy. In-flight requests can be
|
||||||
|
// cancelled using the provided context.
|
||||||
func (c *Client) Query(ctx context.Context, query mrfusion.Query) (mrfusion.Response, error) {
|
func (c *Client) Query(ctx context.Context, query mrfusion.Query) (mrfusion.Response, error) {
|
||||||
q := ixClient.NewQuery(string(query), "", "")
|
q := ixClient.NewQuery(string(query), "", "")
|
||||||
|
resps := make(chan (response))
|
||||||
|
go func() {
|
||||||
resp, err := c.ix.Query(q)
|
resp, err := c.ix.Query(q)
|
||||||
return response{resp}, err
|
resps <- response{resp, err}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case resp := <-resps:
|
||||||
|
return resp, resp.err
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
type response struct {
|
type response struct {
|
||||||
*client.Response
|
*client.Response
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r response) MarshalJSON() ([]byte, error) {
|
func (r response) MarshalJSON() ([]byte, error) {
|
||||||
|
|
Loading…
Reference in New Issue