From 2e8cfce7bef5785f57b82e27e53a5ed5a922d2e7 Mon Sep 17 00:00:00 2001 From: David Norton Date: Wed, 20 Jan 2016 18:36:51 -0500 Subject: [PATCH] convert CQ service to new meta client --- services/continuous_querier/service.go | 14 +++++++----- services/continuous_querier/service_test.go | 25 +++++++++++++++------ services/meta/client.go | 11 +++++---- 3 files changed, 33 insertions(+), 17 deletions(-) diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 4c9ec12e35..689b752b35 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -41,11 +41,12 @@ type queryExecutor interface { ExecuteQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}) (<-chan *influxql.Result, error) } -// metaStore is an internal interface to make testing easier. -type metaStore interface { - IsLeader() bool +// metaClient is an internal interface to make testing easier. +type metaClient interface { + AcquireLease(name string) (l *meta.Lease, err error) Databases() ([]meta.DatabaseInfo, error) Database(name string) (*meta.DatabaseInfo, error) + NodeID() uint64 } // RunRequest is a request to run one or more CQs. @@ -72,7 +73,7 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool { // Service manages continuous query execution. type Service struct { - MetaClient metaStore + MetaClient metaClient QueryExecutor queryExecutor Config *Config RunInterval time.Duration @@ -183,6 +184,7 @@ func (s *Service) Run(database, name string, t time.Time) error { // backgroundLoop runs on a go routine and periodically executes CQs. func (s *Service) backgroundLoop() { + leaseName := "continuous_querier" defer s.wg.Done() for { select { @@ -190,12 +192,12 @@ func (s *Service) backgroundLoop() { s.Logger.Println("continuous query service terminating") return case req := <-s.RunCh: - if s.MetaClient.IsLeader() { + if _, err := s.MetaClient.AcquireLease(leaseName); err == nil { s.Logger.Printf("running continuous queries by request for time: %v", req.Now) s.runContinuousQueries(req) } case <-time.After(s.RunInterval): - if s.MetaClient.IsLeader() { + if _, err := s.MetaClient.AcquireLease(leaseName); err == nil { s.runContinuousQueries(&RunRequest{Now: time.Now()}) } } diff --git a/services/continuous_querier/service_test.go b/services/continuous_querier/service_test.go index 2ecac123e1..b2aaa25f92 100644 --- a/services/continuous_querier/service_test.go +++ b/services/continuous_querier/service_test.go @@ -294,24 +294,35 @@ func NewTestService(t *testing.T) *Service { type MetaClient struct { mu sync.RWMutex Leader bool + AllowLease bool DatabaseInfos []meta.DatabaseInfo Err error t *testing.T + nodeID uint64 } // NewMetaClient returns a *MetaClient. func NewMetaClient(t *testing.T) *MetaClient { return &MetaClient{ - Leader: true, - t: t, + Leader: true, + AllowLease: true, + t: t, + nodeID: 1, } } -// IsLeader returns true if the node is the cluster leader. -func (ms *MetaClient) IsLeader() bool { - ms.mu.RLock() - defer ms.mu.RUnlock() - return ms.Leader +// NodeID returns the client's node ID. +func (ms *MetaClient) NodeID() uint64 { return ms.nodeID } + +// AcquireLease attempts to acquire the specified lease. +func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) { + if ms.Leader { + if ms.AllowLease { + return &meta.Lease{Name: name}, nil + } + return nil, errors.New("another node owns the lease") + } + return nil, meta.ErrServiceUnavailable } // Databases returns a list of database info about each database in the cluster. diff --git a/services/meta/client.go b/services/meta/client.go index 727c041f38..ee2903c700 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -51,7 +51,7 @@ var ( type Client struct { tls bool logger *log.Logger - NodeID uint64 + nodeID uint64 mu sync.RWMutex metaServers []string @@ -115,6 +115,9 @@ func (c *Client) Close() error { return nil } +// GetNodeID returns the client's node ID. +func (c *Client) NodeID() uint64 { return c.nodeID } + // Ping will hit the ping endpoint for the metaservice and return nil if // it returns 200. If checkAllMetaServers is set to true, it will hit the // ping endpoint and tell it to verify the health of all metaservers in the @@ -168,7 +171,7 @@ func (c *Client) acquireLease(name string) (*Lease, error) { c.mu.RLock() server := c.metaServers[0] c.mu.RUnlock() - url := fmt.Sprintf("%s/lease?name=%s&nodeid=%d", c.url(server), name, c.NodeID) + url := fmt.Sprintf("%s/lease?name=%s&nodeid=%d", c.url(server), name, c.nodeID) resp, err := http.Get(url) if err != nil { @@ -253,7 +256,7 @@ func (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) { return nil, err } - c.NodeID = n.ID + c.nodeID = n.ID return n, nil } @@ -823,7 +826,7 @@ func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) (*NodeInfo, error) { return nil, errors.New("new meta node not found") } - c.NodeID = n.ID + c.nodeID = n.ID return n, nil }