Add influx client

This adds a simple HTTP influx client to fetch time series results. It
implements the TimeSeries interface. Also included is an implementation
of the mrfusion.Results interface that wraps results returned from the
influx client.

We only support HTTP connections (there's also a UDP influx client). We
also require that all Queries to Influx be fully-qualified--they should
include the database and retention policies in the query.

This also makes Resuls a json.Marshaler. In order to hand results back
to clients, we need to guarantee that those results can be serialized to
json in some form. json.Marshaller is just such a guarantee!
pull/10616/head
Tim Raymond 2016-09-14 11:21:41 -04:00
parent 22e65011e1
commit 3116731abe
4 changed files with 103 additions and 1 deletions

37
influx/influx.go Normal file
View File

@ -0,0 +1,37 @@
package influx
import (
ixClient "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/mrfusion"
"golang.org/x/net/context"
)
// Client is a device for retrieving time series data from an InfluxDB instance
type Client struct {
ix ixClient.Client
}
// NewClient initializes a Client
func NewClient(host string) (*Client, error) {
cl, err := ixClient.NewHTTPClient(ixClient.HTTPConfig{
Addr: host,
})
if err != nil {
return nil, err
}
return &Client{
ix: cl,
}, nil
}
// Query issues a request to a configured InfluxDB instance for time series
// information specified by query. Queries must be "fully-qualified," and
// include both the database and retention policy.
func (c *Client) Query(ctx context.Context, query mrfusion.Query) (mrfusion.Response, error) {
q := ixClient.NewQuery(string(query), "", "")
resp, err := c.ix.Query(q)
return response{resp}, err
}

40
influx/influx_test.go Normal file
View File

@ -0,0 +1,40 @@
package influx_test
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/influxdata/mrfusion"
"github.com/influxdata/mrfusion/influx"
"golang.org/x/net/context"
)
func Test_MakesRequestsToQueryEndpoint(t *testing.T) {
t.Parallel()
called := false
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(http.StatusOK)
rw.Write([]byte(`{}`))
called = true
if path := r.URL.Path; path != "/query" {
t.Error("Expected the path to contain `/query` but was", path)
}
}))
defer ts.Close()
var series mrfusion.TimeSeries
series, err := influx.NewClient(ts.URL)
if err != nil {
t.Fatal("Unexpected error initializing client: err:", err)
}
_, err = series.Query(context.Background(), "show databases")
if err != nil {
t.Fatal("Expected no error but was", err)
}
if called == false {
t.Error("Expected http request to Influx but there was none")
}
}

20
influx/response.go Normal file
View File

@ -0,0 +1,20 @@
package influx
import (
"encoding/json"
"github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/mrfusion"
)
type response struct {
*client.Response
}
func (r response) MarshalJSON() ([]byte, error) {
return json.Marshal(r.Results)
}
func (r response) Results() ([]mrfusion.Result, error) {
return []mrfusion.Result{}, nil
}

View File

@ -1,6 +1,10 @@
package mrfusion
import "golang.org/x/net/context"
import (
"encoding/json"
"golang.org/x/net/context"
)
// Used to retrieve a Response from a TimeSeries.
type Query string
@ -28,6 +32,7 @@ type Result interface {
// Response is the result of a query against a TimeSeries
type Response interface {
Results() ([]Result, error)
json.Marshaler
}
// Represents a queryable time series database.