Fix #215. Fix hangs on startup after raft compaction

Because raft compaction collapses the entire log into one snapshot, commands in
the log aren't replayed on startup; the entire state is loaded from the snapshot
instead. This prevented AddPotentialServer from being called, which in turn
never put anything in the addedLocalServerWait channel, so startup blocked.
pull/215/head
John Shahid 2014-01-30 15:44:41 -05:00
parent 3d0b5270e5
commit 25cb03e38c
7 changed files with 70 additions and 4 deletions
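
The hang is a channel receive with no sender: after a compaction the
AddPotentialServer command is no longer in the log to be replayed, and the
snapshot-restore path never wrote to addedLocalServerWait. Below is a minimal
sketch of the pattern using simplified, hypothetical names (the real flow runs
through the raft server startup and ClusterConfiguration):

package main

import (
	"fmt"
	"time"
)

type clusterConfig struct {
	addedLocalServerWait chan bool
}

// Replayed from the raft log for each AddPotentialServer command.
// After a compaction those commands are gone from the log, so this
// never runs on restart.
func (c *clusterConfig) addPotentialServer() {
	c.addedLocalServerWait <- true
}

// Applied instead of log replay when restoring from a snapshot.
// Before this fix, it never signaled the channel.
func (c *clusterConfig) recovery() {
	// The fix: notice the local server is already in the snapshot
	// and signal the waiter from here as well.
	c.addedLocalServerWait <- true
}

func main() {
	c := &clusterConfig{addedLocalServerWait: make(chan bool, 1)}
	go c.recovery() // snapshot-restore path taken after compaction

	select {
	case <-c.addedLocalServerWait:
		fmt.Println("local server registered; startup continues")
	case <-time.After(time.Second):
		// With the pre-fix recovery() that never sends, startup
		// would block here forever.
		fmt.Println("hang: nothing ever wrote to the channel")
	}
}

Run as-is it prints the success branch; remove the send in recovery() to
reproduce the pre-fix hang.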


@@ -130,6 +130,9 @@ func (self *HttpServer) Serve(listener net.Listener) {
 	// healthcheck
 	self.registerEndpoint(p, "get", "/ping", self.ping)
+	// force a raft log compaction
+	self.registerEndpoint(p, "post", "/raft/force_compaction", self.forceRaftCompaction)
 	// fetch current list of available interfaces
 	self.registerEndpoint(p, "get", "/interfaces", self.listInterfaces)
@@ -267,6 +270,13 @@ func TimePrecisionFromString(s string) (TimePrecision, error) {
 	return 0, fmt.Errorf("Unknown time precision %s", s)
 }
+func (self *HttpServer) forceRaftCompaction(w libhttp.ResponseWriter, r *libhttp.Request) {
+	self.tryAsClusterAdmin(w, r, func(user common.User) (int, interface{}) {
+		self.coordinator.ForceCompaction(user)
+		return libhttp.StatusOK, "OK"
+	})
+}
func (self *HttpServer) sendCrossOriginHeader(w libhttp.ResponseWriter, r *libhttp.Request) {
	w.WriteHeader(libhttp.StatusOK)
}
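
The new endpoint can be exercised directly once a node is up. A hedged sketch:
port 8086 is an assumed default, and the root/root cluster-admin credentials
are taken from the integration test added below.

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Trigger a raft log compaction on a running node.
	resp, err := http.Post("http://localhost:8086/raft/force_compaction?u=root&p=root", "", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect "200 OK"
}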


@@ -40,6 +40,7 @@ type ClusterConfiguration struct {
 	ClusterVersion       uint32
 	config               *configuration.Configuration
 	addedLocalServerWait chan bool
+	addedLocalServer     bool
 }
 type ContinuousQuery struct {
@@ -307,6 +308,7 @@ func (self *ClusterConfiguration) AddPotentialServer(server *ClusterServer) {
 		log.Info("Added the local server")
 		self.localServerId = server.Id
 		self.addedLocalServerWait <- true
+		self.addedLocalServer = true
 	}
 }
@@ -569,9 +571,23 @@ func (self *ClusterConfiguration) Recovery(b []byte) error {
 			server.Connect()
 		}
 	}
 	self.hasRunningServers = data.HasRunningServers
 	self.localServerId = data.LocalServerId
 	self.ClusterVersion = data.ClusterVersion
+	if self.addedLocalServer {
+		return nil
+	}
+	for _, server := range self.servers {
+		if server.ProtobufConnectionString != self.config.ProtobufConnectionString() {
+			continue
+		}
+		self.addedLocalServerWait <- true
+		self.addedLocalServer = true
+		break
+	}
 	return nil
 }


@@ -78,6 +78,14 @@ func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsens
 	return coordinator
 }
+func (self *CoordinatorImpl) ForceCompaction(user common.User) error {
+	if !user.IsClusterAdmin() {
+		return fmt.Errorf("Insufficient permission to force a log compaction")
+	}
+	return self.raftServer.ForceLogCompaction()
+}
// Distributes the query across the cluster and combines the results. Yields as they come in ensuring proper order.
// TODO: make this work even if there is a downed server in the cluster
func (self *CoordinatorImpl) DistributeQuery(user common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error {


@@ -176,7 +176,7 @@ func (self *CoordinatorSuite) TestCanSnapshot(c *C) {
 	}
 	size, err := GetFileSize(server.raftServer.LogPath())
 	c.Assert(err, IsNil)
-	server.forceLogCompaction()
+	server.ForceLogCompaction()
 	newSize, err := GetFileSize(server.raftServer.LogPath())
 	c.Assert(err, IsNil)
 	c.Assert(newSize < size, Equals, true)


@@ -22,6 +22,7 @@ type Coordinator interface {
 	DropDatabase(user common.User, db string) error
 	DropSeries(user common.User, db, series string) error
 	CreateDatabase(user common.User, db string, replicationFactor uint8) error
+	ForceCompaction(user common.User) error
 	ListDatabases(user common.User) ([]*Database, error)
 	ListSeries(user common.User, database string) ([]*protocol.Series, error)
 	ReplicateWrite(request *protocol.Request) error
@@ -91,6 +92,8 @@ type ClusterConsensus interface {
 	// When a cluster is turned on for the first time.
 	CreateRootUser() error
+	ForceLogCompaction() error
 }
type RequestHandler interface {


@@ -282,11 +282,13 @@ const (
 	MAX_SIZE = 10 * MEGABYTE
 )
-func (s *RaftServer) forceLogCompaction() {
+func (s *RaftServer) ForceLogCompaction() error {
 	err := s.raftServer.TakeSnapshot()
 	if err != nil {
 		log.Error("Cannot take snapshot: %s", err)
+		return err
 	}
+	return nil
 }
 func (s *RaftServer) CompactLog() {
@@ -306,9 +308,9 @@ func (s *RaftServer) CompactLog() {
 			if size < MAX_SIZE {
 				continue
 			}
-			s.forceLogCompaction()
+			s.ForceLogCompaction()
 		case <-forceCompactionTicker:
-			s.forceLogCompaction()
+			s.ForceLogCompaction()
 		}
 	}
}


@@ -200,6 +200,33 @@ func (self *ServerSuite) TearDownSuite(c *C) {
 	}
 }
+func (self *ServerSuite) TestRestartAfterCompaction(c *C) {
+	data := `
+  [{
+    "points": [[1]],
+    "name": "test_restart_after_compaction",
+    "columns": ["val"]
+  }]
+  `
+	self.serverProcesses[0].Post("/db/test_rep/series?u=paul&p=pass", data, c)
+	collection := self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c)
+	c.Assert(collection.Members, HasLen, 1)
+	series := collection.GetSeries("test_restart_after_compaction", c)
+	c.Assert(series.Points, HasLen, 1)
+	resp := self.serverProcesses[0].Post("/raft/force_compaction?u=root&p=root", "", c)
+	c.Assert(resp.StatusCode, Equals, http.StatusOK)
+	self.serverProcesses[0].Stop()
+	time.Sleep(time.Second)
+	self.serverProcesses[0].Start()
+	time.Sleep(time.Second * 3)
+	collection = self.serverProcesses[0].Query("test_rep", "select * from test_restart_after_compaction", false, c)
+	c.Assert(collection.Members, HasLen, 1)
+	series = collection.GetSeries("test_restart_after_compaction", c)
+	c.Assert(series.Points, HasLen, 1)
+}
// For issue #140 https://github.com/influxdb/influxdb/issues/140
func (self *ServerSuite) TestRestartServers(c *C) {
	data := `