Make sure we remove the shard metadata whenever it expires
parent
adfbdb0e4e
commit
8607a60fe0
|
@ -180,19 +180,6 @@ func (self *ClusterConfiguration) CreateFutureShardsAutomaticallyBeforeTimeComes
|
|||
}()
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) PeriodicallyDropShardsWithRetentionPolicies() {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(self.config.StorageRetentionSweepPeriod.Duration)
|
||||
log.Info("Checking for shards to drop")
|
||||
shards := self.getExpiredShards()
|
||||
for _, s := range shards {
|
||||
self.shardStore.DeleteShard(s.id)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) allSpaces() []*ShardSpace {
|
||||
self.shardLock.RLock()
|
||||
defer self.shardLock.RUnlock()
|
||||
|
@ -203,7 +190,7 @@ func (self *ClusterConfiguration) allSpaces() []*ShardSpace {
|
|||
return spaces
|
||||
}
|
||||
|
||||
func (self *ClusterConfiguration) getExpiredShards() []*ShardData {
|
||||
func (self *ClusterConfiguration) GetExpiredShards() []*ShardData {
|
||||
self.shardLock.RLock()
|
||||
defer self.shardLock.RUnlock()
|
||||
shards := make([]*ShardData, 0)
|
||||
|
|
|
@ -421,7 +421,11 @@ func (s *RaftServer) startRaft() error {
|
|||
func (s *RaftServer) raftEventHandler(e raft.Event) {
|
||||
if e.Value() == "leader" {
|
||||
log.Info("(raft:%s) Selected as leader. Starting leader loop.", s.raftServer.Name())
|
||||
go s.raftLeaderLoop(time.NewTicker(1 * time.Second))
|
||||
config := s.clusterConfig.GetLocalConfiguration()
|
||||
retentionSweepPeriod := config.StorageRetentionSweepPeriod.Duration
|
||||
retentionSweepTimer := time.NewTicker(retentionSweepPeriod)
|
||||
go s.raftLeaderLoop(time.NewTicker(1*time.Second),
|
||||
retentionSweepTimer)
|
||||
}
|
||||
|
||||
if e.PrevValue() == "leader" {
|
||||
|
@ -430,13 +434,17 @@ func (s *RaftServer) raftEventHandler(e raft.Event) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker) {
|
||||
func (s *RaftServer) raftLeaderLoop(loopTimer *time.Ticker,
|
||||
retentionSweepTimer *time.Ticker) {
|
||||
for {
|
||||
select {
|
||||
case <-loopTimer.C:
|
||||
log.Debug("(raft:%s) Executing leader loop.", s.raftServer.Name())
|
||||
s.checkContinuousQueries()
|
||||
break
|
||||
case <-retentionSweepTimer.C:
|
||||
s.dropShardsWithRetentionPolicies()
|
||||
break
|
||||
case <-s.notLeader:
|
||||
log.Debug("(raft:%s) Exiting leader loop.", s.raftServer.Name())
|
||||
return
|
||||
|
@ -492,6 +500,14 @@ func (s *RaftServer) checkContinuousQueries() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *RaftServer) dropShardsWithRetentionPolicies() {
|
||||
log.Info("Checking for shards to drop")
|
||||
shards := s.clusterConfig.GetExpiredShards()
|
||||
for _, shard := range shards {
|
||||
s.DropShard(shard.Id(), shard.ServerIds())
|
||||
}
|
||||
}
|
||||
|
||||
func (s *RaftServer) runContinuousQuery(db string, query *parser.SelectQuery, start time.Time, end time.Time) {
|
||||
adminName := s.clusterConfig.GetClusterAdmins()[0]
|
||||
clusterAdmin := s.clusterConfig.GetClusterAdmin(adminName)
|
||||
|
|
|
@ -58,7 +58,6 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
|
|||
clusterConfig.LocalRaftName = raftServer.GetRaftName()
|
||||
clusterConfig.SetShardCreator(raftServer)
|
||||
clusterConfig.CreateFutureShardsAutomaticallyBeforeTimeComes()
|
||||
clusterConfig.PeriodicallyDropShardsWithRetentionPolicies()
|
||||
|
||||
coord := coordinator.NewCoordinatorImpl(config, raftServer, clusterConfig, metaStore)
|
||||
requestHandler := coordinator.NewProtobufRequestHandler(coord, clusterConfig)
|
||||
|
|
Loading…
Reference in New Issue