From e6aa5023eb4e54dbde7e8133ea426aec46fda9c2 Mon Sep 17 00:00:00 2001 From: Andrew Hare Date: Mon, 16 Oct 2017 20:34:26 -0600 Subject: [PATCH] Create a command to truncated shard groups --- CHANGELOG.md | 1 + coordinator/meta_client.go | 1 + coordinator/meta_client_test.go | 5 +++++ internal/meta_client.go | 5 +++++ services/meta/client.go | 10 ++++++++++ services/meta/data.go | 26 ++++++++++++++++++++++++++ 6 files changed, 48 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fa9098fedb..02635bcab7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ - [#8897](https://github.com/influxdata/influxdb/pull/8897): Add message pack format for query responses. - [#8886](https://github.com/influxdata/influxdb/pull/8886): Improved compaction scheduling - [#8690](https://github.com/influxdata/influxdb/issues/8690): Implicitly decide on a lower limit for fill queries when none is present. +- [#7355](https://github.com/influxdata/influxdb/issues/7355): Create a command to truncated shard groups ### Bugfixes diff --git a/coordinator/meta_client.go b/coordinator/meta_client.go index 1fb825fa33..fa3af5a323 100644 --- a/coordinator/meta_client.go +++ b/coordinator/meta_client.go @@ -27,6 +27,7 @@ type MetaClient interface { SetAdminPrivilege(username string, admin bool) error SetPrivilege(username, database string, p influxql.Privilege) error ShardGroupsByTimeRange(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) + TruncateShardGroups(t time.Time) error UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error UpdateUser(name, password string) error UserPrivilege(username, database string) (*influxql.Privilege, error) diff --git a/coordinator/meta_client_test.go b/coordinator/meta_client_test.go index fd4064dbdb..9e2bd9dc71 100644 --- a/coordinator/meta_client_test.go +++ b/coordinator/meta_client_test.go @@ -32,6 +32,7 @@ type MetaClient struct { SetAdminPrivilegeFn func(username string, admin bool) error SetPrivilegeFn func(username, database string, p influxql.Privilege) error ShardGroupsByTimeRangeFn func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) + TruncateShardGroupsFn func(t time.Time) error UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error UpdateUserFn func(name, password string) error UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) @@ -131,6 +132,10 @@ func (c *MetaClient) ShardGroupsByTimeRange(database, policy string, min, max ti return c.ShardGroupsByTimeRangeFn(database, policy, min, max) } +func (c *MetaClient) TruncateShardGroups(t time.Time) error { + return c.TruncateShardGroupsFn(t) +} + func (c *MetaClient) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error { return c.UpdateRetentionPolicyFn(database, name, rpu, makeDefault) } diff --git a/internal/meta_client.go b/internal/meta_client.go index e87607ebcc..0703fb1b98 100644 --- a/internal/meta_client.go +++ b/internal/meta_client.go @@ -41,6 +41,7 @@ type MetaClientMock struct { SetPrivilegeFn func(username, database string, p influxql.Privilege) error ShardGroupsByTimeRangeFn func(database, policy string, min, max time.Time) (a []meta.ShardGroupInfo, err error) ShardOwnerFn func(shardID uint64) (database, policy string, sgi *meta.ShardGroupInfo) + TruncateShardGroupsFn func(t time.Time) error UpdateRetentionPolicyFn func(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error UpdateUserFn func(name, password string) error UserPrivilegeFn func(username, database string) (*influxql.Privilege, error) @@ -137,6 +138,10 @@ func (c *MetaClientMock) ShardOwner(shardID uint64) (database, policy string, sg return c.ShardOwnerFn(shardID) } +func (c *MetaClientMock) TruncateShardGroups(t time.Time) error { + return c.TruncateShardGroupsFn(t) +} + func (c *MetaClientMock) UpdateRetentionPolicy(database, name string, rpu *meta.RetentionPolicyUpdate, makeDefault bool) error { return c.UpdateRetentionPolicyFn(database, name, rpu, makeDefault) } diff --git a/services/meta/client.go b/services/meta/client.go index 6bba53dc7d..9061a25361 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -672,6 +672,16 @@ func (c *Client) DropShard(id uint64) error { return c.commit(data) } +// TruncateShardGroups truncates any shard group that could contain timestamps beyond t. +func (c *Client) TruncateShardGroups(t time.Time) error { + c.mu.Lock() + defer c.mu.Unlock() + + data := c.cacheData.Clone() + data.TruncateShardGroups(t) + return c.commit(data) +} + // PruneShardGroups remove deleted shard groups from the data store. func (c *Client) PruneShardGroups() error { var changed bool diff --git a/services/meta/data.go b/services/meta/data.go index a48793f19b..b3bf107be9 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -746,6 +746,32 @@ func (data *Data) UnmarshalBinary(buf []byte) error { return nil } +// TruncateShardGroups truncates any shard group that could contain timestamps beyond t. +func (data *Data) TruncateShardGroups(t time.Time) { + for i := range data.Databases { + dbi := &data.Databases[i] + + for j := range dbi.RetentionPolicies { + rpi := &dbi.RetentionPolicies[j] + + for k := range rpi.ShardGroups { + sgi := &rpi.ShardGroups[k] + + if !t.Before(sgi.EndTime) || sgi.Deleted() || (sgi.Truncated() && sgi.TruncatedAt.Before(t)) { + continue + } + + if !t.After(sgi.StartTime) { + // future shardgroup + sgi.TruncatedAt = sgi.StartTime + } else { + sgi.TruncatedAt = t + } + } + } + } +} + // hasAdminUser exhaustively checks for the presence of at least one admin // user. func (data *Data) hasAdminUser() bool {