Merge pull request #9647 from influxdata/bj-delete-deleted-shards

Delete deleted shards in retention service.
pull/9658/head
Ben Johnson 2018-03-28 12:08:58 -06:00 committed by GitHub
commit 93b4463736
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 22 additions and 5 deletions

View File

@ -96,6 +96,14 @@ func (s *Service) run() {
dbs := s.MetaClient.Databases()
for _, d := range dbs {
for _, r := range d.RetentionPolicies {
// Build list of already deleted shards.
for _, g := range r.DeletedShardGroups() {
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = deletionInfo{db: d.Name, rp: r.Name}
}
}
// Determine all shards that have expired and need to be deleted.
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
log.Info("Failed to delete shard group",

View File

@ -236,16 +236,16 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
// A database and a bunch of shards
var mu sync.Mutex
shards := []uint64{3, 5, 8, 9, 11}
localShards := []uint64{3, 5, 8, 9, 11}
shards := []uint64{3, 5, 8, 9, 11, 12}
localShards := []uint64{3, 5, 8, 9, 11, 12}
databases := []meta.DatabaseInfo{
{
Name: "db0",
RetentionPolicies: []meta.RetentionPolicyInfo{
{
Name: "autogen",
Duration: time.Millisecond,
ShardGroupDuration: time.Millisecond,
Duration: 24 * time.Hour,
ShardGroupDuration: 24 * time.Hour,
ShardGroups: []meta.ShardGroupInfo{
{
ID: 1,
@ -255,6 +255,15 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
{ID: 3}, {ID: 9},
},
},
{
ID: 2,
StartTime: time.Now().Add(-1 * time.Hour),
EndTime: time.Now(),
DeletedAt: time.Now(),
Shards: []meta.ShardInfo{
{ID: 11}, {ID: 12},
},
},
},
},
},
@ -326,7 +335,7 @@ func testService_8819_repro(t *testing.T) (*Service, chan error, chan struct{})
}
// We should have removed shards 3 and 9
if !reflect.DeepEqual(localShards, []uint64{5, 8, 11}) {
if !reflect.DeepEqual(localShards, []uint64{5, 8}) {
sendError(fmt.Errorf("removed shards still present locally: %v", localShards))
return nil
}