fix retention policy creation inconsistencies
parent
4b85371a71
commit
cd272ce6c3
|
@ -76,6 +76,7 @@ The query language has been extended with a few new features:
|
|||
- [#7526](https://github.com/influxdata/influxdb/issues/7526): Truncate the version string when linking to the documentation.
|
||||
- [#7548](https://github.com/influxdata/influxdb/issues/7548): Fix output duration units for SHOW QUERIES.
|
||||
- [#7564](https://github.com/influxdata/influxdb/issues/7564): Fix incorrect grouping when multiple aggregates are used with sparse data.
|
||||
- [#7448](https://github.com/influxdata/influxdb/pull/7448): Fix Retention Policy Inconsistencies
|
||||
|
||||
## v1.0.2 [2016-10-05]
|
||||
|
||||
|
|
|
@ -114,7 +114,7 @@ func (s *Server) URL() string {
|
|||
func (s *Server) CreateDatabaseAndRetentionPolicy(db string, rp *meta.RetentionPolicySpec) error {
|
||||
if _, err := s.MetaClient.CreateDatabase(db); err != nil {
|
||||
return err
|
||||
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp); err != nil {
|
||||
} else if _, err := s.MetaClient.CreateRetentionPolicy(db, rp, true); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -417,6 +417,12 @@ func init() {
|
|||
exp: `{"results":[{}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "create retention policy with default on",
|
||||
command: `CREATE RETENTION POLICY rp3 ON db0 DURATION 1h REPLICATION 1 SHARD DURATION 30m DEFAULT`,
|
||||
exp: `{"results":[{"error":"retention policy conflicts with an existing policy"}]}`,
|
||||
once: true,
|
||||
},
|
||||
&Query{
|
||||
name: "show retention policy should show both with custom shard",
|
||||
command: `SHOW RETENTION POLICIES ON db0`,
|
||||
|
|
|
@ -12,7 +12,7 @@ type MetaClient interface {
|
|||
CreateContinuousQuery(database, name, query string) error
|
||||
CreateDatabase(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
|
||||
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
|
||||
CreateSubscription(database, rp, name, mode string, destinations []string) error
|
||||
CreateUser(name, password string, admin bool) (*meta.UserInfo, error)
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
|
|
|
@ -12,7 +12,7 @@ type MetaClient struct {
|
|||
CreateContinuousQueryFn func(database, name, query string) error
|
||||
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
|
||||
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
|
||||
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
|
||||
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
|
||||
DatabaseFn func(name string) *meta.DatabaseInfo
|
||||
|
@ -52,8 +52,8 @@ func (c *MetaClient) CreateDatabaseWithRetentionPolicy(name string, spec *meta.R
|
|||
return c.CreateDatabaseWithRetentionPolicyFn(name, spec)
|
||||
}
|
||||
|
||||
func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
|
||||
return c.CreateRetentionPolicyFn(database, spec)
|
||||
func (c *MetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) {
|
||||
return c.CreateRetentionPolicyFn(database, spec, makeDefault)
|
||||
}
|
||||
|
||||
func (c *MetaClient) DropShard(id uint64) error {
|
||||
|
|
|
@ -271,17 +271,11 @@ func (e *StatementExecutor) executeCreateRetentionPolicyStatement(stmt *influxql
|
|||
}
|
||||
|
||||
// Create new retention policy.
|
||||
rp, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec)
|
||||
_, err := e.MetaClient.CreateRetentionPolicy(stmt.Database, &spec, stmt.Default)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// If requested, set new policy as the default.
|
||||
if stmt.Default {
|
||||
if err := e.MetaClient.SetDefaultRetentionPolicy(stmt.Database, rp.Name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@ type MetaClientMock struct {
|
|||
CreateContinuousQueryFn func(database, name, query string) error
|
||||
CreateDatabaseFn func(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseWithRetentionPolicyFn func(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
|
||||
CreateRetentionPolicyFn func(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
|
||||
CreateShardGroupFn func(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error)
|
||||
CreateSubscriptionFn func(database, rp, name, mode string, destinations []string) error
|
||||
CreateUserFn func(name, password string, admin bool) (*meta.UserInfo, error)
|
||||
|
@ -63,8 +63,8 @@ func (c *MetaClientMock) CreateDatabaseWithRetentionPolicy(name string, spec *me
|
|||
return c.CreateDatabaseWithRetentionPolicyFn(name, spec)
|
||||
}
|
||||
|
||||
func (c *MetaClientMock) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
|
||||
return c.CreateRetentionPolicyFn(database, spec)
|
||||
func (c *MetaClientMock) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) {
|
||||
return c.CreateRetentionPolicyFn(database, spec, makeDefault)
|
||||
}
|
||||
|
||||
func (c *MetaClientMock) CreateShardGroup(database, policy string, timestamp time.Time) (*meta.ShardGroupInfo, error) {
|
||||
|
|
|
@ -85,7 +85,7 @@ type Service struct {
|
|||
MetaClient interface {
|
||||
CreateDatabase(name string) (*meta.DatabaseInfo, error)
|
||||
CreateDatabaseWithRetentionPolicy(name string, spec *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error)
|
||||
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error)
|
||||
CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error)
|
||||
Database(name string) *meta.DatabaseInfo
|
||||
RetentionPolicy(database, name string) (*meta.RetentionPolicyInfo, error)
|
||||
}
|
||||
|
@ -234,7 +234,7 @@ func (s *Service) createInternalStorage() error {
|
|||
if db := s.MetaClient.Database(s.database); db != nil {
|
||||
if rp, _ := s.MetaClient.RetentionPolicy(s.database, s.retentionPolicy); rp == nil {
|
||||
spec := meta.RetentionPolicySpec{Name: s.retentionPolicy}
|
||||
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec); err != nil {
|
||||
if _, err := s.MetaClient.CreateRetentionPolicy(s.database, &spec, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -271,7 +271,7 @@ func NewTestService(c *Config) *TestService {
|
|||
MetaClient: &internal.MetaClientMock{},
|
||||
}
|
||||
|
||||
service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) {
|
||||
service.MetaClient.CreateRetentionPolicyFn = func(string, *meta.RetentionPolicySpec, bool) (*meta.RetentionPolicyInfo, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -186,7 +186,7 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
|
|||
// create default retention policy
|
||||
if c.retentionAutoCreate {
|
||||
rpi := DefaultRetentionPolicyInfo()
|
||||
if err := data.CreateRetentionPolicy(name, rpi); err != nil {
|
||||
if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := data.SetDefaultRetentionPolicy(name, rpi.Name); err != nil {
|
||||
|
@ -224,7 +224,7 @@ func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionP
|
|||
|
||||
rpi := spec.NewRetentionPolicyInfo()
|
||||
if rp := db.RetentionPolicy(rpi.Name); rp == nil {
|
||||
if err := data.CreateRetentionPolicy(name, rpi); err != nil {
|
||||
if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else if !spec.Matches(rp) {
|
||||
|
@ -273,7 +273,7 @@ func (c *Client) DropDatabase(name string) error {
|
|||
}
|
||||
|
||||
// CreateRetentionPolicy creates a retention policy on the specified database.
|
||||
func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpec) (*RetentionPolicyInfo, error) {
|
||||
func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpec, makeDefault bool) (*RetentionPolicyInfo, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
|
@ -284,7 +284,7 @@ func (c *Client) CreateRetentionPolicy(database string, spec *RetentionPolicySpe
|
|||
}
|
||||
|
||||
rp := spec.NewRetentionPolicyInfo()
|
||||
if err := data.CreateRetentionPolicy(database, rp); err != nil {
|
||||
if err := data.CreateRetentionPolicy(database, rp, makeDefault); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
|
@ -248,7 +248,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
ReplicaN: &rp0.ReplicaN,
|
||||
Duration: &rp0.Duration,
|
||||
ShardGroupDuration: rp0.ShardGroupDuration,
|
||||
}); err != nil {
|
||||
}, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
@ -265,7 +265,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
ReplicaN: &rp0.ReplicaN,
|
||||
Duration: &rp0.Duration,
|
||||
ShardGroupDuration: rp0.ShardGroupDuration,
|
||||
}); err != nil {
|
||||
}, true); err != nil {
|
||||
t.Fatal(err)
|
||||
} else if actual, err = c.RetentionPolicy("db0", "rp0"); err != nil {
|
||||
t.Fatal(err)
|
||||
|
@ -283,7 +283,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
ReplicaN: &rp1.ReplicaN,
|
||||
Duration: &rp1.Duration,
|
||||
ShardGroupDuration: rp1.ShardGroupDuration,
|
||||
})
|
||||
}, true)
|
||||
if exp := meta.ErrRetentionPolicyExists; got != exp {
|
||||
t.Fatalf("got error %v, expected error %v", got, exp)
|
||||
}
|
||||
|
@ -298,7 +298,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
ReplicaN: &rp1.ReplicaN,
|
||||
Duration: &rp1.Duration,
|
||||
ShardGroupDuration: rp1.ShardGroupDuration,
|
||||
})
|
||||
}, true)
|
||||
if exp := meta.ErrRetentionPolicyExists; got != exp {
|
||||
t.Fatalf("got error %v, expected error %v", got, exp)
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
ReplicaN: &rp1.ReplicaN,
|
||||
Duration: &rp1.Duration,
|
||||
ShardGroupDuration: rp1.ShardGroupDuration,
|
||||
})
|
||||
}, true)
|
||||
if exp := meta.ErrRetentionPolicyExists; got != exp {
|
||||
t.Fatalf("got error %v, expected error %v", got, exp)
|
||||
}
|
||||
|
@ -329,7 +329,7 @@ func TestMetaClient_CreateRetentionPolicy(t *testing.T) {
|
|||
ReplicaN: &rp1.ReplicaN,
|
||||
Duration: &rp1.Duration,
|
||||
ShardGroupDuration: rp1.ShardGroupDuration,
|
||||
})
|
||||
}, true)
|
||||
if exp := meta.ErrIncompatibleDurations; got != exp {
|
||||
t.Fatalf("got error %v, expected error %v", got, exp)
|
||||
}
|
||||
|
@ -481,7 +481,7 @@ func TestMetaClient_DropRetentionPolicy(t *testing.T) {
|
|||
Name: "rp0",
|
||||
Duration: &duration,
|
||||
ReplicaN: &replicaN,
|
||||
}); err != nil {
|
||||
}, true); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ func (data *Data) RetentionPolicy(database, name string) (*RetentionPolicyInfo,
|
|||
|
||||
// CreateRetentionPolicy creates a new retention policy on a database.
|
||||
// Returns an error if name is blank or if a database does not exist.
|
||||
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo) error {
|
||||
func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInfo, makeDefault bool) error {
|
||||
// Validate retention policy.
|
||||
if rpi == nil {
|
||||
return ErrRetentionPolicyRequired
|
||||
|
@ -163,11 +163,21 @@ func (data *Data) CreateRetentionPolicy(database string, rpi *RetentionPolicyInf
|
|||
if rp.ReplicaN != rpi.ReplicaN || rp.Duration != rpi.Duration || rp.ShardGroupDuration != rpi.ShardGroupDuration {
|
||||
return ErrRetentionPolicyExists
|
||||
}
|
||||
// if they want to make it default, and it's not the default, it's not an identical command so it's an error
|
||||
if makeDefault && di.DefaultRetentionPolicy != rpi.Name {
|
||||
return ErrRetentionPolicyConflict
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Append copy of new policy.
|
||||
di.RetentionPolicies = append(di.RetentionPolicies, *rpi)
|
||||
|
||||
// Set the default if needed
|
||||
if makeDefault {
|
||||
di.DefaultRetentionPolicy = rpi.Name
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"testing"
|
||||
)
|
||||
|
||||
func Test_newShardOwner(t *testing.T) {
|
||||
// An error is returned if there are no data nodes available.
|
||||
_, err := NewShardOwner(ShardInfo{}, map[int]int{})
|
||||
if err == nil {
|
||||
t.Error("got no error, but expected one")
|
||||
}
|
||||
|
||||
ownerFreqs := map[int]int{1: 15, 2: 11, 3: 12}
|
||||
id, err := NewShardOwner(ShardInfo{ID: 4}, ownerFreqs)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The ID that owns the fewest shards is returned.
|
||||
if got, exp := id, uint64(2); got != exp {
|
||||
t.Errorf("got id %d, expected id %d", got, exp)
|
||||
}
|
||||
|
||||
// The ownership frequencies are updated.
|
||||
if got, exp := ownerFreqs, map[int]int{1: 15, 2: 12, 3: 12}; !reflect.DeepEqual(got, exp) {
|
||||
t.Errorf("got owner frequencies %v, expected %v", got, exp)
|
||||
}
|
||||
}
|
||||
|
||||
func TestShardGroupSort(t *testing.T) {
|
||||
sg1 := ShardGroupInfo{
|
||||
ID: 1,
|
||||
StartTime: time.Unix(1000, 0),
|
||||
EndTime: time.Unix(1100, 0),
|
||||
TruncatedAt: time.Unix(1050, 0),
|
||||
}
|
||||
|
||||
sg2 := ShardGroupInfo{
|
||||
ID: 2,
|
||||
StartTime: time.Unix(1000, 0),
|
||||
EndTime: time.Unix(1100, 0),
|
||||
}
|
||||
|
||||
sgs := ShardGroupInfos{sg2, sg1}
|
||||
|
||||
sort.Sort(sgs)
|
||||
|
||||
if sgs[len(sgs)-1].ID != 2 {
|
||||
t.Fatal("unstable sort for ShardGroupInfos")
|
||||
}
|
||||
}
|
|
@ -1,56 +1,55 @@
|
|||
package meta
|
||||
package meta_test
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"testing"
|
||||
"github.com/influxdata/influxdb/services/meta"
|
||||
)
|
||||
|
||||
func Test_newShardOwner(t *testing.T) {
|
||||
// An error is returned if there are no data nodes available.
|
||||
_, err := NewShardOwner(ShardInfo{}, map[int]int{})
|
||||
if err == nil {
|
||||
t.Error("got no error, but expected one")
|
||||
}
|
||||
func Test_Data_CreateRetentionPolicy(t *testing.T) {
|
||||
data := meta.Data{}
|
||||
|
||||
ownerFreqs := map[int]int{1: 15, 2: 11, 3: 12}
|
||||
id, err := NewShardOwner(ShardInfo{ID: 4}, ownerFreqs)
|
||||
err := data.CreateDatabase("foo")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The ID that owns the fewest shards is returned.
|
||||
if got, exp := id, uint64(2); got != exp {
|
||||
t.Errorf("got id %d, expected id %d", got, exp)
|
||||
err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{
|
||||
Name: "bar",
|
||||
ReplicaN: 1,
|
||||
Duration: 24 * time.Hour,
|
||||
}, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// The ownership frequencies are updated.
|
||||
if got, exp := ownerFreqs, map[int]int{1: 15, 2: 12, 3: 12}; !reflect.DeepEqual(got, exp) {
|
||||
t.Errorf("got owner frequencies %v, expected %v", got, exp)
|
||||
}
|
||||
rp, err := data.RetentionPolicy("foo", "bar")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
func TestShardGroupSort(t *testing.T) {
|
||||
sg1 := ShardGroupInfo{
|
||||
ID: 1,
|
||||
StartTime: time.Unix(1000, 0),
|
||||
EndTime: time.Unix(1100, 0),
|
||||
TruncatedAt: time.Unix(1050, 0),
|
||||
if rp == nil {
|
||||
t.Fatal("creation of retention policy failed")
|
||||
}
|
||||
|
||||
sg2 := ShardGroupInfo{
|
||||
ID: 2,
|
||||
StartTime: time.Unix(1000, 0),
|
||||
EndTime: time.Unix(1100, 0),
|
||||
// Try to recreate the same RP with default set to true, should fail
|
||||
err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{
|
||||
Name: "bar",
|
||||
ReplicaN: 1,
|
||||
Duration: 24 * time.Hour,
|
||||
}, true)
|
||||
if err == nil || err != meta.ErrRetentionPolicyConflict {
|
||||
t.Fatalf("unexpected error. got: %v, exp: %s", err, meta.ErrRetentionPolicyConflict)
|
||||
}
|
||||
|
||||
sgs := ShardGroupInfos{sg2, sg1}
|
||||
|
||||
sort.Sort(sgs)
|
||||
|
||||
if sgs[len(sgs)-1].ID != 2 {
|
||||
t.Fatal("unstable sort for ShardGroupInfos")
|
||||
// Creating the same RP with the same specifications should succeed
|
||||
err = data.CreateRetentionPolicy("foo", &meta.RetentionPolicyInfo{
|
||||
Name: "bar",
|
||||
ReplicaN: 1,
|
||||
Duration: 24 * time.Hour,
|
||||
}, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue