From cd272ce6c335b05db8a201985f15534010846f20 Mon Sep 17 00:00:00 2001 From: Cory LaNou Date: Tue, 11 Oct 2016 13:26:04 -0500 Subject: [PATCH] fix retention policy creation inconsistencies --- CHANGELOG.md | 1 + cmd/influxd/run/server_helpers_test.go | 2 +- cmd/influxd/run/server_suite_test.go | 6 ++ coordinator/meta_client.go | 2 +- coordinator/meta_client_test.go | 6 +- coordinator/statement_executor.go | 8 +-- internal/meta_client.go | 6 +- services/graphite/service.go | 4 +- services/graphite/service_test.go | 2 +- services/meta/client.go | 8 +-- services/meta/client_test.go | 14 ++--- services/meta/data.go | 12 +++- services/meta/data_internal_test.go | 56 ++++++++++++++++++ services/meta/data_test.go | 81 +++++++++++++------------- 14 files changed, 137 insertions(+), 71 deletions(-) create mode 100644 services/meta/data_internal_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d39d5a9d3..75ded85c5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -76,6 +76,7 @@ The query language has been extended with a few new features: - [#7526](https://github.com/influxdata/influxdb/issues/7526): Truncate the version string when linking to the documentation. - [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES. - [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data. +- [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies ## v1.0.2 [2016-10-05] diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index e46548f389..fb4957dcbc 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -114,7 +114,7 @@ func (s *Server) URL() string { func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec) error { if _, err := s.MetaClient.CreateDatabase(db); err != nil { return err - } else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp); err != nil { + } else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp, true); err != nil { return err } return nil diff --git a/cmd/influxd/run/server_suite_test.go b/cmd/influxd/run/server_suite_test.go index 22baaf108e..9434fddd40 100644 --- a/cmd/influxd/run/server_suite_test.go +++ b/cmd/influxd/run/server_suite_test.go @@ -417,6 +417,12 @@ func init() { exp: `{"results":[{}]}`, once: true, }, + &Query{ + name: "create retention policy with default on", + command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m DEFAULT`, + exp: `{"results":[{"error":"retention policy conflicts with an existing policy"}]}`, + once: true, + }, &Query{ name: "show retention policy should show both with custom shard", command: `SHOW RETENTION POLICIES ON db0`, diff --git a/coordinator/meta_client.go b/coordinator/meta_client.go index b7b3136085..44cd446b77 100644 --- a/coordinator/meta_client.go +++ b/coordinator/meta_client.go @@ -12,7 +12,7 @@ type MetaClient interface { CreateContinuousQuery(database, name, query string) error CreateDatabase(name string) (*meta.DatabaseInfo, error) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) - CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) + CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateSubscription(database, rp, name, mode string, destinations []string) error CreateUser(name, password string, admin bool) (*meta.UserInfo, error) Database(name string) *meta.DatabaseInfo diff --git a/coordinator/meta_client_test.go b/coordinator/meta_client_test.go index 88dfd7b060..a39c7c42aa 100644 --- a/coordinator/meta_client_test.go +++ b/coordinator/meta_client_test.go @@ -12,7 +12,7 @@ type MetaClient struct { CreateContinuousQueryFn func(database, name, query string) error CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) - CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) + CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) DatabaseFn func(name string) *meta.DatabaseInfo @@ -52,8 +52,8 @@ func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, spec *meta.R return c.CreateDatabaseWithRetentionPolicyFn(name, spec) } -func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { - return c.CreateRetentionPolicyFn(database, spec) +func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) { + return c.CreateRetentionPolicyFn(database, spec, makeDefault) } func (c *MetaClient) DropShard(id uint64) error { diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index c4bc21cb44..fa35e5443d 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -271,17 +271,11 @@ func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql } // Create new retention policy. - rp, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec) + _, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default) if err != nil { return err } - // If requested, set new policy as the default. - if stmt.Default { - if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, rp.Name); err != nil { - return err - } - } return nil } diff --git a/internal/meta_client.go b/internal/meta_client.go index f24361cb27..3d3d35c611 100644 --- a/internal/meta_client.go +++ b/internal/meta_client.go @@ -13,7 +13,7 @@ type MetaClientMock struct { CreateContinuousQueryFn func(database, name, query string) error CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error) CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) - CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) + CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) CreateShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error) @@ -63,8 +63,8 @@ func (c *MetaClientMock) CreateDatabaseWithRetentionPolicy(name string, spec *me return c.CreateDatabaseWithRetentionPolicyFn(name, spec) } -func (c *MetaClientMock) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { - return c.CreateRetentionPolicyFn(database, spec) +func (c *MetaClientMock) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) { + return c.CreateRetentionPolicyFn(database, spec, makeDefault) } func (c *MetaClientMock) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) { diff --git a/services/graphite/service.go b/services/graphite/service.go index cb7d43be57..fa7baf6b0b 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -85,7 +85,7 @@ type Service struct { MetaClient interface { CreateDatabase(name string) (*meta.DatabaseInfo, error) CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) - CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) + CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) Database(name string) *meta.DatabaseInfo RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error) } @@ -234,7 +234,7 @@ func (s *Service) createInternalStorage() error { if db := s.MetaClient.Database(s.database); db != nil { if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil { spec := meta.RetentionPolicySpec{Name: s.retentionPolicy} - if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil { + if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec, true); err != nil { return err } } diff --git a/services/graphite/service_test.go b/services/graphite/service_test.go index 468637f47c..bcc62d4026 100644 --- a/services/graphite/service_test.go +++ b/services/graphite/service_test.go @@ -271,7 +271,7 @@ func NewTestService(c *Config) *TestService { MetaClient: &internal.MetaClientMock{}, } - service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { + service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec, bool) (*meta.RetentionPolicyInfo, error) { return nil, nil } diff --git a/services/meta/client.go b/services/meta/client.go index de160ebbc1..a08b3f7648 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -186,7 +186,7 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) { // create default retention policy if c.retentionAutoCreate { rpi := DefaultRetentionPolicyInfo() - if err := data.CreateRetentionPolicy(name, rpi); err != nil { + if err := data.CreateRetentionPolicy(name, rpi, true); err != nil { return nil, err } if err := data.SetDefaultRetentionPolicy(name, rpi.Name); err != nil { @@ -224,7 +224,7 @@ func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionP rpi := spec.NewRetentionPolicyInfo() if rp := db.RetentionPolicy(rpi.Name); rp == nil { - if err := data.CreateRetentionPolicy(name, rpi); err != nil { + if err := data.CreateRetentionPolicy(name, rpi, true); err != nil { return nil, err } } else if !spec.Matches(rp) { @@ -273,7 +273,7 @@ func (c *Client) DropDatabase(name string) error { } // CreateRetentionPolicy creates a retention policy on the specified database. -func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpec) (*RetentionPolicyInfo, error) { +func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpec, makeDefault bool) (*RetentionPolicyInfo, error) { c.mu.Lock() defer c.mu.Unlock() @@ -284,7 +284,7 @@ func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpe } rp := spec.NewRetentionPolicyInfo() - if err := data.CreateRetentionPolicy(database, rp); err != nil { + if err := data.CreateRetentionPolicy(database, rp, makeDefault); err != nil { return nil, err } diff --git a/services/meta/client_test.go b/services/meta/client_test.go index 3f3f059738..b371e76dd1 100644 --- a/services/meta/client_test.go +++ b/services/meta/client_test.go @@ -248,7 +248,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { ReplicaN: &rp0.ReplicaN, Duration: &rp0.Duration, ShardGroupDuration: rp0.ShardGroupDuration, - }); err != nil { + }, true); err != nil { t.Fatal(err) } @@ -265,7 +265,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { ReplicaN: &rp0.ReplicaN, Duration: &rp0.Duration, ShardGroupDuration: rp0.ShardGroupDuration, - }); err != nil { + }, true); err != nil { t.Fatal(err) } else if actual, err = c.RetentionPolicy("db0", "rp0"); err != nil { t.Fatal(err) @@ -283,7 +283,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { ReplicaN: &rp1.ReplicaN, Duration: &rp1.Duration, ShardGroupDuration: rp1.ShardGroupDuration, - }) + }, true) if exp := meta.ErrRetentionPolicyExists; got != exp { t.Fatalf("got error %v, expected error %v", got, exp) } @@ -298,7 +298,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { ReplicaN: &rp1.ReplicaN, Duration: &rp1.Duration, ShardGroupDuration: rp1.ShardGroupDuration, - }) + }, true) if exp := meta.ErrRetentionPolicyExists; got != exp { t.Fatalf("got error %v, expected error %v", got, exp) } @@ -313,7 +313,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { ReplicaN: &rp1.ReplicaN, Duration: &rp1.Duration, ShardGroupDuration: rp1.ShardGroupDuration, - }) + }, true) if exp := meta.ErrRetentionPolicyExists; got != exp { t.Fatalf("got error %v, expected error %v", got, exp) } @@ -329,7 +329,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) { ReplicaN: &rp1.ReplicaN, Duration: &rp1.Duration, ShardGroupDuration: rp1.ShardGroupDuration, - }) + }, true) if exp := meta.ErrIncompatibleDurations; got != exp { t.Fatalf("got error %v, expected error %v", got, exp) } @@ -481,7 +481,7 @@ func TestMetaClient_DropRetentionPolicy(t *testing.T) { Name: "rp0", Duration: &duration, ReplicaN: &replicaN, - }); err != nil { + }, true); err != nil { t.Fatal(err) } diff --git a/services/meta/data.go b/services/meta/data.go index 5a39b59bc0..517f71fed9 100644 --- a/services/meta/data.go +++ b/services/meta/data.go @@ -135,7 +135,7 @@ func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo, // CreateRetentionPolicy creates a new retention policy on a database. // Returns an error if name is blank or if a database does not exist. -func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) error { +func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error { // Validate retention policy. if rpi == nil { return ErrRetentionPolicyRequired @@ -163,11 +163,21 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration { return ErrRetentionPolicyExists } + // if they want to make it default, and it's not the default, it's not an identical command so it's an error + if makeDefault && di.DefaultRetentionPolicy != rpi.Name { + return ErrRetentionPolicyConflict + } return nil } // Append copy of new policy. di.RetentionPolicies = append(di.RetentionPolicies, *rpi) + + // Set the default if needed + if makeDefault { + di.DefaultRetentionPolicy = rpi.Name + } + return nil } diff --git a/services/meta/data_internal_test.go b/services/meta/data_internal_test.go new file mode 100644 index 0000000000..730f75e7d9 --- /dev/null +++ b/services/meta/data_internal_test.go @@ -0,0 +1,56 @@ +package meta + +import ( + "reflect" + "sort" + "time" + + "testing" +) + +func Test_newShardOwner(t *testing.T) { + // An error is returned if there are no data nodes available. + _, err := NewShardOwner(ShardInfo{}, map[int]int{}) + if err == nil { + t.Error("got no error, but expected one") + } + + ownerFreqs := map[int]int{1: 15, 2: 11, 3: 12} + id, err := NewShardOwner(ShardInfo{ID: 4}, ownerFreqs) + if err != nil { + t.Fatal(err) + } + + // The ID that owns the fewest shards is returned. + if got, exp := id, uint64(2); got != exp { + t.Errorf("got id %d, expected id %d", got, exp) + } + + // The ownership frequencies are updated. + if got, exp := ownerFreqs, map[int]int{1: 15, 2: 12, 3: 12}; !reflect.DeepEqual(got, exp) { + t.Errorf("got owner frequencies %v, expected %v", got, exp) + } +} + +func TestShardGroupSort(t *testing.T) { + sg1 := ShardGroupInfo{ + ID: 1, + StartTime: time.Unix(1000, 0), + EndTime: time.Unix(1100, 0), + TruncatedAt: time.Unix(1050, 0), + } + + sg2 := ShardGroupInfo{ + ID: 2, + StartTime: time.Unix(1000, 0), + EndTime: time.Unix(1100, 0), + } + + sgs := ShardGroupInfos{sg2, sg1} + + sort.Sort(sgs) + + if sgs[len(sgs)-1].ID != 2 { + t.Fatal("unstable sort for ShardGroupInfos") + } +} diff --git a/services/meta/data_test.go b/services/meta/data_test.go index 730f75e7d9..ede8355878 100644 --- a/services/meta/data_test.go +++ b/services/meta/data_test.go @@ -1,56 +1,55 @@ -package meta +package meta_test import ( - "reflect" - "sort" + "testing" "time" - "testing" + "github.com/influxdata/influxdb/services/meta" ) -func Test_newShardOwner(t *testing.T) { - // An error is returned if there are no data nodes available. - _, err := NewShardOwner(ShardInfo{}, map[int]int{}) - if err == nil { - t.Error("got no error, but expected one") - } +func Test_Data_CreateRetentionPolicy(t *testing.T) { + data := meta.Data{} - ownerFreqs := map[int]int{1: 15, 2: 11, 3: 12} - id, err := NewShardOwner(ShardInfo{ID: 4}, ownerFreqs) + err := data.CreateDatabase("foo") if err != nil { t.Fatal(err) } - // The ID that owns the fewest shards is returned. - if got, exp := id, uint64(2); got != exp { - t.Errorf("got id %d, expected id %d", got, exp) + err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{ + Name: "bar", + ReplicaN: 1, + Duration: 24 * time.Hour, + }, false) + if err != nil { + t.Fatal(err) } - // The ownership frequencies are updated. - if got, exp := ownerFreqs, map[int]int{1: 15, 2: 12, 3: 12}; !reflect.DeepEqual(got, exp) { - t.Errorf("got owner frequencies %v, expected %v", got, exp) - } -} - -func TestShardGroupSort(t *testing.T) { - sg1 := ShardGroupInfo{ - ID: 1, - StartTime: time.Unix(1000, 0), - EndTime: time.Unix(1100, 0), - TruncatedAt: time.Unix(1050, 0), - } - - sg2 := ShardGroupInfo{ - ID: 2, - StartTime: time.Unix(1000, 0), - EndTime: time.Unix(1100, 0), - } - - sgs := ShardGroupInfos{sg2, sg1} - - sort.Sort(sgs) - - if sgs[len(sgs)-1].ID != 2 { - t.Fatal("unstable sort for ShardGroupInfos") + rp, err := data.RetentionPolicy("foo", "bar") + if err != nil { + t.Fatal(err) + } + + if rp == nil { + t.Fatal("creation of retention policy failed") + } + + // Try to recreate the same RP with default set to true, should fail + err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{ + Name: "bar", + ReplicaN: 1, + Duration: 24 * time.Hour, + }, true) + if err == nil || err != meta.ErrRetentionPolicyConflict { + t.Fatalf("unexpected error. got: %v, exp: %s", err, meta.ErrRetentionPolicyConflict) + } + + // Creating the same RP with the same specifications should succeed + err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{ + Name: "bar", + ReplicaN: 1, + Duration: 24 * time.Hour, + }, false) + if err != nil { + t.Fatal(err) } }