Fix issue where data nodes would not advance
container/ring wasn't being used correctly, so all requests were directed to the first data node in the ring. This ensures (via test) that queries are properly distributed among available data nodes.pull/10616/head
parent
202fff9451
commit
efb7cf67bd
enterprise
|
@ -23,6 +23,21 @@ type Client struct {
|
|||
dataNodes *ring.Ring
|
||||
}
|
||||
|
||||
// NewClientWithTimeSeries initializes a Client with a known set of TimeSeries.
|
||||
// It is not necessary to call Open when creating a Client using this function
|
||||
func NewClientWithTimeSeries(series ...mrfusion.TimeSeries) *Client {
|
||||
c := &Client{}
|
||||
|
||||
c.dataNodes = ring.New(len(series))
|
||||
|
||||
for _, s := range series {
|
||||
c.dataNodes.Value = s
|
||||
c.dataNodes = c.dataNodes.Next()
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
// NewClient initializes and returns a Client.
|
||||
func (c *Client) Open() error {
|
||||
cluster, err := c.Ctrl.ShowCluster()
|
||||
|
@ -46,11 +61,15 @@ func (c *Client) Open() error {
|
|||
// Query retrieves timeseries information pertaining to a specified query. It
|
||||
// can be cancelled by using a provided context.
|
||||
func (c *Client) Query(ctx context.Context, q mrfusion.Query) (mrfusion.Response, error) {
|
||||
cl := c.dataNodes.Next().Value.(mrfusion.TimeSeries)
|
||||
return cl.Query(ctx, q)
|
||||
return c.nextDataNode().Query(ctx, q)
|
||||
}
|
||||
|
||||
// MonitoredServices returns the services monitored by this Enterprise cluster.
|
||||
func (c *Client) MonitoredServices(ctx context.Context) ([]mrfusion.MonitoredService, error) {
|
||||
return []mrfusion.MonitoredService{}, nil
|
||||
}
|
||||
|
||||
func (c *Client) nextDataNode() mrfusion.TimeSeries {
|
||||
c.dataNodes = c.dataNodes.Next()
|
||||
return c.dataNodes.Value.(mrfusion.TimeSeries)
|
||||
}
|
||||
|
|
|
@ -66,3 +66,23 @@ func Test_Enterprise_IssuesQueries(t *testing.T) {
|
|||
t.Fatal("Expected request to data node but none was received")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_Enterprise_AdvancesDataNodes(t *testing.T) {
|
||||
m1 := mock.NewTimeSeries([]string{"http://host-1.example.com:8086"}, &mock.Response{})
|
||||
m2 := mock.NewTimeSeries([]string{"http://host-2.example.com:8086"}, &mock.Response{})
|
||||
cl := enterprise.NewClientWithTimeSeries(mrfusion.TimeSeries(m1), mrfusion.TimeSeries(m2))
|
||||
|
||||
_, err := cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while issuing query: err:", err)
|
||||
}
|
||||
|
||||
_, err = cl.Query(context.Background(), mrfusion.Query{"show shards", "_internal", "autogen"})
|
||||
if err != nil {
|
||||
t.Fatal("Unexpected error while issuing query: err:", err)
|
||||
}
|
||||
|
||||
if m1.QueryCtr != 1 || m2.QueryCtr != 1 {
|
||||
t.Fatalf("Expected m1.Query to be called once but was %d. Expected m2.Query to be called once but was %d\n", m1.QueryCtr, m2.QueryCtr)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue