Update meta client and retention service.
* Remove VisitRetentionPolicies from meta client. * Update retention enforcer to run on every data node.pull/5428/head
parent
70de1a7690
commit
0341bc3532
|
@ -322,11 +322,6 @@ func (c *Client) RetentionPolicy(database, name string) (rpi *RetentionPolicyInf
|
|||
return db.RetentionPolicy(name), nil
|
||||
}
|
||||
|
||||
// VisitRetentionPolicies executes the given function on all retention policies in all databases.
|
||||
func (c *Client) VisitRetentionPolicies(f func(d DatabaseInfo, r RetentionPolicyInfo)) {
|
||||
|
||||
}
|
||||
|
||||
// DropRetentionPolicy drops a retention policy from a database.
|
||||
func (c *Client) DropRetentionPolicy(database, name string) error {
|
||||
cmd := &internal.DropRetentionPolicyCommand{
|
||||
|
|
|
@ -12,8 +12,7 @@ import (
|
|||
// Service represents the retention policy enforcement service.
|
||||
type Service struct {
|
||||
MetaClient interface {
|
||||
IsLeader() bool
|
||||
VisitRetentionPolicies(f func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo))
|
||||
Databases() ([]*meta.DatabaseInfo, error)
|
||||
DeleteShardGroup(database, policy string, id uint64) error
|
||||
}
|
||||
TSDBStore interface {
|
||||
|
@ -71,24 +70,25 @@ func (s *Service) deleteShardGroups() {
|
|||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// Only run this on the leader, but always allow the loop to check
|
||||
// as the leader can change.
|
||||
if !s.MetaClient.IsLeader() {
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
log.Println("error getting databases: %s", err.Error())
|
||||
continue
|
||||
}
|
||||
s.logger.Println("retention policy enforcement check commencing")
|
||||
|
||||
s.MetaClient.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
|
||||
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
|
||||
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
|
||||
s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s",
|
||||
g.ID, d.Name, r.Name, err.Error())
|
||||
} else {
|
||||
s.logger.Printf("deleted shard group %d from database %s, retention policy %s",
|
||||
g.ID, d.Name, r.Name)
|
||||
for _, d := range dbs {
|
||||
for _, r := range d.RetentionPolicies {
|
||||
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
|
||||
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
|
||||
s.logger.Printf("failed to delete shard group %d from database %s, retention policy %s: %s",
|
||||
g.ID, d.Name, r.Name, err.Error())
|
||||
} else {
|
||||
s.logger.Printf("deleted shard group %d from database %s, retention policy %s",
|
||||
g.ID, d.Name, r.Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -111,13 +111,19 @@ func (s *Service) deleteShards() {
|
|||
rp string
|
||||
}
|
||||
deletedShardIDs := make(map[uint64]deletionInfo, 0)
|
||||
s.MetaClient.VisitRetentionPolicies(func(d meta.DatabaseInfo, r meta.RetentionPolicyInfo) {
|
||||
for _, g := range r.DeletedShardGroups() {
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
|
||||
dbs, err := s.MetaClient.Databases()
|
||||
if err != nil {
|
||||
s.logger.Println("error getting databases: %s", err.Error())
|
||||
}
|
||||
for _, d := range dbs {
|
||||
for _, r := range d.RetentionPolicies {
|
||||
for _, g := range r.DeletedShardGroups() {
|
||||
for _, sh := range g.Shards {
|
||||
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
for _, id := range s.TSDBStore.ShardIDs() {
|
||||
if di, ok := deletedShardIDs[id]; ok {
|
||||
|
|
Loading…
Reference in New Issue