Merge pull request #1883 from influxdb/retention_crash

RLock Server when checking retention policies
pull/1900/head
Philip O'Toole 2015-03-09 15:52:34 -07:00
commit f6d1a00e23
2 changed files with 28 additions and 9 deletions

View File

@ -6,6 +6,7 @@
- [#1753](https://github.com/influxdb/influxdb/pull/1874): Do Not Panic on Missing Dirs
- [#1877](https://github.com/influxdb/influxdb/pull/1877): Broker clients track broker leader
- [#1862](https://github.com/influxdb/influxdb/pull/1862): Fix memory leak in `httpd.serveWait`. Thanks @mountkin
- [#1883](https://github.com/influxdb/influxdb/pull/1883): RLock server during retention policy enforcement. Thanks @grisha
- [#1868](https://github.com/influxdb/influxdb/pull/1868): Use `BatchPoints` for `client.Write` method. Thanks @vladlopes, @georgmu, @d2g, @evanphx, @akolosov.
- [#1881](https://github.com/influxdb/influxdb/pull/1881): Update documentation for `client` package. Misc library tweaks.

View File

@ -299,19 +299,37 @@ func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) er
func (s *Server) EnforceRetentionPolicies() {
log.Println("retention policy enforcement check commencing")
// Check all shard groups.
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
if rp.Duration != 0 && g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) {
log.Printf("shard group %d, retention policy %s, database %s due for deletion",
g.ID, rp.Name, db.name)
if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil {
log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error())
type group struct {
Database string
Retention string
ID uint64
}
groups := make([]group, 0)
// Only keep the lock while walking the shard groups, so the lock is not held while
// any deletions take place across the cluster.
func() {
s.mu.RLock()
defer s.mu.RUnlock()
// Check all shard groups.
for _, db := range s.databases {
for _, rp := range db.policies {
for _, g := range rp.shardGroups {
if rp.Duration != 0 && g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) {
log.Printf("shard group %d, retention policy %s, database %s due for deletion",
g.ID, rp.Name, db.name)
groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID})
}
}
}
}
}()
for _, g := range groups {
if err := s.DeleteShardGroup(g.Database, g.Retention, g.ID); err != nil {
log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error())
}
}
}