fix #342. Make sure we create a checkpoint before deleting data

Deleting data without creating a WAL bookmark will cause the log to
replay some requests that were written before the delete happened,
causing the data to be resurrected. This change makes sure we create a
bookmark right before we delete the data, so that any write made
before the delete will not be replayed.
pull/342/head
John Shahid 2014-03-26 12:48:08 -04:00
parent 92463617f9
commit 56866841aa
6 changed files with 78 additions and 21 deletions

View File

@ -141,8 +141,8 @@ point-batch-size = 100
[wal]
dir = "/tmp/influxdb/development/wal"
flush-after = 0 # the number of writes after which wal will be flushed, 0 for flushing on every write
bookmark-after = 0 # the number of writes after which a bookmark will be created
flush-after = 1000 # the number of writes after which wal will be flushed, 0 for flushing on every write
bookmark-after = 1000 # the number of writes after which a bookmark will be created
# the number of writes after which an index entry is created pointing
# to the offset of the first request, default to 1k

View File

@ -35,6 +35,7 @@ type QuerySpec interface {
// WAL is the write-ahead-log surface this package depends on. The
// notes on each method describe only what the call sites and the
// *wal.WAL implementation visible alongside this declaration show;
// anything else is marked as an assumption.
type WAL interface {
// AssignSequenceNumbersAndLog takes a request and the shard it targets
// and returns a uint32 plus an error.
// NOTE(review): presumably the returned value is the request number
// assigned to the logged request — confirm against the implementation.
AssignSequenceNumbersAndLog(request *protocol.Request, shard wal.Shard) (uint32, error)
// Commit takes a request number and a server id.
// NOTE(review): presumably records requestNumber as the last request
// committed by serverId — confirm against the implementation.
Commit(requestNumber uint32, serverId uint32) error
// CreateCheckpoint is what ClusterConfiguration.CreateCheckpoint
// delegates to; the coordinator calls that before dropping a database.
// NOTE(review): expected to bookmark the log so writes made before the
// drop are not replayed — confirm against the implementation.
CreateCheckpoint() error
// RecoverServerFromRequestNumber replays logged requests for the given
// shard ids starting at requestNumber, handing each one to yield
// together with its shard id.
RecoverServerFromRequestNumber(requestNumber uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
// RecoverServerFromLastCommit replays logged requests for serverId.
// In *wal.WAL it resumes from the request after the last one recorded
// for that server and delegates to RecoverServerFromRequestNumber.
RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error
}
@ -676,6 +677,10 @@ func (self *ClusterConfiguration) createShards(microsecondsEpoch int64, shardTyp
return createdShards, nil
}
// CreateCheckpoint asks the configuration's WAL to create a checkpoint
// and returns whatever error the WAL reports.
func (self *ClusterConfiguration) CreateCheckpoint() error {
	err := self.wal.CreateCheckpoint()
	return err
}
func (self *ClusterConfiguration) getStartAndEndBasedOnDuration(microsecondsEpoch int64, duration float64) (*time.Time, *time.Time) {
startTimeSeconds := math.Floor(float64(microsecondsEpoch)/1000.0/1000.0/duration) * duration
startTime := time.Unix(int64(startTimeSeconds), 0)

View File

@ -606,6 +606,10 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
return common.NewAuthorizationError("Insufficient permissions to drop database")
}
if err := self.clusterConfiguration.CreateCheckpoint(); err != nil {
return err
}
if err := self.raftServer.DropDatabase(db); err != nil {
return err
}

View File

@ -323,6 +323,46 @@ func (self *IntegrationSuite) TestExplainsWithPassthrough(c *C) {
c.Assert(series[0].Points[0][6], Equals, float64(2.0))
}
// TestDataResurrectionAfterRestart checks that data written before a
// database is dropped does not reappear after the server restarts.
//
// It writes ten points, verifies they can be counted, drops db1,
// recreates db1 and its user, restarts the server, and then asserts
// that the series no longer returns any data.
func (self *IntegrationSuite) TestDataResurrectionAfterRestart(c *C) {
	// Write the points and give the server a moment to persist them.
	s := self.createPoints("data_resurrection", 1, 10)
	b, err := json.Marshal(s)
	c.Assert(err, IsNil)
	err = self.server.WriteData(string(b))
	c.Assert(err, IsNil)
	time.Sleep(time.Second)
	fmt.Printf("wrote some data\n")

	// Sanity check: all ten points are visible before the drop.
	data, err := self.server.RunQuery("select count(column0) from data_resurrection", "s")
	c.Assert(err, IsNil)
	series := []*SerializedSeries{}
	err = json.Unmarshal(data, &series)
	c.Assert(err, IsNil)
	c.Assert(series, HasLen, 1)
	c.Assert(series[0].Points[0][1], Equals, 10.0)

	// Drop the database.
	req, err := http.NewRequest("DELETE", "http://localhost:8086/db/db1?u=root&p=root", nil)
	c.Assert(err, IsNil)
	resp, err := http.DefaultClient.Do(req)
	c.Assert(err, IsNil)
	// Close this response body as well; the receiver of a deferred call is
	// evaluated here, so reassigning resp below does not affect it.
	defer resp.Body.Close()
	c.Assert(resp.StatusCode, Equals, http.StatusNoContent)

	// Recreate the database and its user so the post-restart query can run.
	resp, err = http.Post("http://localhost:8086/db?u=root&p=root", "", bytes.NewBufferString("{\"name\":\"db1\", \"replicationFactor\":3}"))
	c.Assert(err, IsNil)
	defer resp.Body.Close()
	c.Assert(resp.StatusCode, Equals, http.StatusCreated)
	resp, err = http.Post("http://localhost:8086/db/db1/users?u=root&p=root", "", bytes.NewBufferString("{\"name\":\"user\", \"password\":\"pass\"}"))
	c.Assert(err, IsNil)
	defer resp.Body.Close()
	c.Assert(resp.StatusCode, Equals, http.StatusOK)

	// Restart the server; the sleeps give it time to stop and come back up.
	self.server.stop()
	time.Sleep(time.Second)
	c.Assert(self.server.start(), IsNil)
	time.Sleep(5 * time.Second)

	// The dropped data must not have been resurrected.
	data, err = self.server.RunQuery("select count(column0) from data_resurrection", "s")
	c.Assert(err, IsNil)
	series = []*SerializedSeries{}
	err = json.Unmarshal(data, &series)
	c.Assert(err, IsNil)
	c.Assert(series, HasLen, 0)
}
func (self *IntegrationSuite) TestExplainsWithPassthroughAndLimit(c *C) {
points := []string{}
for i := 0; i < 101; i++ {

View File

@ -124,9 +124,11 @@ func (self *WAL) Commit(requestNumber uint32, serverId uint32) error {
// RecoverServerFromLastCommit replays logged requests for serverId,
// picking the starting request number from the WAL state: the request
// right after the last one recorded for that server or, when nothing is
// recorded for it, the state's FirstSuffix. Replay itself is delegated
// to RecoverServerFromRequestNumber, whose error is returned as is.
func (self *WAL) RecoverServerFromLastCommit(serverId uint32, shardIds []uint32, yield func(request *protocol.Request, shardId uint32) error) error {
	// Default for a server with no recorded request number.
	startFrom := uint32(self.state.FirstSuffix)
	if lastSeen, known := self.state.ServerLastRequestNumber[serverId]; known {
		// Resume with the request following the last recorded one.
		startFrom = lastSeen + 1
	}

	logger.Info("Recovering server %d from request %d", serverId, startFrom)
	return self.RecoverServerFromRequestNumber(startFrom, shardIds, yield)
}
@ -152,29 +154,31 @@ func (self *WAL) RecoverServerFromRequestNumber(requestNumber uint32, shardIds [
firstOffset := int64(-1)
// find the log file from which replay will start if the request
// number is in range, otherwise replay from all log files
if self.isInRange(requestNumber) {
for idx, logIndex := range self.logIndex {
logger.Debug("Trying to find request %d in %s", requestNumber, self.logFiles[idx].file.Name())
if firstOffset = logIndex.requestOffset(requestNumber); firstOffset != -1 {
logger.Debug("Found reqeust %d in %s at offset %d", requestNumber, self.logFiles[idx].file.Name(), firstOffset)
firstIndex = idx
break
}
}
if !self.isInRange(requestNumber) {
return nil
}
// the request must be at the end of the current log file
if firstOffset == -1 {
firstIndex = len(self.logIndex) - 1
firstOffset = self.logIndex[firstIndex].requestOrLastOffset(requestNumber)
for idx, logIndex := range self.logIndex {
logger.Debug("Trying to find request %d in %s", requestNumber, self.logFiles[idx].file.Name())
if firstOffset = logIndex.requestOffset(requestNumber); firstOffset != -1 {
logger.Debug("Found reqeust %d in %s at offset %d", requestNumber, self.logFiles[idx].file.Name(), firstOffset)
firstIndex = idx
break
}
}
// the request must be at the end of the current log file
if firstOffset == -1 {
firstIndex = len(self.logIndex) - 1
firstOffset = self.logIndex[firstIndex].requestOrLastOffset(requestNumber)
}
outer:
for idx := firstIndex; idx < len(self.logFiles); idx++ {
logFile := self.logFiles[idx]
if idx > firstIndex {
firstOffset = -1
}
logger.Info("Replaying from %s", logFile.file.Name())
logger.Info("Replaying from %s:%d", logFile.file.Name(), firstOffset)
count := 0
ch, stopChan := logFile.dupAndReplayFromOffset(shardIds, firstOffset, requestNumber)
for {
@ -539,6 +543,10 @@ func (self *WAL) bookmark() error {
}
func (self *WAL) index() error {
if len(self.logFiles) == 0 {
return nil
}
lastIndex := self.logIndex[len(self.logIndex)-1]
firstOffset := lastIndex.getLastOffset()
lastIndex.addEntry(

View File

@ -290,7 +290,7 @@ func (_ *WalSuite) TestRequestNumberRollOver(c *C) {
}
wal.Close()
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
wal.RecoverServerFromRequestNumber(firstRequestNumber+1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
@ -310,7 +310,7 @@ func (_ *WalSuite) TestRequestNumberRollOverAcrossMultipleFiles(c *C) {
}
wal.Close()
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
wal.RecoverServerFromRequestNumber(firstRequestNumber+1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
@ -319,7 +319,7 @@ func (_ *WalSuite) TestRequestNumberRollOverAcrossMultipleFiles(c *C) {
wal.SetServerId(1)
c.Assert(err, IsNil)
requests = []*protocol.Request{}
wal.RecoverServerFromRequestNumber(firstRequestNumber, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
wal.RecoverServerFromRequestNumber(firstRequestNumber+1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
@ -373,7 +373,7 @@ func (_ *WalSuite) TestRecoveryFromCrash(c *C) {
wal.SetServerId(1)
c.Assert(err, IsNil)
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(0, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
wal.RecoverServerFromRequestNumber(1, []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})
@ -412,7 +412,7 @@ func (_ *WalSuite) TestSimultaneousReplay(c *C) {
<-signalChan
requests := []*protocol.Request{}
wal.RecoverServerFromRequestNumber(uint32(0), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
wal.RecoverServerFromRequestNumber(uint32(1), []uint32{1}, func(req *protocol.Request, shardId uint32) error {
requests = append(requests, req)
return nil
})