diff --git a/config.toml.sample b/config.toml.sample
index d479ccea20..9bd13668e4 100644
--- a/config.toml.sample
+++ b/config.toml.sample
@@ -141,8 +141,8 @@ point-batch-size = 100
 
 [wal]
 dir = "/tmp/influxdb/development/wal"
-flush-after = 0 # the number of writes after which wal will be flushed, 0 for flushing on every write
-bookmark-after = 0 # the number of writes after which a bookmark will be created
+flush-after = 1000 # the number of writes after which wal will be flushed, 0 for flushing on every write
+bookmark-after = 1000 # the number of writes after which a bookmark will be created
 
 # the number of writes after which an index entry is created pointing
 # to the offset of the first request, default to 1k
diff --git a/src/cluster/cluster_configuration.go b/src/cluster/cluster_configuration.go
index e041a1cebc..ac418851f2 100644
--- a/src/cluster/cluster_configuration.go
+++ b/src/cluster/cluster_configuration.go
@@ -35,6 +35,7 @@ type QuerySpec interface {
 type WAL interface {
 	AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
 	Commit(requestNumber uint32, serverId uint32) error
+	CreateCheckpoint() error
 	RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
 	RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
 }
@@ -676,6 +677,11 @@ func (self *ClusterConfiguration) createShards(microsecondsEpoch int64, shardTyp
 	return createdShards, nil
 }
 
+// CreateCheckpoint delegates to the WAL's CreateCheckpoint and returns its error.
+func (self *ClusterConfiguration) CreateCheckpoint() error {
+	return self.wal.CreateCheckpoint()
+}
+
 func (self *ClusterConfiguration) getStartAndEndBasedOnDuration(microsecondsEpoch int64, duration float64) (*time.Time, *time.Time) {
 	startTimeSeconds := math.Floor(float64(microsecondsEpoch)/1000.0/1000.0/duration) * duration
 	startTime := time.Unix(int64(startTimeSeconds), 0)
diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go
index 219baa4f48..99081cad6d 100644
--- a/src/coordinator/coordinator.go
+++ b/src/coordinator/coordinator.go
@@ -606,6 +606,12 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
 		return common.NewAuthorizationError("Insufficient permissions to drop database")
 	}
 
+	// Checkpoint the WAL before the drop. NOTE(review): presumably this keeps a later
+	// WAL replay from resurrecting dropped data (see TestDataResurrectionAfterRestart) - confirm.
+	if err := self.clusterConfiguration.CreateCheckpoint(); err != nil {
+		return err
+	}
+
 	if err := self.raftServer.DropDatabase(db); err != nil {
 		return err
 	}
diff --git a/src/integration/benchmark_test.go b/src/integration/benchmark_test.go
index cb71e367d8..95304e648f 100644
--- a/src/integration/benchmark_test.go
+++ b/src/integration/benchmark_test.go
@@ -323,6 +323,49 @@ func (self *IntegrationSuite) TestExplainsWithPassthrough(c *C) {
 	c.Assert(series[0].Points[0][6], Equals, float64(2.0))
 }
 
+// TestDataResurrectionAfterRestart writes points, drops and recreates db1,
+// restarts the server and asserts that the old series does not come back.
+func (self *IntegrationSuite) TestDataResurrectionAfterRestart(c *C) {
+	s := self.createPoints("data_resurrection", 1, 10)
+	b, err := json.Marshal(s)
+	c.Assert(err, IsNil)
+	err = self.server.WriteData(string(b))
+	c.Assert(err, IsNil)
+	time.Sleep(time.Second)
+	fmt.Printf("wrote some data\n")
+	data, err := self.server.RunQuery("select count(column0) from data_resurrection", "s")
+	c.Assert(err, IsNil)
+	series := []*SerializedSeries{}
+	err = json.Unmarshal(data, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 1)
+	c.Assert(series[0].Points[0][1], Equals, 10.0)
+	req, err := http.NewRequest("DELETE", "http://localhost:8086/db/db1?u=root&p=root", nil)
+	c.Assert(err, IsNil)
+	resp, err := http.DefaultClient.Do(req)
+	c.Assert(err, IsNil)
+	defer resp.Body.Close()
+	c.Assert(resp.StatusCode, Equals, http.StatusNoContent)
+	resp, err = http.Post("http://localhost:8086/db?u=root&p=root", "", bytes.NewBufferString("{\"name\":\"db1\", \"replicationFactor\":3}"))
+	c.Assert(err, IsNil)
+	defer resp.Body.Close()
+	c.Assert(resp.StatusCode, Equals, http.StatusCreated)
+	resp, err = http.Post("http://localhost:8086/db/db1/users?u=root&p=root", "", bytes.NewBufferString("{\"name\":\"user\", \"password\":\"pass\"}"))
+	c.Assert(err, IsNil)
+	defer resp.Body.Close()
+	c.Assert(resp.StatusCode, Equals, http.StatusOK)
+	self.server.stop()
+	time.Sleep(time.Second)
+	c.Assert(self.server.start(), IsNil)
+	time.Sleep(5 * time.Second)
+	data, err = self.server.RunQuery("select count(column0) from data_resurrection", "s")
+	c.Assert(err, IsNil)
+	series = []*SerializedSeries{}
+	err = json.Unmarshal(data, &series)
+	c.Assert(err, IsNil)
+	c.Assert(series, HasLen, 0)
+}
+
 func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) {
 	points := []string{}
 	for i := 0; i < 101; i++ {
diff --git a/src/wal/wal.go b/src/wal/wal.go
index 9c39b1c584..8275eaf61a 100644
--- a/src/wal/wal.go
+++ b/src/wal/wal.go
@@ -124,9 +124,11 @@ func (self *WAL) Commit(requestNumber uint32, serverId uint32) error {
 
 func (self *WAL) RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
 	requestNumber, ok := self.state.ServerLastRequestNumber[serverId]
+	requestNumber += 1 // start after the last request recorded for this server; overwritten below when there is none
 	if !ok {
 		requestNumber = uint32(self.state.FirstSuffix)
 	}
+	logger.Info("Recovering server %d from request %d", serverId, requestNumber)
 	return self.RecoverServerFromRequestNumber(requestNumber, shardIds, yield)
 }
 
@@ -152,29 +154,31 @@ func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds [
 	firstOffset := int64(-1)
-	// find the log file from which replay will start if the request
-	// number is in range, otherwise replay from all log files
-	if self.isInRange(requestNumber) {
-		for idx, logIndex := range self.logIndex {
-			logger.Debug("Trying to find request %d in %s", requestNumber, self.logFiles[idx].file.Name())
-			if firstOffset = logIndex.requestOffset(requestNumber); firstOffset != -1 {
-				logger.Debug("Found reqeust %d in %s at offset %d", requestNumber, self.logFiles[idx].file.Name(), firstOffset)
-				firstIndex = idx
-				break
-			}
-		}
+	// if the request number is out of range there is nothing to replay,
+	// otherwise find the log file from which replay will start
+	if !self.isInRange(requestNumber) {
+		return nil
+	}
 
-		// the request must be at the end of the current log file
-		if firstOffset == -1 {
-			firstIndex = len(self.logIndex) - 1
-			firstOffset = self.logIndex[firstIndex].requestOrLastOffset(requestNumber)
+	for idx, logIndex := range self.logIndex {
+		logger.Debug("Trying to find request %d in %s", requestNumber, self.logFiles[idx].file.Name())
+		if firstOffset = logIndex.requestOffset(requestNumber); firstOffset != -1 {
+			logger.Debug("Found reqeust %d in %s at offset %d", requestNumber, self.logFiles[idx].file.Name(), firstOffset)
+			firstIndex = idx
+			break
 		}
 	}
+
+	// the request must be at the end of the current log file
+	if firstOffset == -1 {
+		firstIndex = len(self.logIndex) - 1
+		firstOffset = self.logIndex[firstIndex].requestOrLastOffset(requestNumber)
+	}
 outer:
 	for idx := firstIndex; idx < len(self.logFiles); idx++ {
 		logFile := self.logFiles[idx]
 		if idx > firstIndex {
 			firstOffset = -1
 		}
-		logger.Info("Replaying from %s", logFile.file.Name())
+		logger.Info("Replaying from %s:%d", logFile.file.Name(), firstOffset)
 		count := 0
 		ch, stopChan := logFile.dupAndReplayFromOffset(shardIds, firstOffset, requestNumber)
 		for {
@@ -539,6 +543,11 @@ func (self *WAL) bookmark() error {
 }
 
 func (self *WAL) index() error {
+	// nothing to index yet; also guard logIndex because it is the slice indexed below
+	if len(self.logFiles) == 0 || len(self.logIndex) == 0 {
+		return nil
+	}
+
 	lastIndex := self.logIndex[len(self.logIndex)-1]
 	firstOffset := lastIndex.getLastOffset()
 	lastIndex.addEntry(
diff --git a/src/wal/wal_test.go b/src/wal/wal_test.go
index 81ac2f3976..63fc2865cc 100644
--- a/src/wal/wal_test.go
+++ b/src/wal/wal_test.go
@@ -290,7 +290,7 @@ func (_ *WalSuite) TestRequestNumberRollOver(c *C) {
 	}
 	wal.Close()
 	requests := []*protocol.Request{}
-	wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
+	wal.RecoverServerFromRequestNumber(firstRequestNumber+1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
 		requests = append(requests, req)
 		return nil
 	})
@@ -310,7 +310,7 @@ func (_ *WalSuite) TestRequestNumberRollOverAcrossMultipleFiles(c *C) {
 	}
 	wal.Close()
 	requests := []*protocol.Request{}
-	wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
+	wal.RecoverServerFromRequestNumber(firstRequestNumber+1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
 		requests = append(requests, req)
 		return nil
 	})
@@ -319,7 +319,7 @@ func (_ *WalSuite) TestRequestNumberRollOverAcrossMultipleFiles(c *C) {
 	wal.SetServerId(1)
 	c.Assert(err, IsNil)
 	requests = []*protocol.Request{}
-	wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
+	wal.RecoverServerFromRequestNumber(firstRequestNumber+1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
 		requests = append(requests, req)
 		return nil
 	})
@@ -373,7 +373,7 @@ func (_ *WalSuite) TestRecoveryFromCrash(c *C) {
 	wal.SetServerId(1)
 	c.Assert(err, IsNil)
 	requests := []*protocol.Request{}
-	wal.RecoverServerFromRequestNumber(0, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
+	wal.RecoverServerFromRequestNumber(1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
 		requests = append(requests, req)
 		return nil
 	})
@@ -412,7 +412,7 @@ func (_ *WalSuite) TestSimultaneousReplay(c *C) {
 	<-signalChan
 
 	requests := []*protocol.Request{}
-	wal.RecoverServerFromRequestNumber(uint32(0), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
+	wal.RecoverServerFromRequestNumber(uint32(1), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
 		requests = append(requests, req)
 		return nil
 	})