Implement initial Influx Enterprise client
This is an initial implementation of the Influx Enterprise client. It incorporates introspection of a cluster to determine available data nodes and issues requests in a round-robin fasion to those nodes. Currently all nodes are assumed to be healthy, so there is no pruning of the available data node pool.pull/101/head
parent
9db72a71f0
commit
f47d2f3316
|
@ -0,0 +1,56 @@
|
|||
package enterprise
|
||||
|
||||
import (
|
||||
"container/ring"
|
||||
|
||||
"github.com/influxdata/mrfusion"
|
||||
"github.com/influxdata/mrfusion/influx"
|
||||
"github.com/influxdata/plutonium/meta/control"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
var _ mrfusion.TimeSeries = &Client{}
|
||||
|
||||
// Client is a device for retrieving time series data from an Influx Enterprise
|
||||
// cluster. It is configured using the addresses of one or more meta node URLs.
|
||||
// Data node URLs are retrieved automatically from the meta nodes and queries
|
||||
// are appropriately load balanced across the cluster.
|
||||
type Client struct {
|
||||
Ctrl interface {
|
||||
ShowCluster() (*control.Cluster, error)
|
||||
}
|
||||
|
||||
dataNodes *ring.Ring
|
||||
}
|
||||
|
||||
// NewClient initializes and returns a Client.
|
||||
func (c *Client) Open() error {
|
||||
cluster, err := c.Ctrl.ShowCluster()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c.dataNodes = ring.New(len(cluster.DataNodes))
|
||||
for _, dn := range cluster.DataNodes {
|
||||
cl, err := influx.NewClient(dn.HTTPAddr)
|
||||
if err != nil {
|
||||
continue
|
||||
} else {
|
||||
c.dataNodes.Value = cl
|
||||
c.dataNodes = c.dataNodes.Next()
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Query retrieves timeseries information pertaining to a specified query. It
|
||||
// can be cancelled by using a provided context.
|
||||
func (c *Client) Query(ctx context.Context, q mrfusion.Query) (mrfusion.Response, error) {
|
||||
cl := c.dataNodes.Next().Value.(mrfusion.TimeSeries)
|
||||
return cl.Query(ctx, q)
|
||||
}
|
||||
|
||||
// MonitoredServices returns the services monitored by this Enterprise cluster.
|
||||
func (c *Client) MonitoredServices(ctx context.Context) ([]mrfusion.MonitoredService, error) {
|
||||
return []mrfusion.MonitoredService{}, nil
|
||||
}
|
|
@ -0,0 +1,68 @@
|
|||
package enterprise_test
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/influxdata/mrfusion"
|
||||
"github.com/influxdata/mrfusion/enterprise"
|
||||
"github.com/influxdata/mrfusion/mock"
|
||||
"github.com/influxdata/plutonium/meta/control"
|
||||
)
|
||||
|
||||
func Test_Enterprise_FetchesDataNodes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctrl := &mock.ControlClient{
|
||||
Cluster: &control.Cluster{},
|
||||
}
|
||||
cl := &enterprise.Client{
|
||||
Ctrl: ctrl,
|
||||
}
|
||||
|
||||
err := cl.Open()
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while creating enterprise client. err:", err)
|
||||
}
|
||||
|
||||
if ctrl.ShowClustersCalled != true {
|
||||
t.Fatal("Expected request to meta node but none was issued")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Enterprise_IssuesQueries(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
called := false
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
|
||||
called = true
|
||||
if r.URL.Path != "/query" {
|
||||
t.Fatal("Expected request to '/query' but was", r.URL.Path)
|
||||
}
|
||||
rw.Write([]byte(`{}`))
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
cl := &enterprise.Client{
|
||||
Ctrl: mock.NewMockControlClient(ts.URL),
|
||||
}
|
||||
|
||||
err := cl.Open()
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error initializing client: err:", err)
|
||||
}
|
||||
|
||||
_, err = cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while querying data node: err:", err)
|
||||
}
|
||||
|
||||
if called == false {
|
||||
t.Fatal("Expected request to data node but none was received")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/plutonium/meta/control"
|
||||
)
|
||||
|
||||
type ControlClient struct {
|
||||
Cluster *control.Cluster
|
||||
ShowClustersCalled bool
|
||||
}
|
||||
|
||||
func NewMockControlClient(addr string) *ControlClient {
|
||||
_, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &ControlClient{
|
||||
Cluster: &control.Cluster{
|
||||
DataNodes: []control.DataNode{
|
||||
control.DataNode{
|
||||
HTTPAddr: addr,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ControlClient) ShowCluster() (*control.Cluster, error) {
|
||||
cc.ShowClustersCalled = true
|
||||
return cc.Cluster, nil
|
||||
}
|
Loading…
Reference in New Issue