From 25cb03e38c3b1d451999dde4f6ecce456d701b52 Mon Sep 17 00:00:00 2001 From: John Shahid Date: Thu, 30 Jan 2014 15:44:41 -0500 Subject: [PATCH] Fix #215. Fix hangs on startup after raft compaction Because raft compaction compact the entire log in one state, commands in the log aren't replayed on startup. Instead the entire state is loaded this prevented the AddPotentialServer from being called which in turn didn't put anything in the channel --- src/api/http/api.go | 10 +++++++++ src/coordinator/cluster_configuration.go | 16 ++++++++++++++ src/coordinator/coordinator.go | 8 +++++++ src/coordinator/coordinator_test.go | 2 +- src/coordinator/interface.go | 3 +++ src/coordinator/raft_server.go | 8 ++++--- src/integration/server_test.go | 27 ++++++++++++++++++++++++ 7 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/api/http/api.go b/src/api/http/api.go index 59756020bb..74b828f882 100644 --- a/src/api/http/api.go +++ b/src/api/http/api.go @@ -130,6 +130,9 @@ func (self *HttpServer) Serve(listener net.Listener) { // healthcheck self.registerEndpoint(p, "get", "/ping", self.ping) + // force a raft log compaction + self.registerEndpoint(p, "post", "/raft/force_compaction", self.forceRaftCompaction) + // fetch current list of available interfaces self.registerEndpoint(p, "get", "/interfaces", self.listInterfaces) @@ -267,6 +270,13 @@ func TimePrecisionFromString(s string) (TimePrecision, error) { return 0, fmt.Errorf("Unknown time precision %s", s) } +func (self *HttpServer) forceRaftCompaction(w libhttp.ResponseWriter, r *libhttp.Request) { + self.tryAsClusterAdmin(w, r, func(user common.User) (int, interface{}) { + self.coordinator.ForceCompaction(user) + return libhttp.StatusOK, "OK" + }) +} + func (self *HttpServer) sendCrossOriginHeader(w libhttp.ResponseWriter, r *libhttp.Request) { w.WriteHeader(libhttp.StatusOK) } diff --git a/src/coordinator/cluster_configuration.go b/src/coordinator/cluster_configuration.go index 1307760ca1..fb8092762d 100644 --- a/src/coordinator/cluster_configuration.go +++ b/src/coordinator/cluster_configuration.go @@ -40,6 +40,7 @@ type ClusterConfiguration struct { ClusterVersion uint32 config *configuration.Configuration addedLocalServerWait chan bool + addedLocalServer bool } type ContinuousQuery struct { @@ -307,6 +308,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) { log.Info("Added the local server") self.localServerId = server.Id self.addedLocalServerWait <- true + self.addedLocalServer = true } } @@ -569,9 +571,23 @@ func (self *ClusterConfiguration) Recovery(b []byte) error { server.Connect() } } + self.hasRunningServers = data.HasRunningServers self.localServerId = data.LocalServerId self.ClusterVersion = data.ClusterVersion + if self.addedLocalServer { + return nil + } + + for _, server := range self.servers { + if server.ProtobufConnectionString != self.config.ProtobufConnectionString() { + continue + } + self.addedLocalServerWait <- true + self.addedLocalServer = true + break + } + return nil } diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 576fdb435c..a874f3dfe6 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -78,6 +78,14 @@ func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsens return coordinator } +func (self *CoordinatorImpl) ForceCompaction(user common.User) error { + if !user.IsClusterAdmin() { + return fmt.Errorf("Insufficient permission to force a log compaction") + } + + return self.raftServer.ForceLogCompaction() +} + // Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order. // TODO: make this work even if there is a downed server in the cluster func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error { diff --git a/src/coordinator/coordinator_test.go b/src/coordinator/coordinator_test.go index eac2efd351..59e7047fe8 100644 --- a/src/coordinator/coordinator_test.go +++ b/src/coordinator/coordinator_test.go @@ -176,7 +176,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) { } size, err := GetFileSize(server.raftServer.LogPath()) c.Assert(err, IsNil) - server.forceLogCompaction() + server.ForceLogCompaction() newSize, err := GetFileSize(server.raftServer.LogPath()) c.Assert(err, IsNil) c.Assert(newSize < size, Equals, true) diff --git a/src/coordinator/interface.go b/src/coordinator/interface.go index 40fa4be9c0..14f2c301f7 100644 --- a/src/coordinator/interface.go +++ b/src/coordinator/interface.go @@ -22,6 +22,7 @@ type Coordinator interface { DropDatabase(user common.User, db string) error DropSeries(user common.User, db, series string) error CreateDatabase(user common.User, db string, replicationFactor uint8) error + ForceCompaction(user common.User) error ListDatabases(user common.User) ([]*Database, error) ListSeries(user common.User, database string) ([]*protocol.Series, error) ReplicateWrite(request *protocol.Request) error @@ -91,6 +92,8 @@ type ClusterConsensus interface { // When a cluster is turned on for the first time. CreateRootUser() error + + ForceLogCompaction() error } type RequestHandler interface { diff --git a/src/coordinator/raft_server.go b/src/coordinator/raft_server.go index 5e473b8610..5f73fc9703 100644 --- a/src/coordinator/raft_server.go +++ b/src/coordinator/raft_server.go @@ -282,11 +282,13 @@ const ( MAX_SIZE = 10 * MEGABYTE ) -func (s *RaftServer) forceLogCompaction() { +func (s *RaftServer) ForceLogCompaction() error { err := s.raftServer.TakeSnapshot() if err != nil { log.Error("Cannot take snapshot: %s", err) + return err } + return nil } func (s *RaftServer) CompactLog() { @@ -306,9 +308,9 @@ func (s *RaftServer) CompactLog() { if size < MAX_SIZE { continue } - s.forceLogCompaction() + s.ForceLogCompaction() case <-forceCompactionTicker: - s.forceLogCompaction() + s.ForceLogCompaction() } } } diff --git a/src/integration/server_test.go b/src/integration/server_test.go index b0ab86eb8b..223ef64a3f 100644 --- a/src/integration/server_test.go +++ b/src/integration/server_test.go @@ -200,6 +200,33 @@ func (self *ServerSuite) TearDownSuite(c *C) { } } +func (self *ServerSuite) TestRestartAfterCompaction(c *C) { + data := ` + [{ + "points": [[1]], + "name": "test_restart_after_compaction", + "columns": ["val"] + }] + ` + self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c) + + collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c) + c.Assert(collection.Members, HasLen, 1) + series := collection.GetSeries("test_restart_after_compaction", c) + c.Assert(series.Points, HasLen, 1) + resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c) + c.Assert(resp.StatusCode, Equals, http.StatusOK) + self.serverProcesses[0].Stop() + time.Sleep(time.Second) + self.serverProcesses[0].Start() + time.Sleep(time.Second * 3) + + collection = self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c) + c.Assert(collection.Members, HasLen, 1) + series = collection.GetSeries("test_restart_after_compaction", c) + c.Assert(series.Points, HasLen, 1) +} + // For issue #140 https://github.com/influxdb/influxdb/issues/140 func (self *ServerSuite) TestRestartServers(c *C) { data := `