diff --git a/CHANGELOG.md b/CHANGELOG.md index df110f0501..ed8288e781 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [#1753](https://github.com/influxdb/influxdb/pull/1874): Do Not Panic on Missing Dirs - [#1877](https://github.com/influxdb/influxdb/pull/1877): Broker clients track broker leader - [#1862](https://github.com/influxdb/influxdb/pull/1862): Fix memory leak in `httpd.serveWait`. Thanks @mountkin +- [#1883](https://github.com/influxdb/influxdb/pull/1883): RLock server during retention policy enforcement. Thanks @grisha - [#1868](https://github.com/influxdb/influxdb/pull/1868): Use `BatchPoints` for `client.Write` method. Thanks @vladlopes, @georgmu, @d2g, @evanphx, @akolosov. - [#1881](https://github.com/influxdb/influxdb/pull/1881): Update documentation for `client` package. Misc library tweaks. diff --git a/server.go b/server.go index 5e33d776fb..5e441e2612 100644 --- a/server.go +++ b/server.go @@ -299,19 +299,37 @@ func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) er func (s *Server) EnforceRetentionPolicies() { log.Println("retention policy enforcement check commencing") - // Check all shard groups. - for _, db := range s.databases { - for _, rp := range db.policies { - for _, g := range rp.shardGroups { - if rp.Duration != 0 && g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) { - log.Printf("shard group %d, retention policy %s, database %s due for deletion", - g.ID, rp.Name, db.name) - if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil { - log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error()) + type group struct { + Database string + Retention string + ID uint64 + } + + groups := make([]group, 0) + // Only keep the lock while walking the shard groups, so the lock is not held while + // any deletions take place across the cluster. + func() { + s.mu.RLock() + defer s.mu.RUnlock() + + // Check all shard groups. + for _, db := range s.databases { + for _, rp := range db.policies { + for _, g := range rp.shardGroups { + if rp.Duration != 0 && g.EndTime.Add(rp.Duration).Before(time.Now().UTC()) { + log.Printf("shard group %d, retention policy %s, database %s due for deletion", + g.ID, rp.Name, db.name) + groups = append(groups, group{Database: db.name, Retention: rp.Name, ID: g.ID}) } } } } + }() + + for _, g := range groups { + if err := s.DeleteShardGroup(g.Database, g.Retention, g.ID); err != nil { + log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error()) + } } }