Update Enterprise Client
When this was originally developed, many exisiting patterns in the application weren't established yet. This adds support for the chronograf.Logger as well as removes some cruft that just isn't used anymore. Interfaces have also been updated to align with what they are today.pull/10616/head
parent
9459f731e7
commit
6e8a0f9309
|
@ -5,13 +5,13 @@ import (
|
|||
"net/url"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/mrfusion"
|
||||
"github.com/influxdata/mrfusion/influx"
|
||||
"github.com/influxdata/plutonium/meta/control"
|
||||
"golang.org/x/net/context"
|
||||
"context"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/chronograf/influx"
|
||||
)
|
||||
|
||||
var _ mrfusion.TimeSeries = &Client{}
|
||||
var _ chronograf.TimeSeries = &Client{}
|
||||
|
||||
// Client is a device for retrieving time series data from an Influx Enterprise
|
||||
// cluster. It is configured using the addresses of one or more meta node URLs.
|
||||
|
@ -19,15 +19,16 @@ var _ mrfusion.TimeSeries = &Client{}
|
|||
// are appropriately load balanced across the cluster.
|
||||
type Client struct {
|
||||
Ctrl interface {
|
||||
ShowCluster() (*control.Cluster, error)
|
||||
ShowCluster() (*Cluster, error)
|
||||
}
|
||||
Logger chronograf.Logger
|
||||
|
||||
dataNodes *ring.Ring
|
||||
opened bool
|
||||
}
|
||||
|
||||
// NewClientWithTimeSeries initializes a Client with a known set of TimeSeries.
|
||||
func NewClientWithTimeSeries(series ...mrfusion.TimeSeries) *Client {
|
||||
func NewClientWithTimeSeries(lg chronograf.Logger, series ...chronograf.TimeSeries) *Client {
|
||||
c := &Client{}
|
||||
|
||||
c.dataNodes = ring.New(len(series))
|
||||
|
@ -44,19 +45,20 @@ func NewClientWithTimeSeries(series ...mrfusion.TimeSeries) *Client {
|
|||
// Acceptable URLs include host:port combinations as well as scheme://host:port
|
||||
// varieties. TLS is used when the URL contains "https" or when the TLS
|
||||
// parameter is set. The latter option is provided for host:port combinations
|
||||
func NewClientWithURL(mu string, tls bool) (*Client, error) {
|
||||
func NewClientWithURL(mu string, tls bool, lg chronograf.Logger) (*Client, error) {
|
||||
metaURL, err := parseMetaURL(mu, tls)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Client{
|
||||
Ctrl: control.NewClient(metaURL.Host),
|
||||
Ctrl: &MetaClient{metaURL.Host},
|
||||
Logger: lg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Open prepares a Client to process queries. It must be called prior to calling Query
|
||||
func (c *Client) Open() error {
|
||||
// Connect prepares a Client to process queries. It must be called prior to calling Query
|
||||
func (c *Client) Connect(ctx context.Context, src *chronograf.Source) error {
|
||||
c.opened = true
|
||||
// return early if we already have dataNodes
|
||||
if c.dataNodes != nil {
|
||||
|
@ -69,7 +71,7 @@ func (c *Client) Open() error {
|
|||
|
||||
c.dataNodes = ring.New(len(cluster.DataNodes))
|
||||
for _, dn := range cluster.DataNodes {
|
||||
cl, err := influx.NewClient(dn.HTTPAddr)
|
||||
cl, err := influx.NewClient(dn.HTTPAddr, c.Logger)
|
||||
if err != nil {
|
||||
continue
|
||||
} else {
|
||||
|
@ -82,22 +84,17 @@ func (c *Client) Open() error {
|
|||
|
||||
// Query retrieves timeseries information pertaining to a specified query. It
|
||||
// can be cancelled by using a provided context.
|
||||
func (c *Client) Query(ctx context.Context, q mrfusion.Query) (mrfusion.Response, error) {
|
||||
func (c *Client) Query(ctx context.Context, q chronograf.Query) (chronograf.Response, error) {
|
||||
if !c.opened {
|
||||
return nil, mrfusion.ErrUninitialized
|
||||
return nil, chronograf.ErrUninitialized
|
||||
}
|
||||
return c.nextDataNode().Query(ctx, q)
|
||||
}
|
||||
|
||||
// MonitoredServices returns the services monitored by this Enterprise cluster.
|
||||
func (c *Client) MonitoredServices(ctx context.Context) ([]mrfusion.MonitoredService, error) {
|
||||
return []mrfusion.MonitoredService{}, nil
|
||||
}
|
||||
|
||||
// nextDataNode retrieves the next available data node
|
||||
func (c *Client) nextDataNode() mrfusion.TimeSeries {
|
||||
func (c *Client) nextDataNode() chronograf.TimeSeries {
|
||||
c.dataNodes = c.dataNodes.Next()
|
||||
return c.dataNodes.Value.(mrfusion.TimeSeries)
|
||||
return c.dataNodes.Value.(chronograf.TimeSeries)
|
||||
}
|
||||
|
||||
// parseMetaURL constructs a url from either a host:port combination or a
|
||||
|
|
|
@ -7,23 +7,23 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/influxdata/mrfusion"
|
||||
"github.com/influxdata/mrfusion/enterprise"
|
||||
"github.com/influxdata/mrfusion/mock"
|
||||
"github.com/influxdata/plutonium/meta/control"
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/chronograf/enterprise"
|
||||
"github.com/influxdata/chronograf/log"
|
||||
)
|
||||
|
||||
func Test_Enterprise_FetchesDataNodes(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ctrl := &mock.ControlClient{
|
||||
Cluster: &control.Cluster{},
|
||||
ctrl := &ControlClient{
|
||||
Cluster: &enterprise.Cluster{},
|
||||
}
|
||||
cl := &enterprise.Client{
|
||||
Ctrl: ctrl,
|
||||
}
|
||||
|
||||
err := cl.Open()
|
||||
bg := context.Background()
|
||||
err := cl.Connect(bg, &chronograf.Source{})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while creating enterprise client. err:", err)
|
||||
|
@ -48,15 +48,16 @@ func Test_Enterprise_IssuesQueries(t *testing.T) {
|
|||
defer ts.Close()
|
||||
|
||||
cl := &enterprise.Client{
|
||||
Ctrl: mock.NewMockControlClient(ts.URL),
|
||||
Ctrl: NewMockControlClient(ts.URL),
|
||||
Logger: log.New(log.DebugLevel),
|
||||
}
|
||||
|
||||
err := cl.Open()
|
||||
err := cl.Connect(context.Background(), &chronograf.Source{})
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error initializing client: err:", err)
|
||||
}
|
||||
|
||||
_, err = cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
_, err = cl.Query(context.Background(), chronograf.Query{Command: "show shards", DB: "_internal", RP: "autogen"})
|
||||
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while querying data node: err:", err)
|
||||
|
@ -68,21 +69,21 @@ func Test_Enterprise_IssuesQueries(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Enterprise_AdvancesDataNodes(t *testing.T) {
|
||||
m1 := mock.NewTimeSeries([]string{"http://host-1.example.com:8086"}, &mock.Response{})
|
||||
m2 := mock.NewTimeSeries([]string{"http://host-2.example.com:8086"}, &mock.Response{})
|
||||
cl := enterprise.NewClientWithTimeSeries(mrfusion.TimeSeries(m1), mrfusion.TimeSeries(m2))
|
||||
m1 := NewMockTimeSeries("http://host-1.example.com:8086")
|
||||
m2 := NewMockTimeSeries("http://host-2.example.com:8086")
|
||||
cl := enterprise.NewClientWithTimeSeries(log.New(log.DebugLevel), chronograf.TimeSeries(m1), chronograf.TimeSeries(m2))
|
||||
|
||||
err := cl.Open()
|
||||
err := cl.Connect(context.Background(), &chronograf.Source{})
|
||||
if err != nil {
|
||||
t.Error("Unexpected error while initializing client: err:", err)
|
||||
}
|
||||
|
||||
_, err = cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
_, err = cl.Query(context.Background(), chronograf.Query{Command: "show shards", DB: "_internal", RP: "autogen"})
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while issuing query: err:", err)
|
||||
}
|
||||
|
||||
_, err = cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
_, err = cl.Query(context.Background(), chronograf.Query{Command: "show shards", DB: "_internal", RP: "autogen"})
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while issuing query: err:", err)
|
||||
}
|
||||
|
@ -113,7 +114,7 @@ func Test_Enterprise_NewClientWithURL(t *testing.T) {
|
|||
}
|
||||
|
||||
for _, testURL := range urls {
|
||||
_, err := enterprise.NewClientWithURL(testURL.url, testURL.tls)
|
||||
_, err := enterprise.NewClientWithURL(testURL.url, testURL.tls, log.New(log.DebugLevel))
|
||||
if err != nil && !testURL.shouldErr {
|
||||
t.Errorf("Unexpected error creating Client with URL %s and TLS preference %t. err: %s", testURL.url, testURL.tls, err.Error())
|
||||
} else if err == nil && testURL.shouldErr {
|
||||
|
@ -123,10 +124,10 @@ func Test_Enterprise_NewClientWithURL(t *testing.T) {
|
|||
}
|
||||
|
||||
func Test_Enterprise_ComplainsIfNotOpened(t *testing.T) {
|
||||
m1 := mock.NewTimeSeries([]string{"http://host-1.example.com:8086"}, &mock.Response{})
|
||||
cl := enterprise.NewClientWithTimeSeries(mrfusion.TimeSeries(m1))
|
||||
_, err := cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
if err != mrfusion.ErrUninitialized {
|
||||
m1 := NewMockTimeSeries("http://host-1.example.com:8086")
|
||||
cl := enterprise.NewClientWithTimeSeries(log.New(log.DebugLevel), chronograf.TimeSeries(m1))
|
||||
_, err := cl.Query(context.Background(), chronograf.Query{Command: "show shards", DB: "_internal", RP: "autogen"})
|
||||
if err != chronograf.ErrUninitialized {
|
||||
t.Error("Expected ErrUnitialized, but was this err:", err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,36 @@
|
|||
package enterprise
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type MetaClient struct {
|
||||
MetaHostPort string
|
||||
}
|
||||
|
||||
func (t *MetaClient) ShowCluster() (*Cluster, error) {
|
||||
u := &url.URL{}
|
||||
u.Host = t.MetaHostPort
|
||||
u.Path = "/show-cluster"
|
||||
|
||||
req, err := http.NewRequest("GET", u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
dec := json.NewDecoder(res.Body)
|
||||
out := &Cluster{}
|
||||
err = dec.Decode(out)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
package enterprise_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/chronograf"
|
||||
"github.com/influxdata/chronograf/enterprise"
|
||||
)
|
||||
|
||||
type ControlClient struct {
|
||||
Cluster *enterprise.Cluster
|
||||
ShowClustersCalled bool
|
||||
}
|
||||
|
||||
func NewMockControlClient(addr string) *ControlClient {
|
||||
_, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &ControlClient{
|
||||
Cluster: &enterprise.Cluster{
|
||||
DataNodes: []enterprise.DataNode{
|
||||
enterprise.DataNode{
|
||||
HTTPAddr: addr,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ControlClient) ShowCluster() (*enterprise.Cluster, error) {
|
||||
cc.ShowClustersCalled = true
|
||||
return cc.Cluster, nil
|
||||
}
|
||||
|
||||
type TimeSeries struct {
|
||||
URLs []string
|
||||
Response Response
|
||||
|
||||
QueryCtr int
|
||||
}
|
||||
|
||||
type Response struct{}
|
||||
|
||||
func (r *Response) MarshalJSON() ([]byte, error) {
|
||||
return json.Marshal(r)
|
||||
}
|
||||
|
||||
func (ts *TimeSeries) Query(ctx context.Context, q chronograf.Query) (chronograf.Response, error) {
|
||||
ts.QueryCtr++
|
||||
return &Response{}, nil
|
||||
}
|
||||
|
||||
func (ts *TimeSeries) Connect(ctx context.Context, src *chronograf.Source) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewMockTimeSeries(urls ...string) *TimeSeries {
|
||||
return &TimeSeries{
|
||||
URLs: urls,
|
||||
Response: Response{},
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
package enterprise
|
||||
|
||||
// Cluster is a collection of data nodes and non-data nodes within a
|
||||
// Plutonium cluster.
|
||||
type Cluster struct {
|
||||
DataNodes []DataNode `json:"data"`
|
||||
MetaNodes []Node `json:"meta"`
|
||||
}
|
||||
|
||||
type DataNode struct {
|
||||
ID uint64 `json:"id"` // Meta store ID.
|
||||
TCPAddr string `json:"tcpAddr"` // RPC addr, e.g., host:8088.
|
||||
HTTPAddr string `json:"httpAddr"` // Client addr, e.g., host:8086.
|
||||
HTTPScheme string `json:"httpScheme"` // "http" or "https" for HTTP addr.
|
||||
Status string `json:"status,omitempty"` // The cluster status of the node.
|
||||
}
|
||||
|
||||
type Node struct {
|
||||
ID uint64 `json:"id"`
|
||||
Addr string `json:"addr"`
|
||||
HTTPScheme string `json:"httpScheme"`
|
||||
TCPAddr string `json:"tcpAddr"`
|
||||
}
|
|
@ -1,34 +0,0 @@
|
|||
package mock
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
|
||||
"github.com/influxdata/plutonium/meta/control"
|
||||
)
|
||||
|
||||
type ControlClient struct {
|
||||
Cluster *control.Cluster
|
||||
ShowClustersCalled bool
|
||||
}
|
||||
|
||||
func NewMockControlClient(addr string) *ControlClient {
|
||||
_, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return &ControlClient{
|
||||
Cluster: &control.Cluster{
|
||||
DataNodes: []control.DataNode{
|
||||
control.DataNode{
|
||||
HTTPAddr: addr,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *ControlClient) ShowCluster() (*control.Cluster, error) {
|
||||
cc.ShowClustersCalled = true
|
||||
return cc.Cluster, nil
|
||||
}
|
Loading…
Reference in New Issue