From 8607a60fe0a1ff86bcdd6a306f0db3ed090a422c Mon Sep 17 00:00:00 2001 From: John Shahid Date: Fri, 1 Aug 2014 16:56:14 -0400 Subject: [PATCH] Make sure we remove the shard metadata whenever it expires --- cluster/cluster_configuration.go | 15 +-------------- coordinator/raft_server.go | 20 ++++++++++++++++++-- server/server.go | 1 - 3 files changed, 19 insertions(+), 17 deletions(-) diff --git a/cluster/cluster_configuration.go b/cluster/cluster_configuration.go index 1d91643600..bc40b75337 100644 --- a/cluster/cluster_configuration.go +++ b/cluster/cluster_configuration.go @@ -180,19 +180,6 @@ func (self *ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes }() } -func (self *ClusterConfiguration) PeriodicallyDropShardsWithRetentionPolicies() { - go func() { - for { - time.Sleep(self.config.StorageRetentionSweepPeriod.Duration) - log.Info("Checking for shards to drop") - shards := self.getExpiredShards() - for _, s := range shards { - self.shardStore.DeleteShard(s.id) - } - } - }() -} - func (self *ClusterConfiguration) allSpaces() []*ShardSpace { self.shardLock.RLock() defer self.shardLock.RUnlock() @@ -203,7 +190,7 @@ func (self *ClusterConfiguration) allSpaces() []*ShardSpace { return spaces } -func (self *ClusterConfiguration) getExpiredShards() []*ShardData { +func (self *ClusterConfiguration) GetExpiredShards() []*ShardData { self.shardLock.RLock() defer self.shardLock.RUnlock() shards := make([]*ShardData, 0) diff --git a/coordinator/raft_server.go b/coordinator/raft_server.go index e600b858ca..8439b55f07 100644 --- a/coordinator/raft_server.go +++ b/coordinator/raft_server.go @@ -421,7 +421,11 @@ func (s *RaftServer) startRaft() error { func (s *RaftServer) raftEventHandler(e raft.Event) { if e.Value() == "leader" { log.Info("(raft:%s) Selected as leader. Starting leader loop.", s.raftServer.Name()) - go s.raftLeaderLoop(time.NewTicker(1 * time.Second)) + config := s.clusterConfig.GetLocalConfiguration() + retentionSweepPeriod := config.StorageRetentionSweepPeriod.Duration + retentionSweepTimer := time.NewTicker(retentionSweepPeriod) + go s.raftLeaderLoop(time.NewTicker(1*time.Second), + retentionSweepTimer) } if e.PrevValue() == "leader" { @@ -430,13 +434,17 @@ func (s *RaftServer) raftEventHandler(e raft.Event) { } } -func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker) { +func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker, + retentionSweepTimer *time.Ticker) { for { select { case <-loopTimer.C: log.Debug("(raft:%s) Executing leader loop.", s.raftServer.Name()) s.checkContinuousQueries() break + case <-retentionSweepTimer.C: + s.dropShardsWithRetentionPolicies() + break case <-s.notLeader: log.Debug("(raft:%s) Exiting leader loop.", s.raftServer.Name()) return @@ -492,6 +500,14 @@ func (s *RaftServer) checkContinuousQueries() { } } +func (s *RaftServer) dropShardsWithRetentionPolicies() { + log.Info("Checking for shards to drop") + shards := s.clusterConfig.GetExpiredShards() + for _, shard := range shards { + s.DropShard(shard.Id(), shard.ServerIds()) + } +} + func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) { adminName := s.clusterConfig.GetClusterAdmins()[0] clusterAdmin := s.clusterConfig.GetClusterAdmin(adminName) diff --git a/server/server.go b/server/server.go index 0d36dd4733..b9a5b963eb 100644 --- a/server/server.go +++ b/server/server.go @@ -58,7 +58,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) { clusterConfig.LocalRaftName = raftServer.GetRaftName() clusterConfig.SetShardCreator(raftServer) clusterConfig.CreateFutureShardsAutomaticallyBeforeTimeComes() - clusterConfig.PeriodicallyDropShardsWithRetentionPolicies() coord := coordinator.NewCoordinatorImpl(config, raftServer, clusterConfig, metaStore) requestHandler := coordinator.NewProtobufRequestHandler(coord, clusterConfig)