Wire up meta service and client recovery.
* increase sleep on error in client exec in case a server went down so we don't max out retries before a new leader gets elected
* update and add close logic to service, handler, raft state, and the client
pull/5428/head
parent 5c20e16406
commit eda4a6eda0
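The close logic added to the client, handler, and store below follows one idempotent pattern: a `closing` channel guarded by a mutex that is closed at most once, so Close can be called repeatedly and long-running loops can select on the channel to exit. A minimal, self-contained sketch of that pattern (the `component` type and its names are illustrative, not the actual types in this change):

package main

import (
	"fmt"
	"sync"
)

// component stands in for the client/handler/store types in this change.
type component struct {
	mu      sync.RWMutex
	closing chan struct{}
}

// Close is idempotent: closing an already-closed channel panics, so the
// select first checks whether closing has already been closed.
func (c *component) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()
	select {
	case <-c.closing:
		// already closed
	default:
		close(c.closing)
	}
	return nil
}

// isClosed reports whether Close has been called; loops and HTTP handlers
// can use the same non-blocking select to bail out early.
func (c *component) isClosed() bool {
	c.mu.RLock()
	defer c.mu.RUnlock()
	select {
	case <-c.closing:
		return true
	default:
		return false
	}
}

func main() {
	c := &component{closing: make(chan struct{})}
	c.Close()
	c.Close()                 // second call is a no-op rather than a panic
	fmt.Println(c.isClosed()) // true
}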
|
@@ -21,7 +21,7 @@ import (
 const (
 	// errSleep is the time to sleep after we've failed on every metaserver
 	// before making another pass
-	errSleep = 100 * time.Millisecond
+	errSleep = time.Second
 
 	// maxRetries is the maximum number of attemps to make before returning
 	// a failure to the caller
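The client sleeps errSleep after each pass in which every metaserver failed and gives up after maxRetries passes, so the window in which it keeps retrying is roughly maxRetries × errSleep. At 100ms that window can be shorter than a Raft leader election, which is why the sleep is bumped to a second. A simplified sketch of that retry shape (not the real retryUntilExec; the try callback and error text are made up for illustration):

package main

import (
	"errors"
	"fmt"
	"time"
)

// retryUntil sketches the timing of the client's retry loop: up to maxRetries
// passes, sleeping errSleep after each failed pass so a new leader has time
// to be elected before the caller gives up.
func retryUntil(try func() error, maxRetries int, errSleep time.Duration) error {
	for i := 0; i < maxRetries; i++ {
		if err := try(); err == nil {
			return nil
		}
		// Every metaserver failed this pass; wait before the next one.
		time.Sleep(errSleep)
	}
	return errors.New("exhausted retries")
}

func main() {
	attempts := 0
	err := retryUntil(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("no leader yet")
		}
		return nil
	}, 10, 10*time.Millisecond)
	fmt.Println(attempts, err) // 3 <nil>
}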
@@ -71,7 +71,12 @@ func (c *Client) Close() error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	close(c.closing)
+	select {
+	case <-c.closing:
+		return nil
+	default:
+		close(c.closing)
+	}
 
 	return nil
 }
@@ -149,6 +154,7 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
 
 	err := c.retryUntilExec(internal.Command_CreateDatabaseCommand, internal.E_CreateDatabaseCommand_Command, cmd)
 	if err != nil {
+		fmt.Println("ERROR: ", err)
 		return nil, err
 	}
 
@@ -437,6 +443,17 @@ func (c *Client) retryUntilExec(typ internal.Command_Type, desc *proto.Extensi
 	var redirectServer string
 
 	for {
+		c.mu.RLock()
+		// exit if we're closed
+		select {
+		case <-c.closing:
+			c.mu.RUnlock()
+			return nil
+		default:
+			// we're still open, continue on
+		}
+		c.mu.RUnlock()
+
 		// build the url to hit the redirect server or the next metaserver
 		var url string
 		if redirectServer != "" {
@@ -526,6 +543,7 @@ func (c *Client) exec(url string, typ internal.Command_Type, desc *proto.Extensi
 func (c *Client) waitForIndex(idx uint64) {
 	for {
 		c.mu.RLock()
+		fmt.Println("waitForIndex: ", idx, c.data.Index)
 		if c.data.Index >= idx {
 			c.mu.RUnlock()
 			return
@@ -539,6 +557,9 @@ func (c *Client) waitForIndex(idx uint64) {
 func (c *Client) pollForUpdates() {
 	for {
 		data := c.retryUntilSnapshot(c.index())
+		if data == nil {
+			return
+		}
 
 		// update the data and notify of the change
 		c.mu.Lock()
@@ -588,6 +609,15 @@ func (c *Client) retryUntilSnapshot(idx uint64) *Data {
 	// get the index to look from and the server to poll
 	c.mu.RLock()
+
+	// exit if we're closed
+	select {
+	case <-c.closing:
+		c.mu.RUnlock()
+		return nil
+	default:
+		// we're still open, continue on
+	}
 
 	if currentServer >= len(c.metaServers) {
 		currentServer = 0
 	}
@@ -12,6 +12,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/gogo/protobuf/proto"
@@ -37,14 +38,20 @@ type handler struct {
 		apply(b []byte) error
 		join(n *NodeInfo) error
 	}
+	s *Service
+
+	mu      sync.RWMutex
+	closing chan struct{}
 }
 
 // newHandler returns a new instance of handler with routes.
-func newHandler(c *Config) *handler {
+func newHandler(c *Config, s *Service) *handler {
 	h := &handler{
+		s:              s,
 		config:         c,
 		logger:         log.New(os.Stderr, "[meta-http] ", log.LstdFlags),
 		loggingEnabled: c.LoggingEnabled,
+		closing:        make(chan struct{}),
 	}
 
 	return h
@@ -79,8 +86,36 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
+func (h *handler) Close() error {
+	h.mu.Lock()
+	defer h.mu.Unlock()
+	select {
+	case <-h.closing:
+		// do nothing here
+	default:
+		close(h.closing)
+	}
+	return nil
+}
+
+func (h *handler) isClosed() error {
+	h.mu.RLock()
+	defer h.mu.RUnlock()
+	select {
+	case <-h.closing:
+		return fmt.Errorf("server closed")
+	default:
+		return nil
+	}
+}
+
 // serveExec executes the requested command.
 func (h *handler) serveExec(w http.ResponseWriter, r *http.Request) {
+	if err := h.isClosed(); err != nil {
+		h.httpError(err, w, http.StatusInternalServerError)
+		return
+	}
+
 	// Read the command from the request body.
 	body, err := ioutil.ReadAll(r.Body)
 	if err != nil {
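A quick way to see the effect of this guard is a toy handler with the same shape, exercised with net/http/httptest: requests succeed while the handler is open and get a 500 once Close has been called. Sketch only; guardedHandler is a made-up stand-in for the meta handler:

package main

import (
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// guardedHandler mirrors the shape added above: every request checks a
// closing channel first and fails fast once Close has run.
type guardedHandler struct {
	mu      sync.RWMutex
	closing chan struct{}
}

func (h *guardedHandler) isClosed() error {
	h.mu.RLock()
	defer h.mu.RUnlock()
	select {
	case <-h.closing:
		return errors.New("server closed")
	default:
		return nil
	}
}

func (h *guardedHandler) Close() {
	h.mu.Lock()
	defer h.mu.Unlock()
	select {
	case <-h.closing:
	default:
		close(h.closing)
	}
}

func (h *guardedHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	if err := h.isClosed(); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
}

func main() {
	h := &guardedHandler{closing: make(chan struct{})}
	ts := httptest.NewServer(h)
	defer ts.Close()

	resp, _ := http.Get(ts.URL)
	fmt.Println(resp.StatusCode) // 200 while open

	h.Close()
	resp, _ = http.Get(ts.URL)
	fmt.Println(resp.StatusCode) // 500 after Close
}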
@@ -183,6 +218,11 @@ func validateCommand(b []byte) error {
 
 // serveSnapshot is a long polling http connection to server cache updates
 func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
+	if err := h.isClosed(); err != nil {
+		h.httpError(err, w, http.StatusInternalServerError)
+		return
+	}
+
 	// get the current index that client has
 	index, err := strconv.ParseUint(r.URL.Query().Get("index"), 10, 64)
 	if err != nil {
@@ -207,6 +247,9 @@ func (h *handler) serveSnapshot(w http.ResponseWriter, r *http.Request) {
 	case <-w.(http.CloseNotifier).CloseNotify():
 		// Client closed the connection so we're done.
 		return
+	case <-h.closing:
+		h.httpError(fmt.Errorf("server closed"), w, http.StatusInternalServerError)
+		return
 	}
 }
 
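serveSnapshot long-polls, so without the extra case a shutdown would have to wait for every parked client. For reference, here is a minimal long-poll select with the same three exit paths — data ready, client gone, server closing — written against the request context rather than the CloseNotifier used above. This is a sketch with made-up names, not the actual handler:

package main

import "net/http"

// waitForUpdate blocks until new data is available, the client goes away,
// or the server starts shutting down.
func waitForUpdate(w http.ResponseWriter, r *http.Request, dataChanged, closing <-chan struct{}) {
	select {
	case <-dataChanged:
		w.WriteHeader(http.StatusOK) // the caller would write the snapshot here
	case <-r.Context().Done():
		// Client closed the connection so we're done.
	case <-closing:
		http.Error(w, "server closed", http.StatusInternalServerError)
	}
}

func main() {
	closing := make(chan struct{})
	http.HandleFunc("/snapshot", func(w http.ResponseWriter, r *http.Request) {
		dataChanged := make(chan struct{}) // fed by the store's change notification in real code
		waitForUpdate(w, r, dataChanged, closing)
	})
	// A real server would call http.ListenAndServe here and close(closing) on shutdown.
}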
@@ -85,7 +85,7 @@ func (s *Service) Open() error {
 		return err
 	}
 
-	handler := newHandler(s.config)
+	handler := newHandler(s.config, s)
 	handler.logger = s.Logger
 	handler.store = s.store
 	s.handler = handler
@@ -108,21 +108,25 @@ func (s *Service) serve() {
 // Close closes the underlying listener.
 func (s *Service) Close() error {
 	if s.ln != nil {
-		return s.ln.Close()
+		if err := s.ln.Close(); err != nil {
+			return err
+		}
 	}
+	s.handler.Close()
 
-	return s.store.close()
+	if err := s.store.close(); err != nil {
+		return err
+	}
+
+	return nil
 }
 
-// URL returns the HTTP URL.
-func (s *Service) URL() string {
+// HTTPAddr returns the bind address for the HTTP API
+func (s *Service) HTTPAddr() string {
 	return s.httpAddr
 }
 
+// RaftAddr returns the bind address for the Raft TCP listener
+func (s *Service) RaftAddr() string {
+	return s.store.raftState.ln.Addr().String()
+}
+
 // Err returns a channel for fatal errors that occur on the listener.
 func (s *Service) Err() <-chan error { return s.err }
 
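Service.Close now tears things down in a fixed order: the HTTP listener stops so no new requests arrive, the handler is closed so parked long-polls return, and finally the store (and with it the raft state) shuts down. A compact sketch of that ordering with placeholder io.Closer values, purely illustrative:

package main

import (
	"fmt"
	"io"
)

// closeInOrder mirrors the shutdown order of the new Service.Close:
// listener first, then handler, then store.
func closeInOrder(ln, handler, store io.Closer) error {
	if ln != nil {
		// Stop accepting new connections first.
		if err := ln.Close(); err != nil {
			return err
		}
	}
	// Mark the handler closed so parked long-poll requests return
	// (its error is ignored, as in the change above).
	handler.Close()

	// Finally shut down the store and its raft state.
	return store.Close()
}

type noopCloser struct{ name string }

func (n noopCloser) Close() error {
	fmt.Println("closed", n.name)
	return nil
}

func main() {
	_ = closeInOrder(noopCloser{"listener"}, noopCloser{"handler"}, noopCloser{"store"})
}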
@@ -8,6 +8,7 @@ import (
 	"os"
 	"path"
 	"runtime"
+	"sync"
 	"testing"
 	"time"
 
@@ -29,7 +30,7 @@ func TestMetaService_PingEndpoint(t *testing.T) {
 	}
 	defer s.Close()
 
-	url, err := url.Parse(s.URL())
+	url, err := url.Parse(s.HTTPAddr())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -353,23 +354,6 @@ func TestMetaService_DropRetentionPolicy(t *testing.T) {
 	}
 }
 
-// newServiceAndClient returns new data directory, *Service, and *Client or panics.
-// Caller is responsible for deleting data dir and closing client.
-func newServiceAndClient() (string, *meta.Service, *meta.Client) {
-	cfg := newConfig()
-	s := newService(cfg)
-	if err := s.Open(); err != nil {
-		panic(err)
-	}
-
-	c := meta.NewClient([]string{s.URL()}, false)
-	if err := c.Open(); err != nil {
-		panic(err)
-	}
-
-	return cfg.Dir, s, c
-}
-
 func TestMetaService_CreateRemoveMetaNode(t *testing.T) {
 	t.Parallel()
 
@@ -388,7 +372,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) {
 	}
 	defer s1.Close()
 
-	cfg2.JoinPeers = []string{s1.URL()}
+	cfg2.JoinPeers = []string{s1.HTTPAddr()}
 	s2 := newService(cfg2)
 	if err := s2.Open(); err != nil {
 		t.Fatal(err.Error())
@@ -396,14 +380,14 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) {
 	defer s2.Close()
 
 	func() {
-		cfg3.JoinPeers = []string{s2.URL()}
+		cfg3.JoinPeers = []string{s2.HTTPAddr()}
 		s3 := newService(cfg3)
 		if err := s3.Open(); err != nil {
 			t.Fatal(err.Error())
 		}
 		defer s3.Close()
 
-		c1 := meta.NewClient([]string{s1.URL()}, false)
+		c1 := meta.NewClient([]string{s1.HTTPAddr()}, false)
 		if err := c1.Open(); err != nil {
 			t.Fatal(err.Error())
 		}
@@ -415,7 +399,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) {
 		}
 	}()
 
-	c := meta.NewClient([]string{s1.URL()}, false)
+	c := meta.NewClient([]string{s1.HTTPAddr()}, false)
 	if err := c.Open(); err != nil {
 		t.Fatal(err.Error())
 	}
@@ -430,7 +414,7 @@ func TestMetaService_CreateRemoveMetaNode(t *testing.T) {
 		t.Fatalf("meta nodes wrong: %v", metaNodes)
 	}
 
-	cfg4.JoinPeers = []string{s1.URL()}
+	cfg4.JoinPeers = []string{s1.HTTPAddr()}
 	s4 := newService(cfg4)
 	if err := s4.Open(); err != nil {
 		t.Fatal(err.Error())
@@ -450,26 +434,29 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) {
 	t.Parallel()
 
 	cfgs := make([]*meta.Config, 3)
-	srvs := make([]*meta.Service, 3)
+	srvs := make([]*testService, 3)
 	for i, _ := range cfgs {
 		c := newConfig()
 
 		cfgs[i] = c
 
 		if i > 0 {
-			c.JoinPeers = []string{srvs[0].URL()}
+			c.JoinPeers = []string{srvs[0].HTTPAddr()}
 		}
 		srvs[i] = newService(c)
 		if err := srvs[i].Open(); err != nil {
 			t.Fatal(err.Error())
 		}
+		defer srvs[i].Close()
+		defer os.RemoveAll(c.Dir)
 	}
 
-	c := meta.NewClient([]string{srvs[2].URL()}, false)
+	c := meta.NewClient([]string{srvs[2].HTTPAddr()}, false)
 	if err := c.Open(); err != nil {
 		t.Fatal(err.Error())
 	}
+	defer c.Close()
 
 	metaNodes, _ := c.MetaNodes()
 	if len(metaNodes) != 3 {
 		t.Fatalf("meta nodes wrong: %v", metaNodes)
@@ -484,6 +471,121 @@ func TestMetaService_CommandAgainstNonLeader(t *testing.T) {
 	}
 }
 
+// Ensure that the client will fail over to another server if the leader goes
+// down. Also ensure that the cluster will come back up successfully after restart
+func TestMetaService_FailureAndRestartCluster(t *testing.T) {
+	t.Parallel()
+
+	cfgs := make([]*meta.Config, 3)
+	srvs := make([]*testService, 3)
+	for i, _ := range cfgs {
+		c := newConfig()
+
+		cfgs[i] = c
+
+		if i > 0 {
+			c.JoinPeers = []string{srvs[0].HTTPAddr()}
+		}
+		srvs[i] = newService(c)
+		if err := srvs[i].Open(); err != nil {
+			t.Fatal(err.Error())
+		}
+		c.HTTPBindAddress = srvs[i].HTTPAddr()
+		c.BindAddress = srvs[i].RaftAddr()
+		c.JoinPeers = nil
+		defer srvs[i].Close()
+		defer os.RemoveAll(c.Dir)
+	}
+
+	c := meta.NewClient([]string{srvs[0].HTTPAddr(), srvs[1].HTTPAddr()}, false)
+	if err := c.Open(); err != nil {
+		t.Fatal(err.Error())
+	}
+	defer c.Close()
+
+	if _, err := c.CreateDatabase("foo"); err != nil {
+		t.Fatal(err)
+	}
+
+	if db, err := c.Database("foo"); db == nil || err != nil {
+		t.Fatalf("database foo wasn't created: %s", err.Error())
+	}
+
+	if err := srvs[0].Close(); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	if _, err := c.CreateDatabase("bar"); err != nil {
+		t.Fatal(err)
+	}
+
+	if db, err := c.Database("bar"); db == nil || err != nil {
+		t.Fatalf("database bar wasn't created: %s", err.Error())
+	}
+
+	if err := srvs[1].Close(); err != nil {
+		t.Fatal(err.Error())
+	}
+	if err := srvs[2].Close(); err != nil {
+		t.Fatal(err.Error())
+	}
+
+	// give them a second to shut down
+	time.Sleep(time.Second)
+
+	// when we start back up they need to happen simultaneously, otherwise
+	// a leader won't get elected
+	var wg sync.WaitGroup
+	for i, cfg := range cfgs {
+		srvs[i] = newService(cfg)
+		wg.Add(1)
+		go func(srv *testService) {
+			if err := srv.Open(); err != nil {
+				panic(err)
+			}
+			wg.Done()
+		}(srvs[i])
+		defer srvs[i].Close()
+	}
+	wg.Wait()
+	time.Sleep(time.Second)
+
+	c2 := meta.NewClient([]string{srvs[0].HTTPAddr()}, false)
+	if err := c2.Open(); err != nil {
+		t.Fatal(err)
+	}
+	defer c2.Close()
+
+	if db, err := c2.Database("bar"); db == nil || err != nil {
+		t.Fatalf("database bar wasn't created: %s", err.Error())
+	}
+
+	if _, err := c2.CreateDatabase("asdf"); err != nil {
+		t.Fatal(err)
+	}
+
+	if db, err := c2.Database("asdf"); db == nil || err != nil {
+		t.Fatalf("database bar wasn't created: %s", err.Error())
+	}
+}
+
+// newServiceAndClient returns new data directory, *Service, and *Client or panics.
+// Caller is responsible for deleting data dir and closing client.
+func newServiceAndClient() (string, *testService, *meta.Client) {
+	cfg := newConfig()
+	s := newService(cfg)
+	if err := s.Open(); err != nil {
+		panic(err)
+	}
+
+	c := meta.NewClient([]string{s.HTTPAddr()}, false)
+	if err := c.Open(); err != nil {
+		panic(err)
+	}
+
+	return cfg.Dir, s, c
+}
+
 func newConfig() *meta.Config {
 	cfg := meta.NewConfig()
 	cfg.BindAddress = "127.0.0.1:0"
@@ -507,7 +609,19 @@ func testTempDir(skip int) string {
 	return dir
 }
 
-func newService(cfg *meta.Config) *meta.Service {
+type testService struct {
+	*meta.Service
+	ln net.Listener
+}
+
+func (t *testService) Close() error {
+	if err := t.Service.Close(); err != nil {
+		return err
+	}
+	return t.ln.Close()
+}
+
+func newService(cfg *meta.Config) *testService {
 	// Open shared TCP connection.
 	ln, err := net.Listen("tcp", cfg.BindAddress)
 	if err != nil {
@@ -522,7 +636,7 @@ func newService(cfg *meta.Config) *meta.Service {
 
 	go mux.Serve(ln)
 
-	return s
+	return &testService{Service: s, ln: ln}
 }
 
 func mustParseStatement(s string) influxql.Statement {
@@ -177,8 +177,15 @@ func (s *store) openRaft(initializePeers []string, raftln net.Listener) error {
 func (s *store) close() error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
-	close(s.closing)
-	return nil
+
+	select {
+	case <-s.closing:
+		// already closed
+		return nil
+	default:
+		close(s.closing)
+		return s.raftState.close()
+	}
 }
 
 func (s *store) snapshot() (*Data, error) {