Close #716. Fix #702. don't panic when dropping non-existent shard

The server would crash if you attempted to drop a shard multiple times.
It would also cause a problem on restart if a shard was dropped, but the
server restarted before a raft log compaction.
pull/716/merge
Paul Dix 2014-07-06 16:25:44 -04:00 committed by John Shahid
parent 76c50248b4
commit 2b9d32cf56
3 changed files with 65 additions and 13 deletions

View File

@ -850,6 +850,23 @@ func (self *ClusterConfiguration) GetAllShards() []*ShardData {
return append(sh, self.longTermShards...)
}
func (self *ClusterConfiguration) GetShard(id uint32) *ShardData {
self.shardsByIdLock.RLock()
shard := self.shardsById[id]
self.shardsByIdLock.RUnlock()
// may not be in the map, try to get it from the list
if shard == nil {
for _, s := range self.GetAllShards() {
if s.id == id {
shard = s
break
}
}
}
return shard
}
func (self *ClusterConfiguration) getShardRange(querySpec QuerySpec, shards []*ShardData) []*ShardData {
if querySpec.AllShardsQuery() {
return shards
@ -1104,21 +1121,10 @@ func (self *ClusterConfiguration) shardIdsForServerId(serverId uint32) []uint32
}
func (self *ClusterConfiguration) updateOrRemoveShard(shardId uint32, serverIds []uint32) {
self.shardsByIdLock.RLock()
shard := self.shardsById[shardId]
self.shardsByIdLock.RUnlock()
// may not be in the map, try to get it from the list
if shard == nil {
for _, s := range self.GetAllShards() {
if s.id == shardId {
shard = s
break
}
}
}
shard := self.GetShard(shardId)
if shard == nil {
log.Error("Attempted to remove shard %d, which we couldn't find. %d shards currently loaded.", shardId, len(self.GetAllShards()))
return
}
if len(shard.serverIds) == len(serverIds) {

View File

@ -716,6 +716,10 @@ func (self *RaftServer) CreateShards(shards []*cluster.NewShardData) ([]*cluster
}
func (self *RaftServer) DropShard(id uint32, serverIds []uint32) error {
if self.clusterConfig.GetShard(id) == nil {
log.Warn("Attempted to drop shard that doesn't exist: ", id)
return nil
}
command := NewDropShardCommand(id, serverIds)
_, err := self.doOrProxyCommand(command)
return err

View File

@ -437,6 +437,48 @@ func (self *SingleServerSuite) TestDataResurrectionAfterRestart(c *C) {
c.Assert(series, HasLen, 0)
}
// issue https://github.com/influxdb/influxdb/issues/702. Dropping shards can cause server crash
// Two cases here. First is they try to drop the same shard multiple times. Second is that
// they drop a shard and the server gets restarted so the raft log replays and tries to drop it again.
func (self *SingleServerSuite) TestDropingShardBeforeRestart(c *C) {
s := CreatePoints("data_resurrection", 1, 1)
self.server.WriteData(s, c)
self.server.WaitForServerToSync()
client := self.server.GetClient("", c)
shards, err := client.GetShards()
ids := make(map[uint32]bool)
for _, s := range shards.ShortTerm {
hasId := ids[s.Id]
if hasId {
c.Error("Shard id shows up twice: ", s.Id)
}
ids[s.Id] = true
}
oldShardCount := len(shards.ShortTerm)
oldShardId := shards.ShortTerm[0].Id
client.DropShard(oldShardId, []uint32{uint32(1)})
shards, err = client.GetShards()
c.Assert(err, IsNil)
c.Assert(len(shards.ShortTerm), Equals, oldShardCount-1)
// drop again and don't crash
client.DropShard(oldShardId, []uint32{uint32(1)})
shards, err = client.GetShards()
c.Assert(err, IsNil)
c.Assert(len(shards.ShortTerm), Equals, oldShardCount-1)
// now try to restart
self.server.Stop()
c.Assert(self.server.Start(), IsNil)
self.server.WaitForServerToStart()
shards, err = client.GetShards()
c.Assert(err, IsNil)
c.Assert(len(shards.ShortTerm), Equals, oldShardCount-1)
}
// issue #360
func (self *SingleServerSuite) TestContinuousQueriesAfterCompaction(c *C) {
defer self.server.RemoveAllContinuousQueries("db1", c)