convert CQ service to new meta client

pull/5428/head
David Norton 2016-01-20 18:36:51 -05:00
parent ef596c6b6b
commit 2e8cfce7be
3 changed files with 33 additions and 17 deletions

View File

@@ -41,11 +41,12 @@ type queryExecutor interface {
ExecuteQuery(query *influxql.Query, database string, chunkSize int, closing chan struct{}) (<-chan *influxql.Result, error)
}
-// metaStore is an internal interface to make testing easier.
-type metaStore interface {
-IsLeader() bool
+// metaClient is an internal interface to make testing easier.
+type metaClient interface {
+AcquireLease(name string) (l *meta.Lease, err error)
Databases() ([]meta.DatabaseInfo, error)
Database(name string) (*meta.DatabaseInfo, error)
+NodeID() uint64
}
// RunRequest is a request to run one or more CQs.
@@ -72,7 +73,7 @@ func (rr *RunRequest) matches(cq *meta.ContinuousQueryInfo) bool {
// Service manages continuous query execution.
type Service struct {
-MetaClient metaStore
+MetaClient metaClient
QueryExecutor queryExecutor
Config *Config
RunInterval time.Duration
@@ -183,6 +184,7 @@ func (s *Service) Run(database, name string, t time.Time) error {
// backgroundLoop runs on a goroutine and periodically executes CQs.
func (s *Service) backgroundLoop() {
+leaseName := "continuous_querier"
defer s.wg.Done()
for {
select {
@@ -190,12 +192,12 @@ func (s *Service) backgroundLoop() {
s.Logger.Println("continuous query service terminating")
return
case req := <-s.RunCh:
-if s.MetaClient.IsLeader() {
+if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
s.Logger.Printf("running continuous queries by request for time: %v", req.Now)
s.runContinuousQueries(req)
}
case <-time.After(s.RunInterval):
-if s.MetaClient.IsLeader() {
+if _, err := s.MetaClient.AcquireLease(leaseName); err == nil {
s.runContinuousQueries(&RunRequest{Now: time.Now()})
}
}
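Taken together, this file's changes swap the IsLeader() check for a named lease: on each request and each RunInterval tick the service asks the meta client for the "continuous_querier" lease and only runs CQs when acquisition succeeds. Below is a minimal, self-contained sketch of that gating pattern; the Lease type, leaser interface, fakeMeta stub, and runOnce function are illustrative stand-ins for this sketch, not code from this commit (the real service uses *meta.Lease and the metaClient interface above).

package main

import (
	"errors"
	"fmt"
	"time"
)

// Lease is a simplified stand-in for meta.Lease in this sketch.
type Lease struct{ Name string }

// leaser is the one method of the metaClient interface the loop body needs.
type leaser interface {
	AcquireLease(name string) (*Lease, error)
}

// fakeMeta is an illustrative stub: it grants the lease only when owner is true.
type fakeMeta struct{ owner bool }

func (f *fakeMeta) AcquireLease(name string) (*Lease, error) {
	if f.owner {
		return &Lease{Name: name}, nil
	}
	return nil, errors.New("another node owns the lease")
}

// runOnce models a single tick of backgroundLoop: CQs run only if the lease was acquired.
func runOnce(m leaser, now time.Time) {
	if _, err := m.AcquireLease("continuous_querier"); err == nil {
		fmt.Println("lease acquired, running continuous queries for", now)
		return
	}
	fmt.Println("lease not acquired, skipping this interval")
}

func main() {
	runOnce(&fakeMeta{owner: true}, time.Now())  // this node runs the CQs
	runOnce(&fakeMeta{owner: false}, time.Now()) // another node holds the lease
}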

View File

@@ -294,24 +294,35 @@ func NewTestService(t *testing.T) *Service {
type MetaClient struct {
mu sync.RWMutex
Leader bool
+AllowLease bool
DatabaseInfos []meta.DatabaseInfo
Err error
t *testing.T
+nodeID uint64
}
// NewMetaClient returns a *MetaClient.
func NewMetaClient(t *testing.T) *MetaClient {
return &MetaClient{
Leader: true,
+AllowLease: true,
t: t,
+nodeID: 1,
}
}
-// IsLeader returns true if the node is the cluster leader.
-func (ms *MetaClient) IsLeader() bool {
-ms.mu.RLock()
-defer ms.mu.RUnlock()
-return ms.Leader
-}
+// NodeID returns the client's node ID.
+func (ms *MetaClient) NodeID() uint64 { return ms.nodeID }
+// AcquireLease attempts to acquire the specified lease.
+func (ms *MetaClient) AcquireLease(name string) (l *meta.Lease, err error) {
+if ms.Leader {
+if ms.AllowLease {
+return &meta.Lease{Name: name}, nil
+}
+return nil, errors.New("another node owns the lease")
+}
+return nil, meta.ErrServiceUnavailable
+}
// Databases returns a list of database info about each database in the cluster.
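The mock now models lease acquisition rather than a bare leadership flag, with three outcomes: lease granted, lease owned elsewhere, and service unavailable. Assuming the nesting shown above (Leader gates AllowLease), a test in the same package could drive all three states as sketched here; the test function name is illustrative and not part of this commit.

func TestMetaClient_AcquireLease_Sketch(t *testing.T) {
	ms := NewMetaClient(t)

	// Default mock state: Leader and AllowLease are true, so the lease is granted.
	if _, err := ms.AcquireLease("continuous_querier"); err != nil {
		t.Fatalf("expected to acquire lease, got error: %v", err)
	}

	// Still the leader, but another node owns the lease.
	ms.AllowLease = false
	if _, err := ms.AcquireLease("continuous_querier"); err == nil {
		t.Fatal("expected an error when another node owns the lease")
	}

	// Not the leader: the mock reports the meta service as unavailable.
	ms.Leader = false
	if _, err := ms.AcquireLease("continuous_querier"); err != meta.ErrServiceUnavailable {
		t.Fatalf("expected meta.ErrServiceUnavailable, got: %v", err)
	}
}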

View File

@@ -51,7 +51,7 @@ var (
type Client struct {
tls bool
logger *log.Logger
-NodeID uint64
+nodeID uint64
mu sync.RWMutex
metaServers []string
@@ -115,6 +115,9 @@ func (c *Client) Close() error {
return nil
}
+// NodeID returns the client's node ID.
+func (c *Client) NodeID() uint64 { return c.nodeID }
// Ping will hit the ping endpoint for the metaservice and return nil if
// it returns 200. If checkAllMetaServers is set to true, it will hit the
// ping endpoint and tell it to verify the health of all metaservers in the
@@ -168,7 +171,7 @@ func (c *Client) acquireLease(name string) (*Lease, error) {
c.mu.RLock()
server := c.metaServers[0]
c.mu.RUnlock()
-url := fmt.Sprintf("%s/lease?name=%s&nodeid=%d", c.url(server), name, c.NodeID)
+url := fmt.Sprintf("%s/lease?name=%s&nodeid=%d", c.url(server), name, c.nodeID)
resp, err := http.Get(url)
if err != nil {
@@ -253,7 +256,7 @@ func (c *Client) CreateDataNode(httpAddr, tcpAddr string) (*NodeInfo, error) {
return nil, err
}
-c.NodeID = n.ID
+c.nodeID = n.ID
return n, nil
}
@@ -823,7 +826,7 @@ func (c *Client) CreateMetaNode(httpAddr, tcpAddr string) (*NodeInfo, error) {
return nil, errors.New("new meta node not found")
}
-c.NodeID = n.ID
+c.nodeID = n.ID
return n, nil
}
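In the meta client itself, the exported NodeID field becomes unexported with a read-only accessor, so callers outside the package (such as the CQ service's metaClient interface above) read the ID through NodeID() while CreateDataNode/CreateMetaNode keep assigning it internally. A small sketch of that encapsulation pattern follows, using an illustrative clusterClient type rather than the real *meta.Client.

package main

import "fmt"

// clusterClient is an illustrative stand-in for the meta client's accessor pattern.
type clusterClient struct {
	nodeID uint64 // unexported: set internally, read through NodeID()
}

// NodeID returns the client's node ID.
func (c *clusterClient) NodeID() uint64 { return c.nodeID }

// register models CreateDataNode/CreateMetaNode storing the assigned ID.
func (c *clusterClient) register(id uint64) { c.nodeID = id }

func main() {
	c := &clusterClient{}
	c.register(1)
	// Callers outside the package can read the ID but no longer set it directly,
	// e.g. when building the lease request shown in acquireLease above.
	fmt.Printf("/lease?name=%s&nodeid=%d\n", "continuous_querier", c.NodeID())
}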