From d54d32b9d4c6bc529abf5b2e0bb4eebe793c3937 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Wed, 9 Nov 2016 16:01:12 -0600 Subject: [PATCH] prune shards in meta data --- CHANGELOG.md | 1 + services/meta/client.go | 30 ++++++++++++++++ services/meta/client_test.go | 67 +++++++++++++++++++++++++++++++++++ services/retention/service.go | 4 +++ 4 files changed, 102 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 321f9aa3cb..dbf16b2e0f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ - [#7554](https://github.com/influxdata/influxdb/pull/7554): update latest dependencies with Godeps. - [#7368](https://github.com/influxdata/influxdb/pull/7368): Introduce syntax for marking a partial response with chunking. - [#7356](https://github.com/influxdata/influxdb/issues/7356): Use X-Forwarded-For IP address in HTTP logger if present. +- [#7601](https://github.com/influxdata/influxdb/issues/7601): Prune data in meta store for deleted shards. ### Bugfixes diff --git a/services/meta/client.go b/services/meta/client.go index c59aea6e23..7cdc32f931 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -28,6 +28,10 @@ const ( SaltBytes = 32 metaFile = "meta.db" + + // ShardGroupDeletedExpiration is the amount of time before a shard group info will be removed from cached + // data after it has been marked deleted (2 weeks) + ShardGroupDeletedExpiration = -2 * 7 * 24 * time.Hour ) var ( @@ -654,6 +658,32 @@ func (c *Client) DropShard(id uint64) error { return c.commit(data) } +// PruneShardGroups remove deleted shard groups from the data store +func (c *Client) PruneShardGroups() error { + var changed bool + expiration := time.Now().Add(ShardGroupDeletedExpiration) + c.mu.Lock() + defer c.mu.Unlock() + data := c.cacheData.Clone() + for i, d := range data.Databases { + for j, rp := range d.RetentionPolicies { + for _, sgi := range rp.ShardGroups { + if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) { + continue + } + // we are safe to delete the shard group as it's been marked deleted for the required expiration + s := append(rp.ShardGroups[:i], rp.ShardGroups[i+1:]...) + data.Databases[i].RetentionPolicies[j].ShardGroups = s + changed = true + } + } + } + if changed { + return c.commit(data) + } + return nil +} + // CreateShardGroup creates a shard group on a database and policy for a given timestamp. func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) { // Check under a read-lock diff --git a/services/meta/client_test.go b/services/meta/client_test.go index 156686aba1..65bc450ef5 100644 --- a/services/meta/client_test.go +++ b/services/meta/client_test.go @@ -919,6 +919,73 @@ func TestMetaClient_CreateShardGroupIdempotent(t *testing.T) { } } +func TestMetaClient_PruneShardGroups(t *testing.T) { + t.Parallel() + + d, c := newClient() + defer os.RemoveAll(d) + defer c.Close() + + if _, err := c.CreateDatabase("db0"); err != nil { + t.Fatal(err) + } + + duration := 1 * time.Hour + replicaN := 1 + + if _, err := c.CreateRetentionPolicy("db0", &meta.RetentionPolicySpec{ + Name: "rp0", + Duration: &duration, + ReplicaN: &replicaN, + }, true); err != nil { + t.Fatal(err) + } + + sg, err := c.CreateShardGroup("db0", "autogen", time.Now()) + if err != nil { + t.Fatal(err) + } else if sg == nil { + t.Fatalf("expected ShardGroup") + } + + sg, err = c.CreateShardGroup("db0", "rp0", time.Now()) + if err != nil { + t.Fatal(err) + } else if sg == nil { + t.Fatalf("expected ShardGroup") + } + + expiration := time.Now().Add(-2 * 7 * 24 * time.Hour).Add(-1 * time.Hour) + + data := c.Data() + data.Databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = expiration + + if err := c.SetData(&data); err != nil { + t.Fatal(err) + } + + if err := c.PruneShardGroups(); err != nil { + t.Fatal(err) + } + + data = c.Data() + rp, err := data.RetentionPolicy("db0", "autogen") + if err != nil { + t.Fatal(err) + } + if got, exp := len(rp.ShardGroups), 0; got != exp { + t.Fatalf("failed to prune shard group. got: %d, exp: %d", got, exp) + } + + rp, err = data.RetentionPolicy("db0", "rp0") + if err != nil { + t.Fatal(err) + } + if got, exp := len(rp.ShardGroups), 1; got != exp { + t.Fatalf("failed to prune shard group. got: %d, exp: %d", got, exp) + } +} + func TestMetaClient_PersistClusterIDAfterRestart(t *testing.T) { t.Parallel() diff --git a/services/retention/service.go b/services/retention/service.go index 858da75bf1..e39d9d5f0b 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -15,6 +15,7 @@ type Service struct { MetaClient interface { Databases() []meta.DatabaseInfo DeleteShardGroup(database, policy string, id uint64) error + PruneShardGroups() error } TSDBStore interface { ShardIDs() []uint64 @@ -130,6 +131,9 @@ func (s *Service) deleteShards() { id, di.db, di.rp) } } + if err := s.MetaClient.PruneShardGroups(); err != nil { + s.logger.Printf("error pruning shard groups: %s", err) + } } } }