diff --git a/services/meta/client.go b/services/meta/client.go index 24a75b62ad..4e49eae7e6 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -322,11 +322,6 @@ func (c *Client) RetentionPolicy(database, name string) (rpi *RetentionPolicyInf return db.RetentionPolicy(name), nil } -// VisitRetentionPolicies executes the given function on all retention policies in all databases. -func (c *Client) VisitRetentionPolicies(f func(d DatabaseInfo, r RetentionPolicyInfo)) { - -} - // DropRetentionPolicy drops a retention policy from a database. func (c *Client) DropRetentionPolicy(database, name string) error { cmd := &internal.DropRetentionPolicyCommand{ diff --git a/services/retention/service.go b/services/retention/service.go index 7721a0eb28..c1fe050880 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -12,8 +12,7 @@ import ( // Service represents the retention policy enforcement service. type Service struct { MetaClient interface { - IsLeader() bool - VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo)) + Databases() ([]*meta.DatabaseInfo, error) DeleteShardGroup(database, policy string, id uint64) error } TSDBStore interface { @@ -71,24 +70,25 @@ func (s *Service) deleteShardGroups() { return case <-ticker.C: - // Only run this on the leader, but always allow the loop to check - // as the leader can change. - if !s.MetaClient.IsLeader() { + dbs, err := s.MetaClient.Databases() + if err != nil { + log.Println("error getting databases: %s", err.Error()) continue } - s.logger.Println("retention policy enforcement check commencing") - s.MetaClient.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) { - for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { - if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { - s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s", - g.ID, d.Name, r.Name, err.Error()) - } else { - s.logger.Printf("deleted shard group %d from database %s, retention policy %s", - g.ID, d.Name, r.Name) + for _, d := range dbs { + for _, r := range d.RetentionPolicies { + for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { + if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { + s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s", + g.ID, d.Name, r.Name, err.Error()) + } else { + s.logger.Printf("deleted shard group %d from database %s, retention policy %s", + g.ID, d.Name, r.Name) + } } } - }) + } } } } @@ -111,13 +111,19 @@ func (s *Service) deleteShards() { rp string } deletedShardIDs := make(map[uint64]deletionInfo, 0) - s.MetaClient.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) { - for _, g := range r.DeletedShardGroups() { - for _, sh := range g.Shards { - deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} + dbs, err := s.MetaClient.Databases() + if err != nil { + s.logger.Println("error getting databases: %s", err.Error()) + } + for _, d := range dbs { + for _, r := range d.RetentionPolicies { + for _, g := range r.DeletedShardGroups() { + for _, sh := range g.Shards { + deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name} + } } } - }) + } for _, id := range s.TSDBStore.ShardIDs() { if di, ok := deletedShardIDs[id]; ok {