diff --git a/cmd/influxd/config.go b/cmd/influxd/config.go
index eedffc6179..cf89ffe2ac 100644
--- a/cmd/influxd/config.go
+++ b/cmd/influxd/config.go
@@ -91,14 +91,10 @@ type Config struct {
 	} `toml:"broker"`
 
 	Data struct {
-		Dir                  string                    `toml:"dir"`
-		Port                 int                       `toml:"port"`
-		WriteBufferSize      int                       `toml:"write-buffer-size"`
-		MaxOpenShards        int                       `toml:"max-open-shards"`
-		PointBatchSize       int                       `toml:"point-batch-size"`
-		WriteBatchSize       int                       `toml:"write-batch-size"`
-		Engines              map[string]toml.Primitive `toml:"engines"`
-		RetentionSweepPeriod Duration                  `toml:"retention-sweep-period"`
+		Dir                   string   `toml:"dir"`
+		Port                  int      `toml:"port"`
+		RetentionCheckEnabled bool     `toml:"retention-check-enabled"`
+		RetentionCheckPeriod  Duration `toml:"retention-check-period"`
 	} `toml:"data"`
 
 	Cluster struct {
@@ -115,13 +111,13 @@ func NewConfig() *Config {
 	u, _ := user.Current()
 
 	c := &Config{}
-	c.Data.RetentionSweepPeriod = Duration(10 * time.Minute)
 	c.Broker.Dir = filepath.Join(u.HomeDir, ".influxdb/broker")
 	c.Broker.Port = DefaultBrokerPort
 	c.Broker.Timeout = Duration(1 * time.Second)
 	c.Data.Dir = filepath.Join(u.HomeDir, ".influxdb/data")
 	c.Data.Port = DefaultDataPort
-	c.Data.WriteBufferSize = 1000
+	c.Data.RetentionCheckEnabled = true
+	c.Data.RetentionCheckPeriod = Duration(10 * time.Minute)
 
 	// Detect hostname (or set to localhost).
 	if c.Hostname, _ = os.Hostname(); c.Hostname == "" {
@@ -138,31 +134,6 @@ func NewConfig() *Config {
 	return c
 }
 
-// PointBatchSize returns the data point batch size, if set.
-// If not set, the LevelDB point batch size is returned.
-// If that is not set then the default point batch size is returned.
-func (c *Config) PointBatchSize() int {
-	if c.Data.PointBatchSize != 0 {
-		return c.Data.PointBatchSize
-	}
-	return DefaultPointBatchSize
-}
-
-// WriteBatchSize returns the data write batch size, if set.
-// If not set, the LevelDB write batch size is returned.
-// If that is not set then the default write batch size is returned.
-func (c *Config) WriteBatchSize() int {
-	if c.Data.WriteBatchSize != 0 {
-		return c.Data.WriteBatchSize
-	}
-	return DefaultWriteBatchSize
-}
-
-// MaxOpenShards returns the maximum number of shards to keep open at once.
-func (c *Config) MaxOpenShards() int {
-	return c.Data.MaxOpenShards
-}
-
 // DataAddr returns the binding address the data server
 func (c *Config) DataAddr() string {
 	return net.JoinHostPort(c.BindAddress, strconv.Itoa(c.Data.Port))
diff --git a/cmd/influxd/config_test.go b/cmd/influxd/config_test.go
index b193e22f97..8735d0f85c 100644
--- a/cmd/influxd/config_test.go
+++ b/cmd/influxd/config_test.go
@@ -122,6 +122,12 @@ func TestParseConfig(t *testing.T) {
 	if c.Data.Dir != "/tmp/influxdb/development/db" {
 		t.Fatalf("data dir mismatch: %v", c.Data.Dir)
 	}
+	if c.Data.RetentionCheckEnabled != true {
+		t.Fatalf("Retention check enabled mismatch: %v", c.Data.RetentionCheckEnabled)
+	}
+	if c.Data.RetentionCheckPeriod != main.Duration(5*time.Minute) {
+		t.Fatalf("Retention check period mismatch: %v", c.Data.RetentionCheckPeriod)
+	}
 
 	if c.Cluster.Dir != "/tmp/influxdb/development/cluster" {
 		t.Fatalf("cluster dir mismatch: %v", c.Cluster.Dir)
@@ -217,6 +223,8 @@ dir = "/tmp/influxdb/development/broker"
 
 [data]
 dir = "/tmp/influxdb/development/db"
+retention-check-enabled = true
+retention-check-period = "5m"
 
 [cluster]
 dir = "/tmp/influxdb/development/cluster"
diff --git a/cmd/influxd/run.go b/cmd/influxd/run.go
index dc3092021e..d07d27ac18 100644
--- a/cmd/influxd/run.go
+++ b/cmd/influxd/run.go
@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"time"
 
 	"github.com/influxdb/influxdb"
 	"github.com/influxdb/influxdb/admin"
@@ -56,6 +57,15 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
 	s := openServer(config.DataDir(), config.DataURL(), b, initializing, configExists, joinURLs, logWriter)
 	s.SetAuthenticationEnabled(config.Authentication.Enabled)
 
+	// Enable retention policy enforcement if requested.
+	if config.Data.RetentionCheckEnabled {
+		interval := time.Duration(config.Data.RetentionCheckPeriod)
+		if err := s.StartRetentionPolicyEnforcement(interval); err != nil {
+			log.Fatalf("retention policy enforcement failed: %s", err.Error())
+		}
+		log.Printf("broker enforcing retention policies with check interval of %s", interval)
+	}
+
 	// Start the server handler. Attach to broker if listening on the same port.
 	if s != nil {
 		sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
diff --git a/database.go b/database.go
index f0f010934a..109d21b6e2 100644
--- a/database.go
+++ b/database.go
@@ -795,6 +795,26 @@ func (rp *RetentionPolicy) shardGroupByTimestamp(timestamp time.Time) *ShardGrou
 	return nil
 }
 
+// shardGroupByID returns the group in the policy for the given ID.
+// Returns nil if group does not exist.
+func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
+	for _, g := range rp.shardGroups {
+		if g.ID == shardID {
+			return g
+		}
+	}
+	return nil
+}
+
+func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) {
+	for i, g := range rp.shardGroups {
+		if g.ID == shardID {
+			rp.shardGroups[i] = nil
+			rp.shardGroups = append(rp.shardGroups[:i], rp.shardGroups[i+1:]...)
+		}
+	}
+}
+
 // MarshalJSON encodes a retention policy to a JSON-encoded byte slice.
 func (rp *RetentionPolicy) MarshalJSON() ([]byte, error) {
 	var o retentionPolicyJSON
diff --git a/etc/config.sample.toml b/etc/config.sample.toml
index d3efb9e10c..8901e68066 100644
--- a/etc/config.sample.toml
+++ b/etc/config.sample.toml
@@ -79,8 +79,13 @@ port = 8086
 # Data node configuration. Data nodes are where the time-series data, in the form of
 # shards, is stored.
 [data]
-dir = "/tmp/influxdb/development/db"
-port = 8086
+  dir = "/tmp/influxdb/development/db"
+  port = 8086
+
+  # Control whether retention policies are enforced and how long the system waits between
+  # enforcing those policies.
+  retention-check-enabled = true
+  retention-check-period = "10m"
 
 [cluster]
 # Location for cluster state storage. For storing state persistently across restarts.
diff --git a/server.go b/server.go
index b0da00d1d3..b2a764f246 100644
--- a/server.go
+++ b/server.go
@@ -66,6 +66,7 @@ const (
 
 	// Shard messages
 	createShardGroupIfNotExistsMessageType = messaging.MessageType(0x40)
+	deleteShardGroupMessageType            = messaging.MessageType(0x41)
 
 	// Series messages
 	createSeriesIfNotExistsMessageType = messaging.MessageType(0x50)
@@ -80,10 +81,11 @@ const (
 
 // Server represents a collection of metadata and raw metric data.
 type Server struct {
-	mu   sync.RWMutex
-	id   uint64
-	path string
-	done chan struct{} // goroutine close notification
+	mu     sync.RWMutex
+	id     uint64
+	path   string
+	done   chan struct{} // goroutine close notification
+	rpDone chan struct{} // retention policies goroutine close notification
 
 	client MessagingClient // broker client
 	index  uint64          // highest broadcast index seen
@@ -220,6 +222,10 @@ func (s *Server) Close() error {
 		return ErrServerClosed
 	}
 
+	if s.rpDone != nil {
+		close(s.rpDone)
+	}
+
 	// Remove path.
 	s.path = ""
 
@@ -288,6 +294,47 @@ func (s *Server) load() error {
 	})
 }
 
+// StartRetentionPolicyEnforcement launches retention policy enforcement.
+func (s *Server) StartRetentionPolicyEnforcement(checkInterval time.Duration) error {
+	if checkInterval == 0 {
+		return fmt.Errorf("retention policy check interval must be non-zero")
+	}
+	rpDone := make(chan struct{}, 0)
+	s.rpDone = rpDone
+	go func() {
+		for {
+			select {
+			case <-rpDone:
+				return
+			case <-time.After(checkInterval):
+				s.EnforceRetentionPolicies()
+			}
+		}
+	}()
+	return nil
+}
+
+// EnforceRetentionPolicies ensures that data that is aging-out due to retention policies
+// is removed from the server.
+func (s *Server) EnforceRetentionPolicies() {
+	log.Println("retention policy enforcement check commencing")
+
+	// Check all shard groups.
+	for _, db := range s.databases {
+		for _, rp := range db.policies {
+			for _, g := range rp.shardGroups {
+				if g.EndTime.Add(rp.Duration).Before(time.Now()) {
+					log.Printf("shard group %d, retention policy %s, database %s due for deletion",
+						g.ID, rp.Name, db.name)
+					if err := s.DeleteShardGroup(db.name, rp.Name, g.ID); err != nil {
+						log.Printf("failed to request deletion of shard group %d: %s", g.ID, err.Error())
+					}
+				}
+			}
+		}
+	}
+}
+
 // Client retrieves the current messaging client.
 func (s *Server) Client() MessagingClient {
 	s.mu.RLock()
@@ -890,6 +937,69 @@ type createShardGroupIfNotExistsCommand struct {
 	Timestamp time.Time `json:"timestamp"`
 }
 
+// DeleteShardGroup deletes the shard group identified by shardID.
+func (s *Server) DeleteShardGroup(database, policy string, shardID uint64) error {
+	c := &deleteShardGroupCommand{Database: database, Policy: policy, ID: shardID}
+	_, err := s.broadcast(deleteShardGroupMessageType, c)
+	return err
+}
+
+// applyDeleteShardGroup deletes shard data from disk and updates the metastore.
+func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
+	var c deleteShardGroupCommand
+	mustUnmarshalJSON(m.Data, &c)
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	// Retrieve database.
+	db := s.databases[c.Database]
+	if s.databases[c.Database] == nil {
+		return ErrDatabaseNotFound
+	}
+
+	// Validate retention policy.
+	rp := db.policies[c.Policy]
+	if rp == nil {
+		return ErrRetentionPolicyNotFound
+	}
+
+	// If shard group no longer exists, then ignore request. This can occur if multiple
+	// data nodes triggered the deletion.
+	g := rp.shardGroupByID(c.ID)
+	if g == nil {
+		return nil
+	}
+
+	for _, shard := range g.Shards {
+		// Ignore shards not on this server.
+		if !shard.HasDataNodeID(s.id) {
+			continue
+		}
+
+		path := shard.store.Path()
+		shard.close()
+		if err := os.Remove(path); err != nil {
+			// Log, but keep going. This can happen if shards were deleted, but the server exited
+			// before it acknowledged the delete command.
+			log.Printf("error deleting shard %s, group ID %d, policy %s: %s", path, g.ID, rp.Name, err.Error())
+		}
+	}
+
+	// Remove from metastore.
+	rp.removeShardGroupByID(c.ID)
+	err = s.meta.mustUpdate(func(tx *metatx) error {
+		return tx.saveDatabase(db)
+	})
+	return
+}
+
+type deleteShardGroupCommand struct {
+	Database string `json:"database"`
+	Policy   string `json:"policy"`
+	ID       uint64 `json:"id"`
+}
+
 // User returns a user by username
 // Returns nil if the user does not exist.
 func (s *Server) User(name string) *User {
@@ -2532,6 +2642,8 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
 			err = s.applyDeleteRetentionPolicy(m)
 		case createShardGroupIfNotExistsMessageType:
 			err = s.applyCreateShardGroupIfNotExists(m)
+		case deleteShardGroupMessageType:
+			err = s.applyDeleteShardGroup(m)
 		case setDefaultRetentionPolicyMessageType:
 			err = s.applySetDefaultRetentionPolicy(m)
 		case createSeriesIfNotExistsMessageType:
diff --git a/server_test.go b/server_test.go
index 7fe5b70b0d..e10d6501c1 100644
--- a/server_test.go
+++ b/server_test.go
@@ -715,7 +715,7 @@ func TestServer_SetDefaultRetentionPolicy(t *testing.T) {
 	}
 }
 
-// Ensure the server returns an error when setting the deafult retention policy to a non-existant one.
+// Ensure the server returns an error when setting the default retention policy to a non-existent one.
 func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.T) {
 	s := OpenServer(NewMessagingClient())
 	defer s.Close()
@@ -725,6 +725,51 @@ func TestServer_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testing.
 	}
 }
 
+// Ensure the server prohibits a zero check interval for retention policy enforcement.
+func TestServer_StartRetentionPolicyEnforcement_ErrZeroInterval(t *testing.T) {
+	s := OpenServer(NewMessagingClient())
+	defer s.Close()
+	if err := s.StartRetentionPolicyEnforcement(time.Duration(0)); err == nil {
+		t.Fatal("failed to prohibit retention policies zero check interval")
+	}
+}
+
+func TestServer_EnforceRetentionPolicies(t *testing.T) {
+	c := NewMessagingClient()
+	s := OpenServer(c)
+	defer s.Close()
+	s.CreateDatabase("foo")
+	s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "mypolicy", Duration: 30 * time.Minute})
+
+	// Create two shard groups for the new retention policy -- one which will age out immediately,
+	// the other in more than an hour.
+	s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(-1*time.Hour))
+	s.CreateShardGroupIfNotExists("foo", "mypolicy", time.Now().Add(time.Hour))
+
+	// Check the two shard groups exist.
+	var g []*influxdb.ShardGroup
+	g, err := s.ShardGroups("foo")
+	if err != nil {
+		t.Fatal(err)
+	} else if len(g) != 2 {
+		t.Fatalf("expected 2 shard groups but found %d", len(g))
+	}
+
+	// Run retention enforcement.
+	s.EnforceRetentionPolicies()
+
+	// Ensure enforcement is in effect across restarts.
+	s.Restart()
+
+	// First shard group should have been removed.
+	g, err = s.ShardGroups("foo")
+	if err != nil {
+		t.Fatal(err)
+	} else if len(g) != 1 {
+		t.Fatalf("expected 1 shard group but found %d", len(g))
+	}
+}
+
 // Ensure the database can write data to the database.
 func TestServer_WriteSeries(t *testing.T) {
 	c := NewMessagingClient()
@@ -883,6 +928,41 @@ func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
 	}
 }
 
+func TestServer_DeleteShardGroup(t *testing.T) {
+	s := OpenServer(NewMessagingClient())
+	defer s.Close()
+	s.CreateDatabase("foo")
+
+	if err := s.CreateRetentionPolicy("foo", &influxdb.RetentionPolicy{Name: "bar"}); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := s.CreateShardGroupIfNotExists("foo", "bar", time.Time{}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Get the new shard's ID.
+	var g []*influxdb.ShardGroup
+	g, err := s.ShardGroups("foo")
+	if err != nil {
+		t.Fatal(err)
+	} else if len(g) != 1 {
+		t.Fatalf("expected 1 shard group but found %d", len(g))
+	}
+	id := g[0].ID
+
+	// Delete the shard group and verify it's gone.
+	if err := s.DeleteShardGroup("foo", "bar", id); err != nil {
+		t.Fatal(err)
+	}
+	g, err = s.ShardGroups("foo")
+	if err != nil {
+		t.Fatal(err)
+	} else if len(g) != 0 {
+		t.Fatalf("expected 0 shard groups but found %d", len(g))
+	}
+}
+
 /* TODO(benbjohnson): Change test to not expose underlying series ids directly.
 func TestServer_Measurements(t *testing.T) {
 	s := OpenServer(NewMessagingClient())
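
A note on the new config keys: retention-check-period is supplied as a duration string ("5m" in the test fixture, "10m" in the sample config) and is decoded into the cmd/influxd Duration wrapper, which is why the parse test compares against main.Duration(5*time.Minute). The sketch below shows the general shape of such a wrapper; the type and its UnmarshalText method here are an assumed re-implementation for illustration, not the actual influxd code.

package main

import (
	"fmt"
	"time"
)

// Duration mirrors the shape of the cmd/influxd wrapper: a time.Duration that
// can be populated from a human-readable string such as "5m" or "10m".
// (Assumed re-implementation for illustration only.)
type Duration time.Duration

// UnmarshalText parses strings like "5m" with time.ParseDuration, so a TOML
// value of retention-check-period = "5m" becomes 5 * time.Minute.
func (d *Duration) UnmarshalText(text []byte) error {
	v, err := time.ParseDuration(string(text))
	if err != nil {
		return err
	}
	*d = Duration(v)
	return nil
}

func main() {
	var d Duration
	if err := d.UnmarshalText([]byte("5m")); err != nil {
		panic(err)
	}
	fmt.Println(d == Duration(5*time.Minute)) // true, matching the config_test assertion
	fmt.Println(time.Duration(d))             // 5m0s
}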
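The enforcement loop added to server.go is a done-channel pattern: StartRetentionPolicyEnforcement rejects a zero interval, stores rpDone, and loops on a select over time.After, while Close stops the goroutine by closing rpDone. A minimal standalone sketch of the same shape (the enforcer type and its method names are illustrative, not part of the patch):

package main

import (
	"fmt"
	"log"
	"time"
)

// enforcer is an illustrative stand-in for the server: it owns a done channel
// and runs a periodic check until that channel is closed.
type enforcer struct {
	done chan struct{}
}

// start launches the periodic check: reject a zero interval, remember the
// done channel, and loop in a goroutine, as StartRetentionPolicyEnforcement does.
func (e *enforcer) start(interval time.Duration, check func()) error {
	if interval == 0 {
		return fmt.Errorf("check interval must be non-zero")
	}
	done := make(chan struct{})
	e.done = done
	go func() {
		for {
			select {
			case <-done:
				return // stop() was called
			case <-time.After(interval):
				check()
			}
		}
	}()
	return nil
}

// stop closes the done channel, which unblocks the select and ends the
// goroutine, the same way Close closes rpDone when it is non-nil.
func (e *enforcer) stop() {
	if e.done != nil {
		close(e.done)
	}
}

func main() {
	var e enforcer
	if err := e.start(100*time.Millisecond, func() { log.Println("enforcement check") }); err != nil {
		log.Fatal(err)
	}
	time.Sleep(350 * time.Millisecond) // allow a few checks to fire
	e.stop()
}

Because each time.After timer starts only after the previous check returns, the interval is measured between checks rather than on a fixed schedule, which is adequate for coarse housekeeping like retention enforcement.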
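EnforceRetentionPolicies keeps a shard group until its end time plus the policy's retention duration has passed, which is why the test creates one group ending an hour in the past and one ending an hour in the future under a 30-minute policy. A small worked example of the same predicate (shardGroup and policy here are local stand-ins, not the influxdb types):

package main

import (
	"fmt"
	"time"
)

// Local stand-ins for the only fields the predicate reads.
type shardGroup struct{ EndTime time.Time }
type policy struct{ Duration time.Duration }

// dueForDeletion mirrors the check in EnforceRetentionPolicies:
// a group ages out once EndTime + retention duration is in the past.
func dueForDeletion(g shardGroup, rp policy, now time.Time) bool {
	return g.EndTime.Add(rp.Duration).Before(now)
}

func main() {
	now := time.Now()
	rp := policy{Duration: 30 * time.Minute}

	// Ended two hours ago: well past the 30-minute retention window.
	old := shardGroup{EndTime: now.Add(-2 * time.Hour)}
	// Ends an hour from now: still inside its retention window.
	fresh := shardGroup{EndTime: now.Add(time.Hour)}

	fmt.Println(dueForDeletion(old, rp, now))   // true
	fmt.Println(dueForDeletion(fresh, rp, now)) // false
}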
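removeShardGroupByID in database.go nils out the matched slot before splicing the slice, so the reused backing array no longer pins the removed *ShardGroup. A standalone version of the same idiom, with a break added since group IDs are unique (removeByID and the group type are illustrative, not from the patch):

package main

import "fmt"

type group struct{ ID uint64 }

// removeByID deletes the group with the given ID from a slice of pointers.
// Nil-ing the slot before the append-splice lets the removed *group be
// garbage collected even though the backing array is reused.
func removeByID(groups []*group, id uint64) []*group {
	for i, g := range groups {
		if g.ID == id {
			groups[i] = nil
			groups = append(groups[:i], groups[i+1:]...)
			break // IDs are unique, so stop at the first match
		}
	}
	return groups
}

func main() {
	gs := []*group{{ID: 1}, {ID: 2}, {ID: 3}}
	gs = removeByID(gs, 2)
	for _, g := range gs {
		fmt.Println(g.ID) // prints 1, then 3
	}
}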