diff --git a/database.go b/database.go index 5623a46e01..afa11ca2fa 100644 --- a/database.go +++ b/database.go @@ -3,7 +3,6 @@ package influxdb import ( "encoding/json" "fmt" - "regexp" "sort" "sync" "time" @@ -12,28 +11,29 @@ import ( // "github.com/influxdb/influxdb/messaging" ) -// Database represents a collection of shard spaces. +// Database represents a collection of retention policies. type Database struct { mu sync.RWMutex server *Server name string - users map[string]*DBUser // database users by name - spaces map[string]*ShardSpace // shard spaces by name - shards map[uint64]*Shard // shards by id - series map[string]*Series // series by name + users map[string]*DBUser // database users by name + policies map[string]*RetentionPolicy // retention policies by name + shards map[uint64]*Shard // shards by id + series map[string]*Series // series by name - maxFieldID uint64 // largest field id in use + defaultRetentionPolicy string + maxFieldID uint64 // largest field id in use } // newDatabase returns an instance of Database associated with a server. func newDatabase(s *Server) *Database { return &Database{ - server: s, - users: make(map[string]*DBUser), - spaces: make(map[string]*ShardSpace), - shards: make(map[uint64]*Shard), - series: make(map[string]*Series), + server: s, + users: make(map[string]*DBUser), + policies: make(map[string]*RetentionPolicy), + shards: make(map[uint64]*Shard), + series: make(map[string]*Series), } } @@ -44,6 +44,13 @@ func (db *Database) Name() string { return db.name } +// DefaultRetentionPolicy returns the retention policy that writes and queries will default to or nil if not set. +func (db *Database) DefaultRetentionPolicy() *RetentionPolicy { + db.mu.Lock() + defer db.mu.Unlock() + return db.policies[db.defaultRetentionPolicy] +} + // User returns a database user by name. func (db *Database) User(name string) *DBUser { db.mu.Lock() @@ -180,93 +187,94 @@ func (db *Database) applyChangePassword(username, newPassword string) error { return nil } -// ShardSpace returns a shard space by name. -func (db *Database) ShardSpace(name string) *ShardSpace { +// RetentionPolicy returns a retention policy by name. +func (db *Database) RetentionPolicy(name string) *RetentionPolicy { db.mu.Lock() defer db.mu.Unlock() - return db.spaces[name] + return db.policies[name] } -// shardSpaceBySeries returns a shard space that matches a series name. -func (db *Database) shardSpaceBySeries(name string) *ShardSpace { - for _, ss := range db.spaces { - if ss.Regex.MatchString(name) { - return ss - } +// CreateRetentionPolicy creates a retention policy in the database. +func (db *Database) CreateRetentionPolicy(ss *RetentionPolicy) error { + c := &createRetentionPolicyCommand{ + Database: db.Name(), + Name: ss.Name, + Duration: ss.Duration, + ReplicaN: ss.ReplicaN, + SplitN: ss.SplitN, } - return nil -} - -// CreateShardSpace creates a shard space in the database. -func (db *Database) CreateShardSpace(ss *ShardSpace) error { - c := &createShardSpaceCommand{ - Database: db.Name(), - Name: ss.Name, - Retention: ss.Retention, - Duration: ss.Duration, - ReplicaN: ss.ReplicaN, - SplitN: ss.SplitN, - } - if ss.Regex != nil { - c.Regex = ss.Regex.String() - } - _, err := db.server.broadcast(createShardSpaceMessageType, c) + _, err := db.server.broadcast(createRetentionPolicyMessageType, c) return err } -func (db *Database) applyCreateShardSpace(name, regex string, retention, duration time.Duration, replicaN, splitN uint32) error { +func (db *Database) applyCreateRetentionPolicy(name string, duration time.Duration, replicaN, splitN uint32) error { db.mu.Lock() defer db.mu.Unlock() - // Validate shard space. + // Validate retention policy. if name == "" { - return ErrShardSpaceNameRequired - } else if db.spaces[name] != nil { - return ErrShardSpaceExists + return ErrRetentionPolicyNameRequired + } else if db.policies[name] != nil { + return ErrRetentionPolicyExists } - // Compile regex. - re := regexp.MustCompile(regex) - // Add space to the database. - db.spaces[name] = &ShardSpace{ - Name: name, - Regex: re, - Retention: retention, - Duration: duration, - ReplicaN: replicaN, - SplitN: splitN, + db.policies[name] = &RetentionPolicy{ + Name: name, + Duration: duration, + ReplicaN: replicaN, + SplitN: splitN, } return nil } -// DeleteShardSpace removes a shard space from the database. -func (db *Database) DeleteShardSpace(name string) error { - c := &deleteShardSpaceCommand{Database: db.Name(), Name: name} - _, err := db.server.broadcast(deleteShardSpaceMessageType, c) +// DeleteRetentionPolicy removes a retention policy from the database. +func (db *Database) DeleteRetentionPolicy(name string) error { + c := &deleteRetentionPolicyCommand{Database: db.Name(), Name: name} + _, err := db.server.broadcast(deleteRetentionPolicyMessageType, c) return err } -func (db *Database) applyDeleteShardSpace(name string) error { +func (db *Database) applyDeleteRetentionPolicy(name string) error { db.mu.Lock() defer db.mu.Unlock() - // Validate shard space. + // Validate retention policy. if name == "" { - return ErrShardSpaceNameRequired - } else if db.spaces[name] == nil { - return ErrShardSpaceNotFound + return ErrRetentionPolicyNameRequired + } else if db.policies[name] == nil { + return ErrRetentionPolicyNotFound } - // Remove shard space. - delete(db.spaces, name) + // Remove retention policy. + delete(db.policies, name) + return nil +} + +// SetDefaultRetentionPolicy sets the default policy to write data into and query from on a database. +func (db *Database) SetDefaultRetentionPolicy(name string) error { + c := &setDefaultRetentionPolicyCommand{Database: db.Name(), Name: name} + _, err := db.server.broadcast(setDefaultRetentionPolicyMessageType, c) + return err +} + +func (db *Database) applySetDefaultRetentionPolicy(name string) error { + db.mu.Lock() + defer db.mu.Unlock() + + // Check the retention policy exists + if db.policies[name] == nil { + return ErrRetentionPolicyNotFound + } + + db.defaultRetentionPolicy = name return nil } // shard returns a shard by id. func (db *Database) shard(id uint64) *Shard { - for _, ss := range db.spaces { + for _, ss := range db.policies { for _, s := range ss.Shards { if s.ID == id { return s @@ -276,21 +284,21 @@ func (db *Database) shard(id uint64) *Shard { return nil } -// CreateShardIfNotExists creates a shard for a shard space for a given timestamp. +// CreateShardIfNotExists creates a shard for a retention policy for a given timestamp. func (db *Database) CreateShardIfNotExists(space string, timestamp time.Time) error { c := &createShardIfNotExistsSpaceCommand{Database: db.name, Space: space, Timestamp: timestamp} _, err := db.server.broadcast(createShardIfNotExistsMessageType, c) return err } -func (db *Database) applyCreateShardIfNotExists(id uint64, space string, timestamp time.Time) (error, bool) { +func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timestamp time.Time) (error, bool) { db.mu.Lock() defer db.mu.Unlock() - // Validate shard space. - ss := db.spaces[space] + // Validate retention policy. + ss := db.policies[policy] if ss == nil { - return ErrShardSpaceNotFound, false + return ErrRetentionPolicyNotFound, false } // If we can match to an existing shard date range then just ignore request. @@ -311,7 +319,7 @@ func (db *Database) applyCreateShardIfNotExists(id uint64, space string, timesta panic("unable to open shard: " + err.Error()) } - // Append to shard space. + // Append to retention policy. ss.Shards = append(ss.Shards, s) return nil, true @@ -322,15 +330,15 @@ func (db *Database) WriteSeries(name string, tags map[string]string, timestamp t panic("not yet implemented: Database.WriteSeries()") /* TEMPORARILY REMOVED FOR PROTOBUFS. - // Find shard space matching the series and split points by shard. + // Find retention policy matching the series and split points by shard. db.mu.Lock() name := db.name - space := db.shardSpaceBySeries(series.GetName()) + space := db.retentionPolicyBySeries(series.GetName()) db.mu.Unlock() // Ensure there is a space available. if space == nil { - return ErrShardSpaceNotFound + return ErrRetentionPolicyNotFound } // Group points by shard. @@ -458,12 +466,13 @@ func (db *Database) MarshalJSON() ([]byte, error) { // Copy over properties to intermediate type. var o databaseJSON o.Name = db.name + o.DefaultRetentionPolicy = db.defaultRetentionPolicy o.MaxFieldID = db.maxFieldID for _, u := range db.users { o.Users = append(o.Users, u) } - for _, ss := range db.spaces { - o.Spaces = append(o.Spaces, ss) + for _, ss := range db.policies { + o.Policies = append(o.Policies, ss) } for _, s := range db.shards { o.Shards = append(o.Shards, s) @@ -484,6 +493,7 @@ func (db *Database) UnmarshalJSON(data []byte) error { // Copy over properties from intermediate type. db.name = o.Name + db.defaultRetentionPolicy = o.DefaultRetentionPolicy db.maxFieldID = o.MaxFieldID // Copy users. @@ -492,10 +502,10 @@ func (db *Database) UnmarshalJSON(data []byte) error { db.users[u.Name] = u } - // Copy shard spaces. - db.spaces = make(map[string]*ShardSpace) - for _, ss := range o.Spaces { - db.spaces[ss.Name] = ss + // Copy shard policies. + db.policies = make(map[string]*RetentionPolicy) + for _, ss := range o.Policies { + db.policies[ss.Name] = ss } // Copy shards. @@ -515,12 +525,13 @@ func (db *Database) UnmarshalJSON(data []byte) error { // databaseJSON represents the JSON-serialization format for a database. type databaseJSON struct { - Name string `json:"name,omitempty"` - MaxFieldID uint64 `json:"maxFieldID,omitempty"` - Users []*DBUser `json:"users,omitempty"` - Spaces []*ShardSpace `json:"spaces,omitempty"` - Shards []*Shard `json:"shards,omitempty"` - Series []*Series `json:"series,omitempty"` + Name string `json:"name,omitempty"` + DefaultRetentionPolicy string `json:"defaultRetentionPolicy,omitempty"` + MaxFieldID uint64 `json:"maxFieldID,omitempty"` + Users []*DBUser `json:"users,omitempty"` + Policies []*RetentionPolicy `json:"policies,omitempty"` + Shards []*Shard `json:"shards,omitempty"` + Series []*Series `json:"series,omitempty"` } // databases represents a list of databases, sortable by name. @@ -530,16 +541,13 @@ func (p databases) Len() int { return len(p) } func (p databases) Less(i, j int) bool { return p[i].name < p[j].name } func (p databases) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -// ShardSpace represents a policy for creating new shards in a database. -type ShardSpace struct { +// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for. +type RetentionPolicy struct { // Unique name within database. Required. Name string - // Expression used to match against series. Optional. Defaults to /.*/. - Regex *regexp.Regexp - - Retention time.Duration - Duration time.Duration + // Length of time to keep data around + Duration time.Duration ReplicaN uint32 SplitN uint32 @@ -547,21 +555,19 @@ type ShardSpace struct { Shards []*Shard } -// NewShardSpace returns a new instance of ShardSpace with defaults set. -func NewShardSpace() *ShardSpace { - return &ShardSpace{ - Regex: regexp.MustCompile(`.*`), - ReplicaN: DefaultReplicaN, - SplitN: DefaultSplitN, - Retention: DefaultShardRetention, - Duration: DefaultShardDuration, +// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set. +func NewRetentionPolicy() *RetentionPolicy { + return &RetentionPolicy{ + ReplicaN: DefaultReplicaN, + SplitN: DefaultSplitN, + Duration: DefaultShardRetention, } } /* // SplitPoints groups a set of points by shard id. // Also returns a list of timestamps that did not match an existing shard. -func (ss *ShardSpace) Split(a []*protocol.Point) (points map[uint64][]*protocol.Point, unassigned []*protocol.Point) { +func (ss *RetentionPolicy) Split(a []*protocol.Point) (points map[uint64][]*protocol.Point, unassigned []*protocol.Point) { points = make(map[uint64][]*protocol.Point) for _, p := range a { if s := ss.ShardByTimestamp(time.Unix(0, p.GetTimestamp())); s != nil { @@ -576,7 +582,7 @@ func (ss *ShardSpace) Split(a []*protocol.Point) (points map[uint64][]*protocol. // ShardByTimestamp returns the shard in the space that owns a given timestamp. // Returns nil if the shard does not exist. -func (ss *ShardSpace) ShardByTimestamp(timestamp time.Time) *Shard { +func (ss *RetentionPolicy) ShardByTimestamp(timestamp time.Time) *Shard { for _, s := range ss.Shards { if timeBetween(timestamp, s.StartTime, s.EndTime) { return s @@ -585,22 +591,20 @@ func (ss *ShardSpace) ShardByTimestamp(timestamp time.Time) *Shard { return nil } -// MarshalJSON encodes a shard space to a JSON-encoded byte slice. -func (s *ShardSpace) MarshalJSON() ([]byte, error) { - return json.Marshal(&shardSpaceJSON{ - Name: s.Name, - Regex: s.Regex.String(), - Retention: s.Retention, - Duration: s.Duration, - ReplicaN: s.ReplicaN, - SplitN: s.SplitN, +// MarshalJSON encodes a retention policy to a JSON-encoded byte slice. +func (s *RetentionPolicy) MarshalJSON() ([]byte, error) { + return json.Marshal(&retentionPolicyJSON{ + Name: s.Name, + Duration: s.Duration, + ReplicaN: s.ReplicaN, + SplitN: s.SplitN, }) } -// UnmarshalJSON decodes a JSON-encoded byte slice to a shard space. -func (s *ShardSpace) UnmarshalJSON(data []byte) error { +// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy. +func (s *RetentionPolicy) UnmarshalJSON(data []byte) error { // Decode into intermediate type. - var o shardSpaceJSON + var o retentionPolicyJSON if err := json.Unmarshal(data, &o); err != nil { return err } @@ -609,34 +613,26 @@ func (s *ShardSpace) UnmarshalJSON(data []byte) error { s.Name = o.Name s.ReplicaN = o.ReplicaN s.SplitN = o.SplitN - s.Retention = o.Retention s.Duration = o.Duration s.Shards = o.Shards - s.Regex, _ = regexp.Compile(o.Regex) - if s.Regex == nil { - s.Regex = regexp.MustCompile(`.*`) - } - return nil } -// shardSpaceJSON represents an intermediate struct for JSON marshaling. -type shardSpaceJSON struct { - Name string `json:"name"` - Regex string `json:"regex,omitempty"` - ReplicaN uint32 `json:"replicaN,omitempty"` - SplitN uint32 `json:"splitN,omitempty"` - Retention time.Duration `json:"retention,omitempty"` - Duration time.Duration `json:"duration,omitempty"` - Shards []*Shard `json:"shards,omitempty"` +// retentionPolicyJSON represents an intermediate struct for JSON marshaling. +type retentionPolicyJSON struct { + Name string `json:"name"` + ReplicaN uint32 `json:"replicaN,omitempty"` + SplitN uint32 `json:"splitN,omitempty"` + Duration time.Duration `json:"duration,omitempty"` + Shards []*Shard `json:"shards,omitempty"` } -// ShardSpaces represents a list of shard spaces. -type ShardSpaces []*ShardSpace +// RetentionPolicys represents a list of shard policies. +type RetentionPolicys []*RetentionPolicy -// Shards returns a list of all shards for all spaces. -func (a ShardSpaces) Shards() []*Shard { +// Shards returns a list of all shards for all policies. +func (a RetentionPolicys) Shards() []*Shard { var shards []*Shard for _, ss := range a { shards = append(shards, ss.Shards...) diff --git a/database_test.go b/database_test.go index 6697b519c6..0ae8184eb8 100644 --- a/database_test.go +++ b/database_test.go @@ -2,7 +2,6 @@ package influxdb_test import ( "reflect" - "regexp" "testing" "time" @@ -216,8 +215,8 @@ func TestDatabase_Users(t *testing.T) { } } -// Ensure the database can create a new shard space. -func TestDatabase_CreateShardSpace(t *testing.T) { +// Ensure the database can create a new retention policy. +func TestDatabase_CreateRetentionPolicy(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() @@ -226,30 +225,28 @@ func TestDatabase_CreateShardSpace(t *testing.T) { t.Fatal(err) } - // Create a shard space on the database. - ss := &influxdb.ShardSpace{ - Name: "bar", - Regex: regexp.MustCompile(`myseries`), - Duration: time.Hour, - Retention: time.Minute, - ReplicaN: 2, - SplitN: 3, + // Create a retention policy on the database. + rp := &influxdb.RetentionPolicy{ + Name: "bar", + Duration: time.Hour, + ReplicaN: 2, + SplitN: 3, } - if err := s.Database("foo").CreateShardSpace(ss); err != nil { + if err := s.Database("foo").CreateRetentionPolicy(rp); err != nil { t.Fatal(err) } s.Restart() - // Verify that the user exists. - if o := s.Database("foo").ShardSpace("bar"); o == nil { - t.Fatalf("shard space not found") - } else if !reflect.DeepEqual(ss, o) { - t.Fatalf("shard space mismatch: %#v", o) + // Verify that the policy exists. + if o := s.Database("foo").RetentionPolicy("bar"); o == nil { + t.Fatalf("retention policy not found") + } else if !reflect.DeepEqual(rp, o) { + t.Fatalf("retention policy mismatch: %#v", o) } } -// Ensure the server returns an error when creating a shard space after db is dropped. -func TestDatabase_CreateShardSpace_ErrDatabaseNotFound(t *testing.T) { +// Ensure the server returns an error when creating a retention policy after db is dropped. +func TestDatabase_CreateRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() @@ -258,62 +255,62 @@ func TestDatabase_CreateShardSpace_ErrDatabaseNotFound(t *testing.T) { db := s.Database("foo") s.DeleteDatabase("foo") - // Create a shard space on the database. - if err := db.CreateShardSpace(&influxdb.ShardSpace{Name: "bar"}); err != influxdb.ErrDatabaseNotFound { + // Create a retention policy on the database. + if err := db.CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrDatabaseNotFound { t.Fatal(err) } } -// Ensure the server returns an error when creating a shard space without a name. -func TestDatabase_CreateShardSpace_ErrShardSpaceNameRequired(t *testing.T) { +// Ensure the server returns an error when creating a retention policy without a name. +func TestDatabase_CreateRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - if err := s.Database("foo").CreateShardSpace(&influxdb.ShardSpace{Name: ""}); err != influxdb.ErrShardSpaceNameRequired { + if err := s.Database("foo").CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: ""}); err != influxdb.ErrRetentionPolicyNameRequired { t.Fatal(err) } } -// Ensure the server returns an error when creating a duplicate shard space. -func TestDatabase_CreateShardSpace_ErrShardSpaceExists(t *testing.T) { +// Ensure the server returns an error when creating a duplicate retention policy. +func TestDatabase_CreateRetentionPolicy_ErrRetentionPolicyExists(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - s.Database("foo").CreateShardSpace(&influxdb.ShardSpace{Name: "bar"}) - if err := s.Database("foo").CreateShardSpace(&influxdb.ShardSpace{Name: "bar"}); err != influxdb.ErrShardSpaceExists { + s.Database("foo").CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "bar"}) + if err := s.Database("foo").CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "bar"}); err != influxdb.ErrRetentionPolicyExists { t.Fatal(err) } } -// Ensure the server can delete an existing shard space. -func TestDatabase_DeleteShardSpace(t *testing.T) { +// Ensure the server can delete an existing retention policy. +func TestDatabase_DeleteRetentionPolicy(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() - // Create a database and shard space. + // Create a database and retention policy. s.CreateDatabase("foo") db := s.Database("foo") - if err := db.CreateShardSpace(&influxdb.ShardSpace{Name: "bar"}); err != nil { + if err := db.CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "bar"}); err != nil { t.Fatal(err) - } else if db.ShardSpace("bar") == nil { - t.Fatal("shard space not created") + } else if db.RetentionPolicy("bar") == nil { + t.Fatal("retention policy not created") } - // Remove shard space from database. - if err := db.DeleteShardSpace("bar"); err != nil { + // Remove retention policy from database. + if err := db.DeleteRetentionPolicy("bar"); err != nil { t.Fatal(err) - } else if db.ShardSpace("bar") != nil { - t.Fatal("shard space not deleted") + } else if db.RetentionPolicy("bar") != nil { + t.Fatal("retention policy not deleted") } s.Restart() - if s.Database("foo").ShardSpace("bar") != nil { - t.Fatal("shard space not deleted after restart") + if s.Database("foo").RetentionPolicy("bar") != nil { + t.Fatal("retention policy not deleted after restart") } } -// Ensure the server returns an error when deleting a shard space after db is dropped. -func TestDatabase_DeleteShardSpace_ErrDatabaseNotFound(t *testing.T) { +// Ensure the server returns an error when deleting a retention policy after db is dropped. +func TestDatabase_DeleteRetentionPolicy_ErrDatabaseNotFound(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() @@ -322,28 +319,74 @@ func TestDatabase_DeleteShardSpace_ErrDatabaseNotFound(t *testing.T) { db := s.Database("foo") s.DeleteDatabase("foo") - // Delete shard space on the database. - if err := db.DeleteShardSpace("bar"); err != influxdb.ErrDatabaseNotFound { + // Delete retention policy on the database. + if err := db.DeleteRetentionPolicy("bar"); err != influxdb.ErrDatabaseNotFound { t.Fatal(err) } } -// Ensure the server returns an error when deleting a shard space without a name. -func TestDatabase_DeleteShardSpace_ErrShardSpaceNameRequired(t *testing.T) { +// Ensure the server returns an error when deleting a retention policy without a name. +func TestDatabase_DeleteRetentionPolicy_ErrRetentionPolicyNameRequired(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - if err := s.Database("foo").DeleteShardSpace(""); err != influxdb.ErrShardSpaceNameRequired { + if err := s.Database("foo").DeleteRetentionPolicy(""); err != influxdb.ErrRetentionPolicyNameRequired { t.Fatal(err) } } -// Ensure the server returns an error when deleting a non-existent shard space. -func TestDatabase_DeleteShardSpace_ErrShardSpaceNotFound(t *testing.T) { +// Ensure the server returns an error when deleting a non-existent retention policy. +func TestDatabase_DeleteRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { s := OpenServer(NewMessagingClient()) defer s.Close() s.CreateDatabase("foo") - if err := s.Database("foo").DeleteShardSpace("no_such_space"); err != influxdb.ErrShardSpaceNotFound { + if err := s.Database("foo").DeleteRetentionPolicy("no_such_policy"); err != influxdb.ErrRetentionPolicyNotFound { + t.Fatal(err) + } +} + +// Ensure the server can set the default retention policy +func TestDatabase_SetDefaultRetentionPolicy(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + s.CreateDatabase("foo") + db := s.Database("foo") + + rp := &influxdb.RetentionPolicy{Name: "bar"} + if err := db.CreateRetentionPolicy(rp); err != nil { + t.Fatal(err) + } else if db.RetentionPolicy("bar") == nil { + t.Fatal("retention policy not created") + } + + // Set bar as default + if err := db.SetDefaultRetentionPolicy("bar"); err != nil { + t.Fatal(err) + } + + o := db.DefaultRetentionPolicy() + if o == nil { + t.Fatal("default policy not set") + } else if !reflect.DeepEqual(rp, o) { + t.Fatalf("retention policy mismatch: %#v", o) + } + + s.Restart() + + o = s.Database("foo").DefaultRetentionPolicy() + if o == nil { + t.Fatal("default policy not kept after restart") + } else if !reflect.DeepEqual(rp, o) { + t.Fatalf("retention policy mismatch after restart: %#v", o) + } +} + +// Ensure the server returns an error when setting the deafult retention policy to a non-existant one. +func TestDatabase_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) { + s := OpenServer(NewMessagingClient()) + defer s.Close() + s.CreateDatabase("foo") + if err := s.Database("foo").SetDefaultRetentionPolicy("no_such_policy"); err != influxdb.ErrRetentionPolicyNotFound { t.Fatal(err) } } @@ -357,7 +400,7 @@ func TestDatabase_WriteSeries(t *testing.T) { defer s.Close() s.CreateDatabase("foo") db := s.Database("foo") - db.CreateShardSpace(&influxdb.ShardSpace{Name: "myspace", Duration: 1 * time.Hour}) + db.CreateRetentionPolicys(&influxdb.RetentionPolicy{Name: "myspace", Duration: 1 * time.Hour}) db.CreateUser("susy", "pass", nil) // Write series with one point to the database. diff --git a/handler.go b/handler.go index 730a070f45..8f9e6d7020 100644 --- a/handler.go +++ b/handler.go @@ -57,15 +57,15 @@ func NewHandler(s *Server) *Handler { h.mux.Get("/interfaces", http.HandlerFunc(h.serveInterfaces)) // Shard routes. - h.mux.Get("/cluster/shards", http.HandlerFunc(h.serveShards)) - h.mux.Post("/cluster/shards", http.HandlerFunc(h.serveCreateShard)) - h.mux.Del("/cluster/shards/:id", http.HandlerFunc(h.serveDeleteShard)) + h.mux.Get("/db/:db/shards", http.HandlerFunc(h.serveShards)) + h.mux.Del("/db/:db/shards/:id", http.HandlerFunc(h.serveDeleteShard)) - // Shard space routes. - h.mux.Get("/cluster/shard_spaces", http.HandlerFunc(h.serveShardSpaces)) - h.mux.Post("/cluster/shard_spaces/:db", http.HandlerFunc(h.serveCreateShardSpace)) - h.mux.Post("/cluster/shard_spaces/:db/:name", http.HandlerFunc(h.serveUpdateShardSpace)) - h.mux.Del("/cluster/shard_spaces/:db/:name", http.HandlerFunc(h.serveDeleteShardSpace)) + // retention policy routes. + h.mux.Get("/db/:db/retention_policies", http.HandlerFunc(h.serveRetentionPolicys)) + h.mux.Get("/db/:db/retention_policies/:name/shards", http.HandlerFunc(h.serveShardsByRetentionPolicy)) + h.mux.Post("/db/:db/retention_policies", http.HandlerFunc(h.serveCreateRetentionPolicy)) + h.mux.Post("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveUpdateRetentionPolicy)) + h.mux.Del("/db/:db/retention_policies/:name", http.HandlerFunc(h.serveDeleteRetentionPolicy)) // Cluster config endpoints h.mux.Get("/cluster/servers", http.HandlerFunc(h.serveServers)) @@ -277,23 +277,23 @@ func (h *Handler) serveInterfaces(w http.ResponseWriter, r *http.Request) {} // serveShards returns a list of shards. func (h *Handler) serveShards(w http.ResponseWriter, r *http.Request) {} -// serveCreateShard creates a new shard. -func (h *Handler) serveCreateShard(w http.ResponseWriter, r *http.Request) {} +// serveShardsByRetentionPolicy returns a list of shards for a given retention policy. +func (h *Handler) serveShardsByRetentionPolicy(w http.ResponseWriter, r *http.Request) {} // serveDeleteShard removes an existing shard. func (h *Handler) serveDeleteShard(w http.ResponseWriter, r *http.Request) {} -// serveShardSpaces returns a list of shard spaces. -func (h *Handler) serveShardSpaces(w http.ResponseWriter, r *http.Request) {} +// serveRetentionPolicys returns a list of retention policys. +func (h *Handler) serveRetentionPolicys(w http.ResponseWriter, r *http.Request) {} -// serveCreateShardSpace creates a new shard space. -func (h *Handler) serveCreateShardSpace(w http.ResponseWriter, r *http.Request) {} +// serveCreateRetentionPolicy creates a new retention policy. +func (h *Handler) serveCreateRetentionPolicy(w http.ResponseWriter, r *http.Request) {} -// serveUpdateShardSpace updates an existing shard space. -func (h *Handler) serveUpdateShardSpace(w http.ResponseWriter, r *http.Request) {} +// serveUpdateRetentionPolicy updates an existing retention policy. +func (h *Handler) serveUpdateRetentionPolicy(w http.ResponseWriter, r *http.Request) {} -// serveDeleteShardSpace removes an existing shard space. -func (h *Handler) serveDeleteShardSpace(w http.ResponseWriter, r *http.Request) {} +// serveDeleteRetentionPolicy removes an existing retention policy. +func (h *Handler) serveDeleteRetentionPolicy(w http.ResponseWriter, r *http.Request) {} // serveServers returns a list of servers in the cluster. func (h *Handler) serveServers(w http.ResponseWriter, r *http.Request) {} @@ -915,15 +915,15 @@ func (self *HTTPServer) convertShardsToMap(shards []*cluster.ShardData) []interf return result } -func (self *HTTPServer) getShardSpaces(w libhttp.ResponseWriter, r *libhttp.Request) { +func (self *HTTPServer) getRetentionPolicys(w libhttp.ResponseWriter, r *libhttp.Request) { self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) { - return libhttp.StatusOK, self.clusterConfig.GetShardSpaces() + return libhttp.StatusOK, self.clusterConfig.GetRetentionPolicys() }) } -func (self *HTTPServer) createShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) { +func (self *HTTPServer) createRetentionPolicy(w libhttp.ResponseWriter, r *libhttp.Request) { self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) { - space := &cluster.ShardSpace{} + space := &cluster.RetentionPolicy{} decoder := json.NewDecoder(r.Body) err := decoder.Decode(space) if err != nil { @@ -934,7 +934,7 @@ func (self *HTTPServer) createShardSpace(w libhttp.ResponseWriter, r *libhttp.Re if err != nil { return libhttp.StatusBadRequest, err.Error() } - err = self.raftServer.CreateShardSpace(space) + err = self.raftServer.CreateRetentionPolicy(space) if err != nil { return libhttp.StatusInternalServerError, err.Error() } @@ -942,11 +942,11 @@ func (self *HTTPServer) createShardSpace(w libhttp.ResponseWriter, r *libhttp.Re }) } -func (self *HTTPServer) dropShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) { +func (self *HTTPServer) dropRetentionPolicy(w libhttp.ResponseWriter, r *libhttp.Request) { self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) { name := r.URL.Query().Get(":name") db := r.URL.Query().Get(":db") - if err := self.raftServer.DropShardSpace(db, name); err != nil { + if err := self.raftServer.DropRetentionPolicy(db, name); err != nil { return libhttp.StatusInternalServerError, err.Error() } return libhttp.StatusOK, nil @@ -954,7 +954,7 @@ func (self *HTTPServer) dropShardSpace(w libhttp.ResponseWriter, r *libhttp.Requ } type DatabaseConfig struct { - Spaces []*cluster.ShardSpace `json:"spaces"` + Spaces []*cluster.RetentionPolicy `json:"spaces"` ContinuousQueries []string `json:"continuousQueries"` } @@ -981,7 +981,7 @@ func (self *HTTPServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R } } - // validate shard spaces + // validate retention policys for _, space := range databaseConfig.Spaces { err := space.Validate(self.clusterConfig, false) if err != nil { @@ -995,7 +995,7 @@ func (self *HTTPServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R } for _, space := range databaseConfig.Spaces { space.Database = database - err = self.raftServer.CreateShardSpace(space) + err = self.raftServer.CreateRetentionPolicy(space) if err != nil { return libhttp.StatusInternalServerError, err.Error() } @@ -1010,9 +1010,9 @@ func (self *HTTPServer) configureDatabase(w libhttp.ResponseWriter, r *libhttp.R }) } -func (self *HTTPServer) updateShardSpace(w libhttp.ResponseWriter, r *libhttp.Request) { +func (self *HTTPServer) updateRetentionPolicy(w libhttp.ResponseWriter, r *libhttp.Request) { self.tryAsClusterAdmin(w, r, func(u User) (int, interface{}) { - space := &cluster.ShardSpace{} + space := &cluster.RetentionPolicy{} decoder := json.NewDecoder(r.Body) err := decoder.Decode(space) if err != nil { @@ -1021,13 +1021,13 @@ func (self *HTTPServer) updateShardSpace(w libhttp.ResponseWriter, r *libhttp.Re space.Database = r.URL.Query().Get(":db") space.Name = r.URL.Query().Get(":name") if !self.clusterConfig.DatabaseExists(space.Database) { - return libhttp.StatusNotAcceptable, "Can't update a shard space for a database that doesn't exist" + return libhttp.StatusNotAcceptable, "Can't update a retention policy for a database that doesn't exist" } - if !self.clusterConfig.ShardSpaceExists(space) { - return libhttp.StatusNotAcceptable, "Can't update a shard space that doesn't exist" + if !self.clusterConfig.RetentionPolicyExists(space) { + return libhttp.StatusNotAcceptable, "Can't update a retention policy that doesn't exist" } - if err := self.raftServer.UpdateShardSpace(space); err != nil { + if err := self.raftServer.UpdateRetentionPolicy(space); err != nil { return libhttp.StatusInternalServerError, err.Error() } return libhttp.StatusOK, nil diff --git a/influxdb.go b/influxdb.go index 2630c077ed..c7a77849a9 100644 --- a/influxdb.go +++ b/influxdb.go @@ -41,14 +41,14 @@ var ( // ErrInvalidUsername is returned when using a username with invalid characters. ErrInvalidUsername = errors.New("invalid username") - // ErrShardSpaceExists is returned when creating a duplicate shard space. - ErrShardSpaceExists = errors.New("shard space exists") + // ErrRetentionPolicyExists is returned when creating a duplicate shard space. + ErrRetentionPolicyExists = errors.New("retention policy exists") - // ErrShardSpaceNotFound is returned when deleting a non-existent shard space. - ErrShardSpaceNotFound = errors.New("shard space not found") + // ErrRetentionPolicyNotFound is returned when deleting a non-existent shard space. + ErrRetentionPolicyNotFound = errors.New("retention policy not found") - // ErrShardSpaceNameRequired is returned using a blank shard space name. - ErrShardSpaceNameRequired = errors.New("shard space name required") + // ErrRetentionPolicyNameRequired is returned using a blank shard space name. + ErrRetentionPolicyNameRequired = errors.New("retention policy name required") // ErrShardNotFound is returned writing to a non-existent shard. ErrShardNotFound = errors.New("shard not found") diff --git a/server.go b/server.go index 0b27a27a59..8a490c5730 100644 --- a/server.go +++ b/server.go @@ -19,8 +19,8 @@ const ( // It is also used when reseting the root user's password. DefaultRootPassword = "root" - // DefaultShardSpaceName is the name of a databases's default shard space. - DefaultShardSpaceName = "default" + // DefaultRetentionPolicyName is the name of a databases's default shard space. + DefaultRetentionPolicyName = "default" // DefaultSplitN represents the number of partitions a shard is split into. DefaultSplitN = 1 @@ -37,17 +37,18 @@ const ( const ( // broadcast messages - createDatabaseMessageType = messaging.MessageType(0x00) - deleteDatabaseMessageType = messaging.MessageType(0x01) - createShardSpaceMessageType = messaging.MessageType(0x02) - deleteShardSpaceMessageType = messaging.MessageType(0x03) - createClusterAdminMessageType = messaging.MessageType(0x04) - deleteClusterAdminMessageType = messaging.MessageType(0x05) - clusterAdminSetPasswordMessageType = messaging.MessageType(0x06) - createDBUserMessageType = messaging.MessageType(0x07) - deleteDBUserMessageType = messaging.MessageType(0x08) - dbUserSetPasswordMessageType = messaging.MessageType(0x09) - createShardIfNotExistsMessageType = messaging.MessageType(0x0a) + createDatabaseMessageType = messaging.MessageType(0x00) + deleteDatabaseMessageType = messaging.MessageType(0x01) + createRetentionPolicyMessageType = messaging.MessageType(0x02) + deleteRetentionPolicyMessageType = messaging.MessageType(0x03) + createClusterAdminMessageType = messaging.MessageType(0x04) + deleteClusterAdminMessageType = messaging.MessageType(0x05) + clusterAdminSetPasswordMessageType = messaging.MessageType(0x06) + createDBUserMessageType = messaging.MessageType(0x07) + deleteDBUserMessageType = messaging.MessageType(0x08) + dbUserSetPasswordMessageType = messaging.MessageType(0x09) + createShardIfNotExistsMessageType = messaging.MessageType(0x0a) + setDefaultRetentionPolicyMessageType = messaging.MessageType(0x0b) // per-topic messages writeSeriesMessageType = messaging.MessageType(0x80) @@ -541,8 +542,8 @@ type deleteDBUserCommand struct { Username string `json:"username"` } -func (s *Server) applyCreateShardSpace(m *messaging.Message) error { - var c createShardSpaceCommand +func (s *Server) applyCreateRetentionPolicy(m *messaging.Message) error { + var c createRetentionPolicyCommand mustUnmarshalJSON(m.Data, &c) s.mu.Lock() @@ -554,7 +555,7 @@ func (s *Server) applyCreateShardSpace(m *messaging.Message) error { return ErrDatabaseNotFound } - if err := db.applyCreateShardSpace(c.Name, c.Regex, c.Retention, c.Duration, c.ReplicaN, c.SplitN); err != nil { + if err := db.applyCreateRetentionPolicy(c.Name, c.Duration, c.ReplicaN, c.SplitN); err != nil { return err } @@ -566,18 +567,16 @@ func (s *Server) applyCreateShardSpace(m *messaging.Message) error { return nil } -type createShardSpaceCommand struct { - Database string `json:"database"` - Name string `json:"name"` - Regex string `json:"regex"` - Retention time.Duration `json:"retention"` - Duration time.Duration `json:"duration"` - ReplicaN uint32 `json:"replicaN"` - SplitN uint32 `json:"splitN"` +type createRetentionPolicyCommand struct { + Database string `json:"database"` + Name string `json:"name"` + Duration time.Duration `json:"duration"` + ReplicaN uint32 `json:"replicaN"` + SplitN uint32 `json:"splitN"` } -func (s *Server) applyDeleteShardSpace(m *messaging.Message) error { - var c deleteShardSpaceCommand +func (s *Server) applyDeleteRetentionPolicy(m *messaging.Message) error { + var c deleteRetentionPolicyCommand mustUnmarshalJSON(m.Data, &c) s.mu.Lock() @@ -590,7 +589,7 @@ func (s *Server) applyDeleteShardSpace(m *messaging.Message) error { } // Remove shard space from database. - if err := db.applyDeleteShardSpace(c.Name); err != nil { + if err := db.applyDeleteRetentionPolicy(c.Name); err != nil { return err } @@ -602,7 +601,37 @@ func (s *Server) applyDeleteShardSpace(m *messaging.Message) error { return nil } -type deleteShardSpaceCommand struct { +type deleteRetentionPolicyCommand struct { + Database string `json:"database"` + Name string `json:"name"` +} + +func (s *Server) applySetDefaultRetentionPolicy(m *messaging.Message) error { + var c setDefaultRetentionPolicyCommand + mustUnmarshalJSON(m.Data, &c) + + s.mu.Lock() + defer s.mu.Unlock() + + // Retrieve the database. + db := s.databases[c.Database] + if s.databases[c.Database] == nil { + return ErrDatabaseNotFound + } + + if err := db.applySetDefaultRetentionPolicy(c.Name); err != nil { + return err + } + + // Persist to metastore. + s.meta.mustUpdate(func(tx *metatx) error { + return tx.saveDatabase(db) + }) + + return nil +} + +type setDefaultRetentionPolicyCommand struct { Database string `json:"database"` Name string `json:"name"` } @@ -660,12 +689,14 @@ func (s *Server) processor(done chan struct{}) { err = s.applyDeleteDBUser(m) case dbUserSetPasswordMessageType: err = s.applyDBUserSetPassword(m) - case createShardSpaceMessageType: - err = s.applyCreateShardSpace(m) - case deleteShardSpaceMessageType: - err = s.applyDeleteShardSpace(m) + case createRetentionPolicyMessageType: + err = s.applyCreateRetentionPolicy(m) + case deleteRetentionPolicyMessageType: + err = s.applyDeleteRetentionPolicy(m) case createShardIfNotExistsMessageType: err = s.applyCreateShardIfNotExists(m) + case setDefaultRetentionPolicyMessageType: + err = s.applySetDefaultRetentionPolicy(m) case writeSeriesMessageType: /* TEMPORARILY REMOVED FOR PROTOBUFS. err = s.applyWriteSeries(m)