2014-11-01 01:31:19 +00:00
package influxdb
import (
"encoding/json"
"fmt"
"sort"
"sync"
"time"
2014-11-30 23:00:50 +00:00
"unsafe"
2014-11-04 04:15:58 +00:00
2014-11-30 23:00:50 +00:00
"github.com/influxdb/influxdb/messaging"
2014-11-01 01:31:19 +00:00
)
2014-11-19 00:03:21 +00:00
// Database represents a collection of retention policies.
2014-11-01 01:31:19 +00:00
type Database struct {
mu sync . RWMutex
server * Server
name string
2014-11-05 05:32:17 +00:00
2014-11-18 22:57:10 +00:00
users map [ string ] * DBUser // database users by name
policies map [ string ] * RetentionPolicy // retention policies by name
shards map [ uint64 ] * Shard // shards by id
series map [ string ] * Series // series by name
2014-11-10 02:55:53 +00:00
2014-11-18 23:59:37 +00:00
defaultRetentionPolicy string
2014-11-01 01:31:19 +00:00
}
// newDatabase returns an instance of Database associated with a server.
func newDatabase ( s * Server ) * Database {
return & Database {
2014-11-18 22:57:10 +00:00
server : s ,
users : make ( map [ string ] * DBUser ) ,
policies : make ( map [ string ] * RetentionPolicy ) ,
shards : make ( map [ uint64 ] * Shard ) ,
series : make ( map [ string ] * Series ) ,
2014-11-01 01:31:19 +00:00
}
}
// Name returns the database name.
//
// The name is only read here, so a read lock is sufficient and lets
// concurrent readers proceed without serializing on the write lock.
func (db *Database) Name() string {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.name
}
2014-11-18 23:59:37 +00:00
// DefaultRetentionPolicy returns the retention policy registered under the
// database's default policy name, or nil if no such policy exists.
//
// The lookup is read-only, so only a read lock is held.
func (db *Database) DefaultRetentionPolicy() *RetentionPolicy {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.policies[db.defaultRetentionPolicy]
}
2014-11-01 01:31:19 +00:00
// User returns the database user registered under name, or nil if there is
// none.
//
// The lookup is read-only, so only a read lock is held.
func (db *Database) User(name string) *DBUser {
	db.mu.RLock()
	defer db.mu.RUnlock()
	return db.users[name]
}
// Users returns a list of all database users, sorted with the ordering
// defined by the dbUsers type. The result is nil when the database has no
// users.
//
// The user map is only read here, so a read lock is sufficient.
func (db *Database) Users() []*DBUser {
	db.mu.RLock()
	defer db.mu.RUnlock()

	var a dbUsers
	for _, u := range db.users {
		a = append(a, u)
	}
	sort.Sort(a)
	return a
}
// CreateUser creates a user in the database.
2014-11-26 16:02:17 +00:00
func ( db * Database ) CreateUser ( username , password string , read , write [ ] * Matcher ) error {
2014-11-01 01:31:19 +00:00
// TODO: Authorization.
c := & createDBUserCommand {
2014-12-03 16:32:53 +00:00
Database : db . Name ( ) ,
Username : username ,
Password : password ,
2014-11-26 16:02:17 +00:00
ReadFrom : read ,
2014-12-03 16:32:53 +00:00
WriteTo : write ,
2014-11-01 01:31:19 +00:00
}
_ , err := db . server . broadcast ( createDBUserMessageType , c )
return err
}
2014-11-26 16:02:17 +00:00
func ( db * Database ) applyCreateUser ( username , password string , read , write [ ] * Matcher ) error {
2014-11-01 01:31:19 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
// Validate user.
if username == "" {
return ErrUsernameRequired
} else if ! isValidName ( username ) {
return ErrInvalidUsername
} else if db . users [ username ] != nil {
return ErrUserExists
}
// Generate the hash of the password.
hash , err := HashPassword ( password )
if err != nil {
return err
}
// Create the user.
db . users [ username ] = & DBUser {
CommonUser : CommonUser {
Name : username ,
Hash : string ( hash ) ,
} ,
DB : db . name ,
2014-11-26 16:02:17 +00:00
ReadFrom : read ,
WriteTo : write ,
2014-11-01 01:31:19 +00:00
IsAdmin : false ,
}
return nil
}
// DeleteUser sends a deleteDBUserCommand for username through the server's
// broadcast and returns the broadcast error, if any.
func (db *Database) DeleteUser(username string) error {
	var cmd deleteDBUserCommand
	cmd.Database = db.Name()
	cmd.Username = username

	_, err := db.server.broadcast(deleteDBUserMessageType, &cmd)
	return err
}
// applyDeleteUser removes username from db.users while holding the write
// lock. It returns ErrUsernameRequired for an empty name and ErrUserNotFound
// when no user is registered under that name.
func (db *Database) applyDeleteUser(username string) error {
	db.mu.Lock()
	defer db.mu.Unlock()

	switch {
	case username == "":
		return ErrUsernameRequired
	case db.users[username] == nil:
		return ErrUserNotFound
	}

	delete(db.users, username)
	return nil
}
// ChangePassword sends a dbUserSetPasswordCommand carrying the new password
// for username through the server's broadcast and returns the broadcast
// error, if any.
func (db *Database) ChangePassword(username, newPassword string) error {
	var cmd dbUserSetPasswordCommand
	cmd.Database = db.Name()
	cmd.Username = username
	cmd.Password = newPassword

	_, err := db.server.broadcast(dbUserSetPasswordMessageType, &cmd)
	return err
}
2014-11-05 05:32:17 +00:00
func ( db * Database ) applyChangePassword ( username , newPassword string ) error {
2014-11-01 01:31:19 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
// Validate user.
u := db . users [ username ]
if username == "" {
return ErrUsernameRequired
} else if u == nil {
return ErrUserNotFound
}
// Generate the hash of the password.
hash , err := HashPassword ( newPassword )
if err != nil {
return err
}
// Update user password hash.
u . Hash = string ( hash )
return nil
}
2014-11-18 23:59:37 +00:00
// RetentionPolicy returns a retention policy by name.
2014-11-18 22:57:10 +00:00
func ( db * Database ) RetentionPolicy ( name string ) * RetentionPolicy {
2014-11-01 01:31:19 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
2014-11-18 22:57:10 +00:00
return db . policies [ name ]
2014-11-01 01:31:19 +00:00
}
2014-11-18 23:59:37 +00:00
// CreateRetentionPolicy creates a retention policy in the database.
2014-11-26 13:40:48 +00:00
func ( db * Database ) CreateRetentionPolicy ( rp * RetentionPolicy ) error {
2014-11-18 22:57:10 +00:00
c := & createRetentionPolicyCommand {
Database : db . Name ( ) ,
2014-11-26 13:40:48 +00:00
Name : rp . Name ,
Duration : rp . Duration ,
ReplicaN : rp . ReplicaN ,
SplitN : rp . SplitN ,
2014-11-01 01:31:19 +00:00
}
2014-11-18 22:57:10 +00:00
_ , err := db . server . broadcast ( createRetentionPolicyMessageType , c )
2014-11-01 01:31:19 +00:00
return err
}
2014-11-18 22:57:10 +00:00
func ( db * Database ) applyCreateRetentionPolicy ( name string , duration time . Duration , replicaN , splitN uint32 ) error {
2014-11-01 01:31:19 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
2014-11-18 23:59:37 +00:00
// Validate retention policy.
2014-11-01 01:31:19 +00:00
if name == "" {
2014-11-18 22:57:10 +00:00
return ErrRetentionPolicyNameRequired
} else if db . policies [ name ] != nil {
return ErrRetentionPolicyExists
2014-11-01 01:31:19 +00:00
}
// Add space to the database.
2014-11-18 22:57:10 +00:00
db . policies [ name ] = & RetentionPolicy {
Name : name ,
Duration : duration ,
ReplicaN : replicaN ,
SplitN : splitN ,
2014-11-01 01:31:19 +00:00
}
return nil
}
2014-11-18 23:59:37 +00:00
// DeleteRetentionPolicy removes a retention policy from the database.
2014-11-18 22:57:10 +00:00
func ( db * Database ) DeleteRetentionPolicy ( name string ) error {
c := & deleteRetentionPolicyCommand { Database : db . Name ( ) , Name : name }
_ , err := db . server . broadcast ( deleteRetentionPolicyMessageType , c )
2014-11-01 01:31:19 +00:00
return err
}
2014-11-18 22:57:10 +00:00
func ( db * Database ) applyDeleteRetentionPolicy ( name string ) error {
2014-11-01 01:31:19 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
2014-11-18 23:59:37 +00:00
// Validate retention policy.
2014-11-01 01:31:19 +00:00
if name == "" {
2014-11-18 22:57:10 +00:00
return ErrRetentionPolicyNameRequired
} else if db . policies [ name ] == nil {
return ErrRetentionPolicyNotFound
2014-11-01 01:31:19 +00:00
}
2014-11-18 23:59:37 +00:00
// Remove retention policy.
2014-11-18 22:57:10 +00:00
delete ( db . policies , name )
2014-11-01 01:31:19 +00:00
return nil
}
2014-11-18 23:59:37 +00:00
// SetDefaultRetentionPolicy sends a setDefaultRetentionPolicyCommand naming
// the policy that should become the database default through the server's
// broadcast, and returns the broadcast error, if any.
func (db *Database) SetDefaultRetentionPolicy(name string) error {
	var cmd setDefaultRetentionPolicyCommand
	cmd.Database = db.Name()
	cmd.Name = name

	_, err := db.server.broadcast(setDefaultRetentionPolicyMessageType, &cmd)
	return err
}
// applySetDefaultRetentionPolicy records name as the default retention policy
// while holding the write lock. It returns ErrRetentionPolicyNotFound, and
// leaves the current default unchanged, when no policy with that name exists.
func (db *Database) applySetDefaultRetentionPolicy(name string) error {
	db.mu.Lock()
	defer db.mu.Unlock()

	// Only an existing policy may become the default.
	if rp := db.policies[name]; rp != nil {
		db.defaultRetentionPolicy = name
		return nil
	}
	return ErrRetentionPolicyNotFound
}
2014-11-19 17:12:46 +00:00
// Shards returns a list of all shards in the database
func ( db * Database ) Shards ( ) [ ] * Shard {
2014-11-20 15:58:12 +00:00
shards := make ( [ ] * Shard , 0 , len ( db . shards ) )
2014-11-19 17:12:46 +00:00
for _ , v := range db . shards {
2014-11-20 15:58:12 +00:00
shards = append ( shards , v )
2014-11-19 17:12:46 +00:00
}
return shards
}
2014-11-10 02:55:53 +00:00
// shard returns a shard by id.
func ( db * Database ) shard ( id uint64 ) * Shard {
2014-11-19 13:53:45 +00:00
return db . shards [ id ]
2014-11-10 02:55:53 +00:00
}
2014-11-19 23:14:38 +00:00
// RetentionPolicies returns a list of retention polocies for the database
func ( db * Database ) RetentionPolicies ( ) [ ] * RetentionPolicy {
2014-11-20 15:58:12 +00:00
policies := make ( [ ] * RetentionPolicy , 0 , len ( db . policies ) )
2014-11-19 18:30:39 +00:00
for _ , p := range db . policies {
2014-11-20 15:58:12 +00:00
policies = append ( policies , p )
2014-11-10 02:55:53 +00:00
}
2014-11-19 18:30:39 +00:00
return policies
2014-11-10 02:55:53 +00:00
}
2014-12-03 20:36:54 +00:00
// CreateShardsIfNotExist creates all the shards for a retention policy for the interval a timestamp falls into. Note that multiple shards can be created for each bucket of time.
2014-12-03 20:52:38 +00:00
func ( db * Database ) CreateShardsIfNotExists ( policy string , timestamp time . Time ) ( [ ] * Shard , error ) {
2014-12-03 20:36:54 +00:00
db . mu . RLock ( )
2014-12-03 20:52:38 +00:00
p := db . policies [ policy ]
2014-12-03 20:36:54 +00:00
db . mu . RUnlock ( )
if p == nil {
return nil , ErrRetentionPolicyNotFound
2014-12-03 15:12:20 +00:00
}
2014-12-03 20:52:38 +00:00
c := & createShardIfNotExistsCommand { Database : db . name , Policy : policy , Timestamp : timestamp }
2014-12-03 12:27:34 +00:00
if _ , err := db . server . broadcast ( createShardIfNotExistsMessageType , c ) ; err != nil {
return nil , err
}
2014-12-03 20:52:38 +00:00
return p . shardsByTimestamp ( timestamp ) , nil
2014-12-03 20:36:54 +00:00
}
// createShardIfNotExists returns the shard for a given retention policy, series, and timestamp. If it doesn't exist, it will create all shards for the given timestamp
func ( db * Database ) createShardIfNotExists ( policy * RetentionPolicy , id uint32 , timestamp time . Time ) ( * Shard , error ) {
if s := policy . shardBySeriesTimestamp ( id , timestamp ) ; s != nil {
return s , nil
}
2014-12-03 20:52:38 +00:00
if _ , err := db . CreateShardsIfNotExists ( policy . Name , timestamp ) ; err != nil {
2014-12-03 20:36:54 +00:00
return nil , err
}
2014-12-03 15:38:32 +00:00
return policy . shardBySeriesTimestamp ( id , timestamp ) , nil
2014-11-10 02:55:53 +00:00
}
2014-11-18 22:57:10 +00:00
func ( db * Database ) applyCreateShardIfNotExists ( id uint64 , policy string , timestamp time . Time ) ( error , bool ) {
2014-11-10 02:55:53 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
2014-11-18 23:59:37 +00:00
// Validate retention policy.
2014-11-26 13:40:48 +00:00
rp := db . policies [ policy ]
if rp == nil {
2014-11-18 22:57:10 +00:00
return ErrRetentionPolicyNotFound , false
2014-11-10 02:55:53 +00:00
}
// If we can match to an existing shard date range then just ignore request.
2014-11-26 13:40:48 +00:00
for _ , s := range rp . Shards {
2014-11-10 02:55:53 +00:00
if timeBetween ( timestamp , s . StartTime , s . EndTime ) {
return nil , false
}
}
// If no shards match then create a new one.
2014-11-26 13:40:48 +00:00
startTime := timestamp . Truncate ( rp . Duration ) . UTC ( )
endTime := startTime . Add ( rp . Duration ) . UTC ( )
2014-11-10 02:55:53 +00:00
s := newShard ( )
s . ID , s . StartTime , s . EndTime = id , startTime , endTime
// Open shard.
if err := s . open ( db . server . shardPath ( s . ID ) ) ; err != nil {
panic ( "unable to open shard: " + err . Error ( ) )
}
2014-11-18 23:59:37 +00:00
// Append to retention policy.
2014-11-26 13:40:48 +00:00
rp . Shards = append ( rp . Shards , s )
2014-11-19 23:14:38 +00:00
// Add to db's map of shards
2014-11-30 23:00:50 +00:00
db . shards [ s . ID ] = s
2014-11-10 02:55:53 +00:00
return nil , true
}
2014-11-29 00:50:02 +00:00
func ( db * Database ) applyCreateSeriesIfNotExists ( name string , tags map [ string ] string ) error {
2014-11-28 19:14:04 +00:00
db . mu . Lock ( )
defer db . mu . Unlock ( )
db . server . meta . mustUpdate ( func ( tx * metatx ) error {
2014-11-29 00:50:02 +00:00
return tx . createSeriesIfNotExists ( db . name , name , tags )
2014-11-28 19:14:04 +00:00
} )
2014-11-29 00:50:02 +00:00
return nil
2014-11-28 19:14:04 +00:00
}
2014-11-18 00:23:21 +00:00
2014-11-04 04:15:58 +00:00
// WriteSeries writes series data to the database.
2014-11-24 00:15:41 +00:00
func ( db * Database ) WriteSeries ( retentionPolicy , name string , tags map [ string ] string , timestamp time . Time , values map [ string ] interface { } ) error {
2014-11-18 23:59:37 +00:00
// Find retention policy matching the series and split points by shard.
2014-11-30 23:00:50 +00:00
db . mu . RLock ( )
rp , ok := db . policies [ retentionPolicy ]
db . mu . RUnlock ( )
2014-11-10 02:55:53 +00:00
2014-11-24 00:15:41 +00:00
// Ensure the policy was found
if ! ok {
2014-11-18 22:57:10 +00:00
return ErrRetentionPolicyNotFound
2014-11-10 02:55:53 +00:00
}
2014-11-24 00:15:41 +00:00
// get the id for the series and tagset
2014-12-03 15:10:27 +00:00
id , err := db . createSeriesIfNotExists ( name , tags )
if err != nil {
return err
2014-11-10 02:55:53 +00:00
}
2014-12-03 12:27:34 +00:00
// now write it into the shard
2014-12-03 20:36:54 +00:00
s , err := db . createShardIfNotExists ( rp , id , timestamp )
2014-12-03 15:12:20 +00:00
if err != nil {
return fmt . Errorf ( "create shard(%s/%d): %s" , retentionPolicy , timestamp . Format ( time . RFC3339Nano ) , err )
2014-11-10 02:55:53 +00:00
}
2014-11-30 23:00:50 +00:00
data , err := marshalPoint ( id , timestamp , values )
if err != nil {
return err
}
2014-11-10 02:55:53 +00:00
2014-11-30 23:00:50 +00:00
// Publish "write series" message on shard's topic to broker.
m := & messaging . Message {
Type : writeSeriesMessageType ,
TopicID : s . ID ,
Data : data ,
2014-11-10 02:55:53 +00:00
}
2014-12-03 12:27:34 +00:00
_ , err = db . server . client . Publish ( m )
return err
2014-11-10 02:55:53 +00:00
}
2014-11-30 23:00:50 +00:00
// marshalPoint encodes a point as a 12-byte header followed by the JSON
// encoding of values. The header holds the series id in bytes 0-3 and the
// timestamp as Unix nanoseconds in bytes 4-11, both stored in the host's
// native byte order. It returns a nil slice and the error if values cannot be
// JSON-encoded.
func marshalPoint(seriesID uint32, timestamp time.Time, values map[string]interface{}) ([]byte, error) {
	payload, err := json.Marshal(values)
	if err != nil {
		return nil, err
	}

	buf := make([]byte, 12, 12+len(payload))
	*(*uint32)(unsafe.Pointer(&buf[0])) = seriesID
	*(*int64)(unsafe.Pointer(&buf[4])) = timestamp.UnixNano()

	return append(buf, payload...), nil
}
// unmarshalPoint decodes data produced by marshalPoint. It returns the series
// id read from bytes 0-3, the timestamp built from the Unix-nanosecond value
// in bytes 4-11 (both in the host's native byte order) and the JSON-decoded
// values from the remaining bytes.
//
// An error is returned when data is shorter than the 12-byte header or when
// the JSON payload cannot be decoded.
func unmarshalPoint(data []byte) (uint32, time.Time, map[string]interface{}, error) {
	const headerSize = 12

	// Guard the unsafe reads below: without this check short input would
	// panic or read past the end of the slice.
	if len(data) < headerSize {
		return 0, time.Time{}, nil, fmt.Errorf("point data too short: %d bytes", len(data))
	}

	id := *(*uint32)(unsafe.Pointer(&data[0]))
	ts := *(*int64)(unsafe.Pointer(&data[4]))
	timestamp := time.Unix(0, ts)

	var v map[string]interface{}
	err := json.Unmarshal(data[headerSize:], &v)
	return id, timestamp, v, err
}
2014-12-03 15:32:39 +00:00
// seriesID returns the unique id of a series and tagset and a bool indicating if it was found
func ( db * Database ) seriesID ( name string , tags map [ string ] string ) ( uint32 , bool ) {
2014-11-24 00:15:41 +00:00
var id uint32
var err error
db . server . meta . view ( func ( tx * metatx ) error {
2014-12-03 15:32:39 +00:00
id , err = tx . seriesID ( db . name , name , tags )
2014-11-24 00:15:41 +00:00
return nil
} )
if err != nil {
return uint32 ( 0 ) , false
2014-11-10 02:55:53 +00:00
}
2014-11-24 00:15:41 +00:00
return id , true
}
2014-11-10 02:55:53 +00:00
2014-11-29 00:50:02 +00:00
func ( db * Database ) createSeriesIfNotExists ( name string , tags map [ string ] string ) ( uint32 , error ) {
2014-12-03 15:32:39 +00:00
if id , ok := db . seriesID ( name , tags ) ; ok {
2014-12-03 15:10:27 +00:00
return id , nil
}
2014-11-29 00:50:02 +00:00
c := & createSeriesIfNotExistsCommand {
2014-11-24 00:15:41 +00:00
Database : db . Name ( ) ,
Name : name ,
Tags : tags ,
}
2014-11-29 00:50:02 +00:00
_ , err := db . server . broadcast ( createSeriesIfNotExistsMessageType , c )
2014-11-24 00:15:41 +00:00
if err != nil {
return uint32 ( 0 ) , err
}
2014-12-03 15:32:39 +00:00
id , ok := db . seriesID ( name , tags )
2014-11-24 00:15:41 +00:00
if ! ok {
return uint32 ( 0 ) , ErrSeriesNotFound
}
return id , nil
2014-11-10 02:55:53 +00:00
}
2014-11-30 23:00:50 +00:00
func ( db * Database ) applyWriteSeries ( shardID uint64 , overwrite bool , data [ ] byte ) error {
db . mu . RLock ( )
s := db . shards [ shardID ]
db . mu . RUnlock ( )
2014-11-10 02:55:53 +00:00
// Find shard.
if s == nil {
return ErrShardNotFound
}
// Write to shard.
2014-11-30 23:00:50 +00:00
return s . writeSeries ( overwrite , data )
2014-11-04 04:15:58 +00:00
}
2014-11-06 06:03:35 +00:00
// timeBetween reports whether t lies in the closed interval [min, max]: it is
// neither before min nor after max.
func timeBetween(t, min, max time.Time) bool {
	if t.Before(min) {
		return false
	}
	return !t.After(max)
}
2014-11-05 05:32:17 +00:00
// MarshalJSON encodes a database into a JSON-encoded byte slice.
func ( db * Database ) MarshalJSON ( ) ( [ ] byte , error ) {
// Copy over properties to intermediate type.
var o databaseJSON
o . Name = db . name
2014-11-19 00:41:50 +00:00
o . DefaultRetentionPolicy = db . defaultRetentionPolicy
2014-11-05 05:32:17 +00:00
for _ , u := range db . users {
o . Users = append ( o . Users , u )
}
2014-11-26 13:40:48 +00:00
for _ , rp := range db . policies {
o . Policies = append ( o . Policies , rp )
2014-11-05 05:32:17 +00:00
}
for _ , s := range db . shards {
o . Shards = append ( o . Shards , s )
}
2014-11-10 02:55:53 +00:00
for _ , s := range db . series {
o . Series = append ( o . Series , s )
}
2014-11-05 05:32:17 +00:00
return json . Marshal ( & o )
}
// UnmarshalJSON decodes a JSON-encoded byte slice to a database.
func ( db * Database ) UnmarshalJSON ( data [ ] byte ) error {
// Decode into intermediate type.
var o databaseJSON
if err := json . Unmarshal ( data , & o ) ; err != nil {
return err
}
// Copy over properties from intermediate type.
db . name = o . Name
2014-11-19 00:41:50 +00:00
db . defaultRetentionPolicy = o . DefaultRetentionPolicy
2014-11-05 05:32:17 +00:00
// Copy users.
db . users = make ( map [ string ] * DBUser )
for _ , u := range o . Users {
db . users [ u . Name ] = u
}
2014-11-18 22:57:10 +00:00
// Copy shard policies.
db . policies = make ( map [ string ] * RetentionPolicy )
2014-11-26 13:40:48 +00:00
for _ , rp := range o . Policies {
db . policies [ rp . Name ] = rp
2014-11-05 05:32:17 +00:00
}
// Copy shards.
2014-11-10 02:55:53 +00:00
db . shards = make ( map [ uint64 ] * Shard )
2014-11-05 05:32:17 +00:00
for _ , s := range o . Shards {
2014-11-06 06:03:35 +00:00
db . shards [ s . ID ] = s
2014-11-05 05:32:17 +00:00
}
2014-11-04 04:15:58 +00:00
2014-11-10 02:55:53 +00:00
// Copy series.
db . series = make ( map [ string ] * Series )
for _ , s := range o . Series {
db . series [ s . Name ] = s
}
2014-11-04 04:15:58 +00:00
return nil
}
2014-11-05 05:32:17 +00:00
// databaseJSON represents the JSON-serialization format for a database.
type databaseJSON struct {
2014-11-19 00:41:50 +00:00
Name string ` json:"name,omitempty" `
DefaultRetentionPolicy string ` json:"defaultRetentionPolicy,omitempty" `
Users [ ] * DBUser ` json:"users,omitempty" `
Policies [ ] * RetentionPolicy ` json:"policies,omitempty" `
Shards [ ] * Shard ` json:"shards,omitempty" `
Series [ ] * Series ` json:"series,omitempty" `
2014-11-05 05:32:17 +00:00
}
2014-11-01 01:31:19 +00:00
// databases is a list of databases that implements sort.Interface, ordering
// its elements by database name.
type databases []*Database

// Len returns the number of databases in the list.
func (dbs databases) Len() int {
	return len(dbs)
}

// Less reports whether the database at i has a name that sorts before the
// name of the database at j.
func (dbs databases) Less(i, j int) bool {
	a, b := dbs[i], dbs[j]
	return a.name < b.name
}

// Swap exchanges the databases at i and j.
func (dbs databases) Swap(i, j int) {
	dbs[j], dbs[i] = dbs[i], dbs[j]
}
2014-11-18 22:57:10 +00:00
// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
type RetentionPolicy struct {
2014-11-01 01:31:19 +00:00
// Unique name within database. Required.
Name string
2014-11-18 22:57:10 +00:00
// Length of time to keep data around
Duration time . Duration
2014-11-01 01:31:19 +00:00
ReplicaN uint32
SplitN uint32
2014-11-06 06:03:35 +00:00
Shards [ ] * Shard
2014-11-01 01:31:19 +00:00
}
2014-11-18 22:57:10 +00:00
// NewRetentionPolicy returns a new instance of RetentionPolicy with defaults set.
2014-11-19 23:14:38 +00:00
func NewRetentionPolicy ( name string ) * RetentionPolicy {
2014-11-18 22:57:10 +00:00
return & RetentionPolicy {
2014-11-26 14:36:43 +00:00
Name : name ,
2014-11-18 22:57:10 +00:00
ReplicaN : DefaultReplicaN ,
SplitN : DefaultSplitN ,
Duration : DefaultShardRetention ,
2014-11-01 01:31:19 +00:00
}
}
2014-12-03 15:38:32 +00:00
// shardBySeriesTimestamp returns the shard in the space that owns a given timestamp for a given series id.
2014-11-10 02:55:53 +00:00
// Returns nil if the shard does not exist.
2014-12-03 16:32:53 +00:00
func ( rp * RetentionPolicy ) shardBySeriesTimestamp ( id uint32 , timestamp time . Time ) * Shard {
2014-12-03 20:36:54 +00:00
shards := rp . shardsByTimestamp ( timestamp )
if len ( shards ) > 0 {
return shards [ int ( id ) % len ( shards ) ]
2014-11-10 02:55:53 +00:00
}
2014-12-03 20:36:54 +00:00
return nil
2014-11-10 02:55:53 +00:00
}
2014-12-03 20:36:54 +00:00
func ( rp * RetentionPolicy ) shardsByTimestamp ( timestamp time . Time ) [ ] * Shard {
2014-12-03 16:32:53 +00:00
shards := make ( [ ] * Shard , 0 , rp . SplitN )
2014-11-26 13:40:48 +00:00
for _ , s := range rp . Shards {
2014-11-10 02:55:53 +00:00
if timeBetween ( timestamp , s . StartTime , s . EndTime ) {
2014-11-24 00:15:41 +00:00
shards = append ( shards , s )
2014-11-10 02:55:53 +00:00
}
}
2014-12-03 20:36:54 +00:00
return shards
2014-11-10 02:55:53 +00:00
}
2014-11-18 23:59:37 +00:00
// MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
2014-11-26 13:40:48 +00:00
func ( rp * RetentionPolicy ) MarshalJSON ( ) ( [ ] byte , error ) {
2014-11-18 22:57:10 +00:00
return json . Marshal ( & retentionPolicyJSON {
2014-11-26 13:40:48 +00:00
Name : rp . Name ,
Duration : rp . Duration ,
ReplicaN : rp . ReplicaN ,
SplitN : rp . SplitN ,
2014-11-06 06:03:35 +00:00
} )
2014-11-05 05:32:17 +00:00
}
2014-11-18 23:59:37 +00:00
// UnmarshalJSON decodes a JSON-encoded byte slice to a retention policy.
2014-11-26 13:40:48 +00:00
func ( rp * RetentionPolicy ) UnmarshalJSON ( data [ ] byte ) error {
2014-11-01 01:31:19 +00:00
// Decode into intermediate type.
2014-11-18 22:57:10 +00:00
var o retentionPolicyJSON
2014-11-01 01:31:19 +00:00
if err := json . Unmarshal ( data , & o ) ; err != nil {
return err
}
// Copy over properties from intermediate type.
2014-11-26 13:40:48 +00:00
rp . Name = o . Name
rp . ReplicaN = o . ReplicaN
rp . SplitN = o . SplitN
rp . Duration = o . Duration
rp . Shards = o . Shards
2014-11-01 01:31:19 +00:00
return nil
}
2014-11-18 22:57:10 +00:00
// retentionPolicyJSON represents an intermediate struct for JSON marshaling.
type retentionPolicyJSON struct {
Name string ` json:"name" `
ReplicaN uint32 ` json:"replicaN,omitempty" `
SplitN uint32 ` json:"splitN,omitempty" `
Duration time . Duration ` json:"duration,omitempty" `
Shards [ ] * Shard ` json:"shards,omitempty" `
2014-11-06 06:03:35 +00:00
}
2014-11-19 23:14:38 +00:00
// RetentionPolicies represents a list of shard policies.
type RetentionPolicies [ ] * RetentionPolicy
2014-11-06 06:03:35 +00:00
2014-11-18 22:57:10 +00:00
// Shards returns a list of all shards for all policies.
2014-11-26 13:40:48 +00:00
func ( rps RetentionPolicies ) Shards ( ) [ ] * Shard {
2014-11-06 06:03:35 +00:00
var shards [ ] * Shard
2014-11-26 13:40:48 +00:00
for _ , rp := range rps {
shards = append ( shards , rp . Shards ... )
2014-11-06 06:03:35 +00:00
}
return shards
}
// Series represents a series of timeseries points.
type Series struct {
2014-11-10 02:55:53 +00:00
Name string ` json:"name,omitempty" `
Fields [ ] * Field ` json:"fields,omitempty" `
}
func ( s * Series ) FieldsByNames ( names [ ] string ) ( a [ ] * Field ) {
for _ , f := range s . Fields {
for _ , name := range names {
if f . Name == name {
a = append ( a , f )
}
}
}
return
2014-11-06 06:03:35 +00:00
}
// Field represents a series field, identified by a numeric id and a name.
// Both are omitted from the JSON output when empty.
//
// Tags are written in the canonical key:"value" form without surrounding
// spaces.
type Field struct {
	ID   uint64 `json:"id,omitempty"`
	Name string `json:"name,omitempty"`
}

// String returns a string representation of the field in the form
// "Name: <name>, ID: <id>".
func (f *Field) String() string {
	return fmt.Sprintf("Name: %s, ID: %d", f.Name, f.ID)
}
// Fields represents a list of fields.
type Fields [ ] * Field
// Names returns a list of all field names.
func ( a Fields ) Names ( ) [ ] string {
names := make ( [ ] string , len ( a ) )
for i , f := range a {
names [ i ] = f . Name
2014-11-01 01:31:19 +00:00
}
2014-11-06 06:03:35 +00:00
return names
2014-11-01 01:31:19 +00:00
}