Export WaitForLeader on MetaStore

This exported function can then be used by Services and
server-reporting, so those components don't make progress until the
cluster is ready.
pull/2931/head
Philip O'Toole 2015-06-11 11:00:31 -07:00
parent f21420c3a6
commit 4dff5f48aa
8 changed files with 43 additions and 6 deletions

View File

@ -337,6 +337,10 @@ func (s *Server) Close() error {
// startServerReporting starts periodic server reporting.
func (s *Server) startServerReporting() {
for {
if err := s.MetaStore.WaitForLeader(30 * time.Second); err != nil {
log.Printf("no leader available for reporting: %s", err.Error())
continue
}
s.reportServer()
<-time.After(24 * time.Hour)
}

View File

@ -186,7 +186,6 @@ func (s *Store) Open() error {
if s.id == 0 {
go s.init()
} else {
s.waitForLeader(10 * time.Second)
close(s.ready)
}
@ -331,7 +330,7 @@ func (s *Store) init() {
// Writes the id of the node to file on success.
func (s *Store) createLocalNode() error {
// Wait for leader.
if err := s.waitForLeader(5 * time.Second); err != nil {
if err := s.WaitForLeader(5 * time.Second); err != nil {
return fmt.Errorf("wait for leader: %s", err)
}
@ -354,8 +353,8 @@ func (s *Store) createLocalNode() error {
return nil
}
// waitForLeader sleeps until a leader is found or a timeout occurs.
func (s *Store) waitForLeader(timeout time.Duration) error {
// WaitForLeader sleeps until a leader is found or a timeout occurs.
func (s *Store) WaitForLeader(timeout time.Duration) error {
// Begin timeout timer.
timer := time.NewTimer(timeout)
defer timer.Stop()

View File

@ -14,6 +14,8 @@ import (
"github.com/kimor79/gollectd"
)
const leaderWaitTimeout = 30 * time.Second
// pointsWriter is an internal interface to make testing easier.
type pointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
@ -21,6 +23,7 @@ type pointsWriter interface {
// metaStore is an internal interface to make testing easier.
type metaStore interface {
WaitForLeader(d time.Duration) error
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
}
@ -62,6 +65,11 @@ func (s *Service) Open() error {
return fmt.Errorf("PointsWriter is nil")
}
if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
s.Logger.Printf("failed to detect a cluster leader: %s", err.Error())
return err
}
if _, err := s.MetaStore.CreateDatabaseIfNotExists(s.Config.Database); err != nil {
s.Logger.Printf("failed to ensure target database %s exists: %s", s.Config.Database, err.Error())
return err

View File

@ -225,6 +225,10 @@ func (ms *testMetaStore) CreateDatabaseIfNotExists(name string) (*meta.DatabaseI
return ms.CreateDatabaseIfNotExistsFn(name)
}
func (ms *testMetaStore) WaitForLeader(d time.Duration) error {
return nil
}
func wait(c chan struct{}, d time.Duration) (err error) {
select {
case <-c:

View File

@ -18,7 +18,8 @@ import (
)
const (
udpBufferSize = 65536
udpBufferSize = 65536
leaderWaitTimeout = 30 * time.Second
)
type Service struct {
@ -43,6 +44,7 @@ type Service struct {
WritePoints(p *cluster.WritePointsRequest) error
}
MetaStore interface {
WaitForLeader(d time.Duration) error
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
}
}
@ -78,7 +80,10 @@ func NewService(c Config) (*Service, error) {
// Open starts the Graphite input processing data.
func (s *Service) Open() error {
var err error
if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
s.logger.Printf("failed to detect a cluster leader: %s", err.Error())
return err
}
if _, err := s.MetaStore.CreateDatabaseIfNotExists(s.database); err != nil {
s.logger.Printf("failed to ensure target database %s exists: %s", s.database, err.Error())
@ -86,6 +91,7 @@ func (s *Service) Open() error {
}
s.logger.Printf("ensured target database %s exists", s.database)
var err error
if strings.ToLower(s.protocol) == "tcp" {
s.addr, err = s.openTCPServer()
} else if strings.ToLower(s.protocol) == "udp" {

View File

@ -393,6 +393,10 @@ func (d *DatabaseCreator) CreateDatabaseIfNotExists(name string) (*meta.Database
return nil, nil
}
func (d *DatabaseCreator) WaitForLeader(t time.Duration) error {
return nil
}
// Test Helpers
func errstr(err error) string {
if err != nil {

View File

@ -19,6 +19,8 @@ import (
"github.com/influxdb/influxdb/tsdb"
)
const leaderWaitTimeout = 30 * time.Second
// Service manages the listener and handler for an HTTP endpoint.
type Service struct {
ln net.Listener // main listener
@ -36,6 +38,7 @@ type Service struct {
WritePoints(p *cluster.WritePointsRequest) error
}
MetaStore interface {
WaitForLeader(d time.Duration) error
CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error)
}
@ -65,6 +68,11 @@ func NewService(c Config) (*Service, error) {
// Open starts the service
func (s *Service) Open() error {
if err := s.MetaStore.WaitForLeader(leaderWaitTimeout); err != nil {
s.Logger.Printf("failed to detect a cluster leader: %s", err.Error())
return err
}
if _, err := s.MetaStore.CreateDatabaseIfNotExists(s.Database); err != nil {
s.Logger.Printf("failed to ensure target database %s exists: %s", s.Database, err.Error())
return err

View File

@ -161,3 +161,7 @@ type DatabaseCreator struct {
func (d *DatabaseCreator) CreateDatabaseIfNotExists(name string) (*meta.DatabaseInfo, error) {
return nil, nil
}
func (d *DatabaseCreator) WaitForLeader(t time.Duration) error {
return nil
}