prune shards in meta data
parent
5d63ebe311
commit
d54d32b9d4
|
@ -8,6 +8,7 @@
|
|||
- [#7554](https://github.com/influxdata/influxdb/pull/7554): update latest dependencies with Godeps.
|
||||
- [#7368](https://github.com/influxdata/influxdb/pull/7368): Introduce syntax for marking a partial response with chunking.
|
||||
- [#7356](https://github.com/influxdata/influxdb/issues/7356): Use X-Forwarded-For IP address in HTTP logger if present.
|
||||
- [#7601](https://github.com/influxdata/influxdb/issues/7601): Prune data in meta store for deleted shards.
|
||||
|
||||
### Bugfixes
|
||||
|
||||
|
|
|
@ -28,6 +28,10 @@ const (
|
|||
SaltBytes = 32
|
||||
|
||||
metaFile = "meta.db"
|
||||
|
||||
// ShardGroupDeletedExpiration is the amount of time before a shard group info will be removed from cached
|
||||
// data after it has been marked deleted (2 weeks)
|
||||
ShardGroupDeletedExpiration = -2 * 7 * 24 * time.Hour
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -654,6 +658,32 @@ func (c *Client) DropShard(id uint64) error {
|
|||
return c.commit(data)
|
||||
}
|
||||
|
||||
// PruneShardGroups remove deleted shard groups from the data store
|
||||
func (c *Client) PruneShardGroups() error {
|
||||
var changed bool
|
||||
expiration := time.Now().Add(ShardGroupDeletedExpiration)
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
data := c.cacheData.Clone()
|
||||
for i, d := range data.Databases {
|
||||
for j, rp := range d.RetentionPolicies {
|
||||
for _, sgi := range rp.ShardGroups {
|
||||
if sgi.DeletedAt.IsZero() || !expiration.After(sgi.DeletedAt) {
|
||||
continue
|
||||
}
|
||||
// we are safe to delete the shard group as it's been marked deleted for the required expiration
|
||||
s := append(rp.ShardGroups[:i], rp.ShardGroups[i+1:]...)
|
||||
data.Databases[i].RetentionPolicies[j].ShardGroups = s
|
||||
changed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if changed {
|
||||
return c.commit(data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateShardGroup creates a shard group on a database and policy for a given timestamp.
|
||||
func (c *Client) CreateShardGroup(database, policy string, timestamp time.Time) (*ShardGroupInfo, error) {
|
||||
// Check under a read-lock
|
||||
|
|
|
@ -919,6 +919,73 @@ func TestMetaClient_CreateShardGroupIdempotent(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestMetaClient_PruneShardGroups(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
d, c := newClient()
|
||||
defer os.RemoveAll(d)
|
||||
defer c.Close()
|
||||
|
||||
if _, err := c.CreateDatabase("db0"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
duration := 1 * time.Hour
|
||||
replicaN := 1
|
||||
|
||||
if _, err := c.CreateRetentionPolicy("db0", &meta.RetentionPolicySpec{
|
||||
Name: "rp0",
|
||||
Duration: &duration,
|
||||
ReplicaN: &replicaN,
|
||||
}, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
sg, err := c.CreateShardGroup("db0", "autogen", time.Now())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sg == nil {
|
||||
t.Fatalf("expected ShardGroup")
|
||||
}
|
||||
|
||||
sg, err = c.CreateShardGroup("db0", "rp0", time.Now())
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
} else if sg == nil {
|
||||
t.Fatalf("expected ShardGroup")
|
||||
}
|
||||
|
||||
expiration := time.Now().Add(-2 * 7 * 24 * time.Hour).Add(-1 * time.Hour)
|
||||
|
||||
data := c.Data()
|
||||
data.Databases[0].RetentionPolicies[0].ShardGroups[0].DeletedAt = expiration
|
||||
|
||||
if err := c.SetData(&data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := c.PruneShardGroups(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
data = c.Data()
|
||||
rp, err := data.RetentionPolicy("db0", "autogen")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got, exp := len(rp.ShardGroups), 0; got != exp {
|
||||
t.Fatalf("failed to prune shard group. got: %d, exp: %d", got, exp)
|
||||
}
|
||||
|
||||
rp, err = data.RetentionPolicy("db0", "rp0")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if got, exp := len(rp.ShardGroups), 1; got != exp {
|
||||
t.Fatalf("failed to prune shard group. got: %d, exp: %d", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMetaClient_PersistClusterIDAfterRestart(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ type Service struct {
|
|||
MetaClient interface {
|
||||
Databases() []meta.DatabaseInfo
|
||||
DeleteShardGroup(database, policy string, id uint64) error
|
||||
PruneShardGroups() error
|
||||
}
|
||||
TSDBStore interface {
|
||||
ShardIDs() []uint64
|
||||
|
@ -130,6 +131,9 @@ func (s *Service) deleteShards() {
|
|||
id, di.db, di.rp)
|
||||
}
|
||||
}
|
||||
if err := s.MetaClient.PruneShardGroups(); err != nil {
|
||||
s.logger.Printf("error pruning shard groups: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue