Fix a regression introduced in #7449.

This commit ensures that create database with retention policy will work
correctly.
pull/7865/head
Edd Robinson 2017-01-23 17:57:32 +00:00
parent 3722fa383d
commit 4442cd7efa
2 changed files with 107 additions and 21 deletions

View File

@ -206,8 +206,23 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) {
return db, nil
}
// CreateDatabaseWithRetentionPolicy creates a database with the specified retention policy.
// CreateDatabaseWithRetentionPolicy creates a database with the specified
// retention policy.
//
// When creating a database with a retention policy, the retention policy will
// always be set to default. Therefore if the caller provides a retention policy
// that already exists on the database, but that retention policy is not the
// default one, an error will be returned.
//
// This call is only idempotent when the caller provides the exact same
// retention policy, and that retention policy is already the default for the
// database.
//
func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionPolicySpec) (*DatabaseInfo, error) {
if spec == nil {
return nil, errors.New("CreateDatabaseWithRetentionPolicy called with nil spec")
}
c.mu.Lock()
defer c.mu.Unlock()
@ -225,26 +240,29 @@ func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionP
db = data.Database(name)
}
// No existing retention policies, so we can create the provided policy as
// the new default policy.
rpi := spec.NewRetentionPolicyInfo()
if rp := db.RetentionPolicy(rpi.Name); rp == nil {
if len(db.RetentionPolicies) == 0 {
if err := data.CreateRetentionPolicy(name, rpi, true); err != nil {
return nil, err
}
} else if !spec.Matches(rp) {
// Verify that the retention policy with this name matches
// the one already created.
} else if !spec.Matches(db.RetentionPolicy(rpi.Name)) {
// In this case we already have a retention policy on the database and
// the provided retention policy does not match it. Therefore, this call
// is not idempotent and we need to return an error.
return nil, ErrRetentionPolicyConflict
}
// If no default retention policy has been set, set it to the retention
// policy we just created. If the default is different from what we are
// trying to create, record it as a conflict and abandon with an error.
if db.DefaultRetentionPolicy == "" {
db.DefaultRetentionPolicy = rpi.Name
} else if rpi.Name != db.DefaultRetentionPolicy {
// If a non-default retention policy was passed in that already exists then
// it's an error regardless of if the exact same retention policy is
// provided. CREATE DATABASE WITH RETENTION POLICY should only be used to
// create DEFAULT retention policies.
if db.DefaultRetentionPolicy != rpi.Name {
return nil, ErrRetentionPolicyConflict
}
// Commit the changes.
if err := c.commit(data); err != nil {
return nil, err
}

View File

@ -79,6 +79,12 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
defer os.RemoveAll(d)
defer c.Close()
// Calling CreateDatabaseWithRetentionPolicy with a nil spec should return
// an error
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", nil); err == nil {
t.Fatal("expected error")
}
duration := 1 * time.Hour
replicaN := 1
spec := meta.RetentionPolicySpec{
@ -115,8 +121,46 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
t.Fatal(err)
}
// If the rp's duration is different, an error should be returned.
// If create database is used by itself, no error should be returned and
// the default retention policy should not be changed.
if dbi, err := c.CreateDatabase("db0"); err != nil {
t.Fatalf("got %v, but expected %v", err, nil)
} else if dbi.DefaultRetentionPolicy != "rp0" {
t.Fatalf("got %v, but expected %v", dbi.DefaultRetentionPolicy, "rp0")
} else if got, exp := len(dbi.RetentionPolicies), 1; got != exp {
// Ensure no additional retention policies were created.
t.Fatalf("got %v, but expected %v", got, exp)
}
}
func TestMetaClient_CreateDatabaseWithRetentionPolicy_Conflict_Fields(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
duration := 1 * time.Hour
replicaN := 1
spec := meta.RetentionPolicySpec{
Name: "rp0",
Duration: &duration,
ReplicaN: &replicaN,
ShardGroupDuration: 60 * time.Minute,
}
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec); err != nil {
t.Fatal(err)
}
// If the rp's name is different, and error should be returned.
spec2 := spec
spec2.Name = spec.Name + "1"
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict {
t.Fatalf("got %v, but expected %v", err, meta.ErrRetentionPolicyConflict)
}
// If the rp's duration is different, an error should be returned.
spec2 = spec
duration2 := *spec.Duration + time.Minute
spec2.Duration = &duration2
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict {
@ -137,16 +181,40 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) {
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict {
t.Fatalf("got %v, but expected %v", err, meta.ErrRetentionPolicyConflict)
}
}
// If create database is used by itself, no error should be returned and
// the default retention policy should not be changed.
if dbi, err := c.CreateDatabase("db0"); err != nil {
t.Fatalf("got %v, but expected %v", err, nil)
} else if dbi.DefaultRetentionPolicy != "rp0" {
t.Fatalf("got %v, but expected %v", dbi.DefaultRetentionPolicy, "rp0")
} else if got, exp := len(dbi.RetentionPolicies), 1; got != exp {
// Ensure no additional retention policies were created.
t.Fatalf("got %v, but expected %v", got, exp)
func TestMetaClient_CreateDatabaseWithRetentionPolicy_Conflict_NonDefault(t *testing.T) {
t.Parallel()
d, c := newClient()
defer os.RemoveAll(d)
defer c.Close()
duration := 1 * time.Hour
replicaN := 1
spec := meta.RetentionPolicySpec{
Name: "rp0",
Duration: &duration,
ReplicaN: &replicaN,
ShardGroupDuration: 60 * time.Minute,
}
// Create a default retention policy.
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec); err != nil {
t.Fatal(err)
}
// Let's create a non-default retention policy.
spec2 := spec
spec2.Name = "rp1"
if _, err := c.CreateRetentionPolicy("db0", &spec2, false); err != nil {
t.Fatal(err)
}
// If we try to create a database with the non-default retention policy then
// it's an error.
if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict {
t.Fatalf("got %v, but expected %v", err, meta.ErrRetentionPolicyConflict)
}
}