From eda4a6eda097ca33a572f59b9ac0bff1b94f50a6 Mon Sep 17 00:00:00 2001 From: Paul Dix Date: Sat, 2 Jan 2016 15:12:54 -0500 Subject: [PATCH] Wire up meta service and client recovery. * increase sleep on error in client exec in case a server went down so we don't max out retries before a new leader gets elected * update and add close logic to service, handler, raft state, and the client --- services/meta/client.go | 34 ++++++- services/meta/handler.go | 45 ++++++++- services/meta/service.go | 22 +++-- services/meta/service_test.go | 170 ++++++++++++++++++++++++++++------ services/meta/store.go | 11 ++- 5 files changed, 240 insertions(+), 42 deletions(-) diff --git a/services/meta/client.go b/services/meta/client.go index 293bedee3a..8430fdee62 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -21,7 +21,7 @@ import ( const ( // errSleep is the time to sleep after we've failed on every metaserver // before making another pass - errSleep = 100 * time.Millisecond + errSleep = time.Second // maxRetries is the maximum number of attemps to make before returning // a failure to the caller @@ -71,7 +71,12 @@ func (c *Client) Close() error { c.mu.Lock() defer c.mu.Unlock() - close(c.closing) + select { + case <-c.closing: + return nil + default: + close(c.closing) + } return nil } @@ -149,6 +154,7 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) { err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd) if err != nil { + fmt.Println("ERROR: ", err) return nil, err } @@ -437,6 +443,17 @@ func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.Extension var redirectServer string for { + c.mu.RLock() + // exit if we're closed + select { + case <-c.closing: + c.mu.RUnlock() + return nil + default: + // we're still open, continue on + } + c.mu.RUnlock() + // build the url to hit the redirect server or the next metaserver var url string if redirectServer != "" { @@ -526,6 +543,7 @@ func (c *Client) exec(url string, typ internal.Command_Type, desc *proto.Extensi func (c *Client) waitForIndex(idx uint64) { for { c.mu.RLock() + fmt.Println("waitForIndex: ", idx, c.data.Index) if c.data.Index >= idx { c.mu.RUnlock() return @@ -539,6 +557,9 @@ func (c *Client) waitForIndex(idx uint64) { func (c *Client) pollForUpdates() { for { data := c.retryUntilSnapshot(c.index()) + if data == nil { + return + } // update the data and notify of the change c.mu.Lock() @@ -588,6 +609,15 @@ func (c *Client) retryUntilSnapshot(idx uint64) *Data { // get the index to look from and the server to poll c.mu.RLock() + // exit if we're closed + select { + case <-c.closing: + c.mu.RUnlock() + return nil + default: + // we're still open, continue on + } + if currentServer >= len(c.metaServers) { currentServer = 0 } diff --git a/services/meta/handler.go b/services/meta/handler.go index f817cfba8e..da0c95ab59 100644 --- a/services/meta/handler.go +++ b/services/meta/handler.go @@ -12,6 +12,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "github.com/gogo/protobuf/proto" @@ -37,14 +38,20 @@ type handler struct { apply(b []byte) error join(n *NodeInfo) error } + s *Service + + mu sync.RWMutex + closing chan struct{} } // newHandler returns a new instance of handler with routes. -func newHandler(c *Config) *handler { +func newHandler(c *Config, s *Service) *handler { h := &handler{ + s: s, config: c, logger: log.New(os.Stderr, "[meta-http] ", log.LstdFlags), loggingEnabled: c.LoggingEnabled, + closing: make(chan struct{}), } return h @@ -79,8 +86,36 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { } } +func (h *handler) Close() error { + h.mu.Lock() + defer h.mu.Unlock() + select { + case <-h.closing: + // do nothing here + default: + close(h.closing) + } + return nil +} + +func (h *handler) isClosed() error { + h.mu.RLock() + defer h.mu.RUnlock() + select { + case <-h.closing: + return fmt.Errorf("server closed") + default: + return nil + } +} + // serveExec executes the requested command. func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) { + if err := h.isClosed(); err != nil { + h.httpError(err, w, http.StatusInternalServerError) + return + } + // Read the command from the request body. body, err := ioutil.ReadAll(r.Body) if err != nil { @@ -183,6 +218,11 @@ func validateCommand(b []byte) error { // serveSnapshot is a long polling http connection to server cache updates func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { + if err := h.isClosed(); err != nil { + h.httpError(err, w, http.StatusInternalServerError) + return + } + // get the current index that client has index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64) if err != nil { @@ -207,6 +247,9 @@ func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) { case <-w.(http.CloseNotifier).CloseNotify(): // Client closed the connection so we're done. return + case <-h.closing: + h.httpError(fmt.Errorf("server closed"), w, http.StatusInternalServerError) + return } } diff --git a/services/meta/service.go b/services/meta/service.go index 400574346b..fea9782a99 100644 --- a/services/meta/service.go +++ b/services/meta/service.go @@ -85,7 +85,7 @@ func (s *Service) Open() error { return err } - handler := newHandler(s.config) + handler := newHandler(s.config, s) handler.logger = s.Logger handler.store = s.store s.handler = handler @@ -108,21 +108,25 @@ func (s *Service) serve() { // Close closes the underlying listener. func (s *Service) Close() error { if s.ln != nil { - return s.ln.Close() + if err := s.ln.Close(); err != nil { + return err + } } + s.handler.Close() - if err := s.store.close(); err != nil { - return err - } - - return nil + return s.store.close() } -// URL returns the HTTP URL. -func (s *Service) URL() string { +// HTTPAddr returns the bind address for the HTTP API +func (s *Service) HTTPAddr() string { return s.httpAddr } +// RaftAddr returns the bind address for the Raft TCP listener +func (s *Service) RaftAddr() string { + return s.store.raftState.ln.Addr().String() +} + // Err returns a channel for fatal errors that occur on the listener. func (s *Service) Err() <-chan error { return s.err } diff --git a/services/meta/service_test.go b/services/meta/service_test.go index 1173bdc980..dd87eb68a3 100644 --- a/services/meta/service_test.go +++ b/services/meta/service_test.go @@ -8,6 +8,7 @@ import ( "os" "path" "runtime" + "sync" "testing" "time" @@ -29,7 +30,7 @@ func TestMetaService_PingEndpoint(t *testing.T) { } defer s.Close() - url, err := url.Parse(s.URL()) + url, err := url.Parse(s.HTTPAddr()) if err != nil { t.Fatal(err) } @@ -353,23 +354,6 @@ func TestMetaService_DropRetentionPolicy(t *testing.T) { } } -// newServiceAndClient returns new data directory, *Service, and *Client or panics. -// Caller is responsible for deleting data dir and closing client. -func newServiceAndClient() (string, *meta.Service, *meta.Client) { - cfg := newConfig() - s := newService(cfg) - if err := s.Open(); err != nil { - panic(err) - } - - c := meta.NewClient([]string{s.URL()}, false) - if err := c.Open(); err != nil { - panic(err) - } - - return cfg.Dir, s, c -} - func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Parallel() @@ -388,7 +372,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { } defer s1.Close() - cfg2.JoinPeers = []string{s1.URL()} + cfg2.JoinPeers = []string{s1.HTTPAddr()} s2 := newService(cfg2) if err := s2.Open(); err != nil { t.Fatal(err.Error()) @@ -396,14 +380,14 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { defer s2.Close() func() { - cfg3.JoinPeers = []string{s2.URL()} + cfg3.JoinPeers = []string{s2.HTTPAddr()} s3 := newService(cfg3) if err := s3.Open(); err != nil { t.Fatal(err.Error()) } defer s3.Close() - c1 := meta.NewClient([]string{s1.URL()}, false) + c1 := meta.NewClient([]string{s1.HTTPAddr()}, false) if err := c1.Open(); err != nil { t.Fatal(err.Error()) } @@ -415,7 +399,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { } }() - c := meta.NewClient([]string{s1.URL()}, false) + c := meta.NewClient([]string{s1.HTTPAddr()}, false) if err := c.Open(); err != nil { t.Fatal(err.Error()) } @@ -430,7 +414,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) { t.Fatalf("meta nodes wrong: %v", metaNodes) } - cfg4.JoinPeers = []string{s1.URL()} + cfg4.JoinPeers = []string{s1.HTTPAddr()} s4 := newService(cfg4) if err := s4.Open(); err != nil { t.Fatal(err.Error()) @@ -450,26 +434,29 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { t.Parallel() cfgs := make([]*meta.Config, 3) - srvs := make([]*meta.Service, 3) + srvs := make([]*testService, 3) for i, _ := range cfgs { c := newConfig() cfgs[i] = c if i > 0 { - c.JoinPeers = []string{srvs[0].URL()} + c.JoinPeers = []string{srvs[0].HTTPAddr()} } srvs[i] = newService(c) if err := srvs[i].Open(); err != nil { t.Fatal(err.Error()) } defer srvs[i].Close() + defer os.RemoveAll(c.Dir) } - c := meta.NewClient([]string{srvs[2].URL()}, false) + c := meta.NewClient([]string{srvs[2].HTTPAddr()}, false) if err := c.Open(); err != nil { t.Fatal(err.Error()) } + defer c.Close() + metaNodes, _ := c.MetaNodes() if len(metaNodes) != 3 { t.Fatalf("meta nodes wrong: %v", metaNodes) @@ -484,6 +471,121 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) { } } +// Ensure that the client will fail over to another server if the leader goes +// down. Also ensure that the cluster will come back up successfully after restart +func TestMetaService_FailureAndRestartCluster(t *testing.T) { + t.Parallel() + + cfgs := make([]*meta.Config, 3) + srvs := make([]*testService, 3) + for i, _ := range cfgs { + c := newConfig() + + cfgs[i] = c + + if i > 0 { + c.JoinPeers = []string{srvs[0].HTTPAddr()} + } + srvs[i] = newService(c) + if err := srvs[i].Open(); err != nil { + t.Fatal(err.Error()) + } + c.HTTPBindAddress = srvs[i].HTTPAddr() + c.BindAddress = srvs[i].RaftAddr() + c.JoinPeers = nil + defer srvs[i].Close() + defer os.RemoveAll(c.Dir) + } + + c := meta.NewClient([]string{srvs[0].HTTPAddr(), srvs[1].HTTPAddr()}, false) + if err := c.Open(); err != nil { + t.Fatal(err.Error()) + } + defer c.Close() + + if _, err := c.CreateDatabase("foo"); err != nil { + t.Fatal(err) + } + + if db, err := c.Database("foo"); db == nil || err != nil { + t.Fatalf("database foo wasn't created: %s", err.Error()) + } + + if err := srvs[0].Close(); err != nil { + t.Fatal(err.Error()) + } + + if _, err := c.CreateDatabase("bar"); err != nil { + t.Fatal(err) + } + + if db, err := c.Database("bar"); db == nil || err != nil { + t.Fatalf("database bar wasn't created: %s", err.Error()) + } + + if err := srvs[1].Close(); err != nil { + t.Fatal(err.Error()) + } + if err := srvs[2].Close(); err != nil { + t.Fatal(err.Error()) + } + + // give them a second to shut down + time.Sleep(time.Second) + + // when we start back up they need to happen simultaneously, otherwise + // a leader won't get elected + var wg sync.WaitGroup + for i, cfg := range cfgs { + srvs[i] = newService(cfg) + wg.Add(1) + go func(srv *testService) { + if err := srv.Open(); err != nil { + panic(err) + } + wg.Done() + }(srvs[i]) + defer srvs[i].Close() + } + wg.Wait() + time.Sleep(time.Second) + + c2 := meta.NewClient([]string{srvs[0].HTTPAddr()}, false) + if err := c2.Open(); err != nil { + t.Fatal(err) + } + defer c2.Close() + + if db, err := c2.Database("bar"); db == nil || err != nil { + t.Fatalf("database bar wasn't created: %s", err.Error()) + } + + if _, err := c2.CreateDatabase("asdf"); err != nil { + t.Fatal(err) + } + + if db, err := c2.Database("asdf"); db == nil || err != nil { + t.Fatalf("database bar wasn't created: %s", err.Error()) + } +} + +// newServiceAndClient returns new data directory, *Service, and *Client or panics. +// Caller is responsible for deleting data dir and closing client. +func newServiceAndClient() (string, *testService, *meta.Client) { + cfg := newConfig() + s := newService(cfg) + if err := s.Open(); err != nil { + panic(err) + } + + c := meta.NewClient([]string{s.HTTPAddr()}, false) + if err := c.Open(); err != nil { + panic(err) + } + + return cfg.Dir, s, c +} + func newConfig() *meta.Config { cfg := meta.NewConfig() cfg.BindAddress = "127.0.0.1:0" @@ -507,7 +609,19 @@ func testTempDir(skip int) string { return dir } -func newService(cfg *meta.Config) *meta.Service { +type testService struct { + *meta.Service + ln net.Listener +} + +func (t *testService) Close() error { + if err := t.Service.Close(); err != nil { + return err + } + return t.ln.Close() +} + +func newService(cfg *meta.Config) *testService { // Open shared TCP connection. ln, err := net.Listen("tcp", cfg.BindAddress) if err != nil { @@ -522,7 +636,7 @@ func newService(cfg *meta.Config) *meta.Service { go mux.Serve(ln) - return s + return &testService{Service: s, ln: ln} } func mustParseStatement(s string) influxql.Statement { diff --git a/services/meta/store.go b/services/meta/store.go index c92a8f00db..f71fda68ad 100644 --- a/services/meta/store.go +++ b/services/meta/store.go @@ -177,8 +177,15 @@ func (s *store) openRaft(initializePeers []string, raftln net.Listener) error { func (s *store) close() error { s.mu.Lock() defer s.mu.Unlock() - close(s.closing) - return nil + + select { + case <-s.closing: + // already closed + return nil + default: + close(s.closing) + return s.raftState.close() + } } func (s *store) snapshot() (*Data, error) {