diff --git a/services/meta/client.go b/services/meta/client.go index fce1545b4d..72715f6e11 100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -206,8 +206,23 @@ func (c *Client) CreateDatabase(name string) (*DatabaseInfo, error) { return db, nil } -// CreateDatabaseWithRetentionPolicy creates a database with the specified retention policy. +// CreateDatabaseWithRetentionPolicy creates a database with the specified +// retention policy. +// +// When creating a database with a retention policy, the retention policy will +// always be set to default. Therefore if the caller provides a retention policy +// that already exists on the database, but that retention policy is not the +// default one, an error will be returned. +// +// This call is only idempotent when the caller provides the exact same +// retention policy, and that retention policy is already the default for the +// database. +// func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionPolicySpec) (*DatabaseInfo, error) { + if spec == nil { + return nil, errors.New("CreateDatabaseWithRetentionPolicy called with nil spec") + } + c.mu.Lock() defer c.mu.Unlock() @@ -225,26 +240,29 @@ func (c *Client) CreateDatabaseWithRetentionPolicy(name string, spec *RetentionP db = data.Database(name) } + // No existing retention policies, so we can create the provided policy as + // the new default policy. rpi := spec.NewRetentionPolicyInfo() - if rp := db.RetentionPolicy(rpi.Name); rp == nil { + if len(db.RetentionPolicies) == 0 { if err := data.CreateRetentionPolicy(name, rpi, true); err != nil { return nil, err } - } else if !spec.Matches(rp) { - // Verify that the retention policy with this name matches - // the one already created. + } else if !spec.Matches(db.RetentionPolicy(rpi.Name)) { + // In this case we already have a retention policy on the database and + // the provided retention policy does not match it. Therefore, this call + // is not idempotent and we need to return an error. return nil, ErrRetentionPolicyConflict } - // If no default retention policy has been set, set it to the retention - // policy we just created. If the default is different from what we are - // trying to create, record it as a conflict and abandon with an error. - if db.DefaultRetentionPolicy == "" { - db.DefaultRetentionPolicy = rpi.Name - } else if rpi.Name != db.DefaultRetentionPolicy { + // If a non-default retention policy was passed in that already exists then + // it's an error regardless of if the exact same retention policy is + // provided. CREATE DATABASE WITH RETENTION POLICY should only be used to + // create DEFAULT retention policies. + if db.DefaultRetentionPolicy != rpi.Name { return nil, ErrRetentionPolicyConflict } + // Commit the changes. if err := c.commit(data); err != nil { return nil, err } diff --git a/services/meta/client_test.go b/services/meta/client_test.go index f48a827ebe..cac5292b3d 100644 --- a/services/meta/client_test.go +++ b/services/meta/client_test.go @@ -79,6 +79,12 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) { defer os.RemoveAll(d) defer c.Close() + // Calling CreateDatabaseWithRetentionPolicy with a nil spec should return + // an error + if _, err := c.CreateDatabaseWithRetentionPolicy("db0", nil); err == nil { + t.Fatal("expected error") + } + duration := 1 * time.Hour replicaN := 1 spec := meta.RetentionPolicySpec{ @@ -115,8 +121,46 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) { t.Fatal(err) } - // If the rp's duration is different, an error should be returned. + // If create database is used by itself, no error should be returned and + // the default retention policy should not be changed. + if dbi, err := c.CreateDatabase("db0"); err != nil { + t.Fatalf("got %v, but expected %v", err, nil) + } else if dbi.DefaultRetentionPolicy != "rp0" { + t.Fatalf("got %v, but expected %v", dbi.DefaultRetentionPolicy, "rp0") + } else if got, exp := len(dbi.RetentionPolicies), 1; got != exp { + // Ensure no additional retention policies were created. + t.Fatalf("got %v, but expected %v", got, exp) + } +} + +func TestMetaClient_CreateDatabaseWithRetentionPolicy_Conflict_Fields(t *testing.T) { + t.Parallel() + + d, c := newClient() + defer os.RemoveAll(d) + defer c.Close() + + duration := 1 * time.Hour + replicaN := 1 + spec := meta.RetentionPolicySpec{ + Name: "rp0", + Duration: &duration, + ReplicaN: &replicaN, + ShardGroupDuration: 60 * time.Minute, + } + if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec); err != nil { + t.Fatal(err) + } + + // If the rp's name is different, and error should be returned. spec2 := spec + spec2.Name = spec.Name + "1" + if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict { + t.Fatalf("got %v, but expected %v", err, meta.ErrRetentionPolicyConflict) + } + + // If the rp's duration is different, an error should be returned. + spec2 = spec duration2 := *spec.Duration + time.Minute spec2.Duration = &duration2 if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict { @@ -137,16 +181,40 @@ func TestMetaClient_CreateDatabaseWithRetentionPolicy(t *testing.T) { if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict { t.Fatalf("got %v, but expected %v", err, meta.ErrRetentionPolicyConflict) } +} - // If create database is used by itself, no error should be returned and - // the default retention policy should not be changed. - if dbi, err := c.CreateDatabase("db0"); err != nil { - t.Fatalf("got %v, but expected %v", err, nil) - } else if dbi.DefaultRetentionPolicy != "rp0" { - t.Fatalf("got %v, but expected %v", dbi.DefaultRetentionPolicy, "rp0") - } else if got, exp := len(dbi.RetentionPolicies), 1; got != exp { - // Ensure no additional retention policies were created. - t.Fatalf("got %v, but expected %v", got, exp) +func TestMetaClient_CreateDatabaseWithRetentionPolicy_Conflict_NonDefault(t *testing.T) { + t.Parallel() + + d, c := newClient() + defer os.RemoveAll(d) + defer c.Close() + + duration := 1 * time.Hour + replicaN := 1 + spec := meta.RetentionPolicySpec{ + Name: "rp0", + Duration: &duration, + ReplicaN: &replicaN, + ShardGroupDuration: 60 * time.Minute, + } + + // Create a default retention policy. + if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec); err != nil { + t.Fatal(err) + } + + // Let's create a non-default retention policy. + spec2 := spec + spec2.Name = "rp1" + if _, err := c.CreateRetentionPolicy("db0", &spec2, false); err != nil { + t.Fatal(err) + } + + // If we try to create a database with the non-default retention policy then + // it's an error. + if _, err := c.CreateDatabaseWithRetentionPolicy("db0", &spec2); err != meta.ErrRetentionPolicyConflict { + t.Fatalf("got %v, but expected %v", err, meta.ErrRetentionPolicyConflict) } }