Merge pull request #5691 from influxdata/er-retention-policies

Clean up shard data when dropping retention policies
pull/5746/merge
Edd Robinson 2016-02-19 11:32:54 +00:00
commit 2da8321128
5 changed files with 219 additions and 76 deletions


@@ -14,8 +14,8 @@
- [#5666](https://github.com/influxdata/influxdb/pull/5666): Manage dependencies with gdm
- [#5512](https://github.com/influxdata/influxdb/pull/5512): HTTP: Add config option to enable HTTP JSON write path which is now disabled by default.
- [#5366](https://github.com/influxdata/influxdb/pull/5366): Enabled golint for influxql. @gabelev
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup
cleanup
- [#5706](https://github.com/influxdata/influxdb/pull/5706): Cluster setup cleanup
- [#5691](https://github.com/influxdata/influxdb/pull/5691): Remove associated shard data when retention policies are dropped.
### Bugfixes


@@ -293,24 +293,29 @@ func (e *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabase
return err
}
// Retrieve a list of all shard ids.
var shardIDs []uint64
for _, rp := range dbi.RetentionPolicies {
for _, sg := range rp.ShardGroups {
for _, s := range sg.Shards {
shardIDs = append(shardIDs, s.ID)
}
}
}
// Remove the database from the local store
if err := e.TSDBStore.DeleteDatabase(stmt.Name, shardIDs); err != nil {
if err := e.TSDBStore.DeleteDatabase(stmt.Name); err != nil {
return err
}
return nil
}
// executeDropRetentionPolicy closes all local shards for the retention
// policy and removes the directory.
func (q *QueryExecutor) executeDropRetentionPolicy(stmt *influxql.DropRetentionPolicyStatement) error {
// Check if the database and retention policy exist.
if _, err := q.MetaClient.RetentionPolicy(stmt.Database, stmt.Name); err != nil {
return err
}
// Remove the retention policy from the local store.
if err := q.TSDBStore.DeleteRetentionPolicy(stmt.Database, stmt.Name); err != nil {
return err
}
return q.MetaClient.DropRetentionPolicy(stmt.Database, stmt.Name)
}
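
The ordering above matters: the policy is verified against the meta store, local shard data is deleted, and only then is the meta entry dropped, so a failed local delete cannot strand files the meta store no longer knows about. A minimal sketch of that flow, using hypothetical stand-in interfaces rather than the real MetaClient and TSDBStore types:

type metaClient interface {
	RetentionPolicy(database, name string) (interface{}, error)
	DropRetentionPolicy(database, name string) error
}

type tsdbStore interface {
	DeleteRetentionPolicy(database, name string) error
}

// dropRetentionPolicy mirrors executeDropRetentionPolicy: verify the
// policy exists, remove local shard data first, then drop the meta entry.
func dropRetentionPolicy(mc metaClient, ts tsdbStore, database, name string) error {
	if _, err := mc.RetentionPolicy(database, name); err != nil {
		return err // database or retention policy does not exist
	}
	if err := ts.DeleteRetentionPolicy(database, name); err != nil {
		return err // the meta entry is kept if local deletion fails
	}
	return mc.DropRetentionPolicy(database, name)
}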
func (e *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) error {
return e.TSDBStore.DeleteMeasurement(database, stmt.Name)
}


@@ -412,7 +412,7 @@ func (c *Client) RetentionPolicy(database, name string) (rpi *RetentionPolicyInf
// TODO: This should not be handled here
if db == nil {
return nil, ErrDatabaseNotExists
return nil, influxdb.ErrDatabaseNotFound(database)
}
return db.RetentionPolicy(name), nil
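
The swapped error is a constructor rather than a sentinel: influxdb.ErrDatabaseNotFound builds an error that names the missing database, where ErrDatabaseNotExists was a fixed value with no context. A hedged sketch of the difference between the two styles:

import (
	"errors"
	"fmt"
)

// Sentinel style: a single fixed value, comparable with ==,
// but it cannot say which database was missing.
var ErrDatabaseNotExists = errors.New("database not found")

// Constructor style: embeds the database name in the message,
// in the spirit of influxdb.ErrDatabaseNotFound.
func ErrDatabaseNotFound(name string) error {
	return fmt.Errorf("database not found: %s", name)
}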


@@ -35,8 +35,13 @@ type Store struct {
path string
databaseIndexes map[string]*DatabaseIndex
// shards is a map of shard IDs to Shards for *ALL DATABASES*.
shards map[uint64]*Shard
// shardLocations is a map of shard IDs to both the associated
// Shard, and meta information about where the shard is located on
// disk.
//
// shardLocations stores mappings for all shards on all databases.
shardLocations map[uint64]*shardLocation
EngineOptions EngineOptions
Logger *log.Logger
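
Replacing the plain shard map with shardLocation entries is what makes scoped deletes possible: every shard now carries its database and retention policy alongside the *Shard itself. A hypothetical helper (not part of the PR, and assuming the shardLocation type introduced further down) showing the kind of filtering the new map enables:

// shardIDsFor returns the IDs of all shards belonging to the given
// database and retention policy. It assumes the caller holds the
// store's read lock.
func shardIDsFor(locs map[uint64]*shardLocation, db, rp string) []uint64 {
	ids := make([]uint64, 0, len(locs))
	for id, l := range locs {
		if l.IsDatabase(db) && l.IsRetentionPolicy(rp) {
			ids = append(ids, id)
		}
	}
	return ids
}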
@@ -70,7 +75,7 @@ func (s *Store) Open() error {
s.closing = make(chan struct{})
s.shards = map[uint64]*Shard{}
s.shardLocations = map[uint64]*shardLocation{}
s.databaseIndexes = map[string]*DatabaseIndex{}
s.Logger.Printf("Using data dir: %v", s.Path())
@@ -136,7 +141,7 @@ func (s *Store) loadShards() error {
// Shard file names are numeric shardIDs
shardID, err := strconv.ParseUint(sh.Name(), 10, 64)
if err != nil {
s.Logger.Printf("Skipping shard: %s. Not a valid path", rp.Name())
s.Logger.Printf("%s is not a valid ID. Skipping shard.", sh.Name())
continue
}
@@ -145,7 +150,8 @@
if err != nil {
return fmt.Errorf("failed to open shard %d: %s", shardID, err)
}
s.shards[shardID] = shard
s.shardLocations[shardID] = &shardLocation{Database: db, RetentionPolicy: rp.Name(), Shard: shard}
}
}
}
@@ -164,13 +170,13 @@ func (s *Store) Close() error {
}
s.wg.Wait()
for _, sh := range s.shards {
if err := sh.Close(); err != nil {
for _, sl := range s.shardLocations {
if err := sl.Shard.Close(); err != nil {
return err
}
}
s.opened = false
s.shards = nil
s.shardLocations = nil
s.databaseIndexes = nil
return nil
@@ -187,7 +193,11 @@ func (s *Store) DatabaseIndexN() int {
func (s *Store) Shard(id uint64) *Shard {
s.mu.RLock()
defer s.mu.RUnlock()
return s.shards[id]
sl, ok := s.shardLocations[id]
if !ok {
return nil
}
return sl.Shard
}
// Shards returns a list of shards by id.
@@ -196,11 +206,11 @@ func (s *Store) Shards(ids []uint64) []*Shard {
defer s.mu.RUnlock()
a := make([]*Shard, 0, len(ids))
for _, id := range ids {
sh := s.shards[id]
if sh == nil {
sl, ok := s.shardLocations[id]
if !ok {
continue
}
a = append(a, sh)
a = append(a, sl.Shard)
}
return a
}
@@ -209,7 +219,7 @@ func (s *Store) Shards(ids []uint64) []*Shard {
func (s *Store) ShardN() int {
s.mu.RLock()
defer s.mu.RUnlock()
return len(s.shards)
return len(s.shardLocations)
}
// CreateShard creates a shard with the given id and retention policy on a database.
@@ -224,7 +234,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
}
// shard already exists
if _, ok := s.shards[shardID]; ok {
if _, ok := s.shardLocations[shardID]; ok {
return nil
}
@@ -252,7 +262,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
return err
}
s.shards[shardID] = shard
s.shardLocations[shardID] = &shardLocation{Database: database, RetentionPolicy: retentionPolicy, Shard: shard}
return nil
}
@@ -261,41 +271,48 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64) er
func (s *Store) DeleteShard(shardID uint64) error {
s.mu.Lock()
defer s.mu.Unlock()
return s.deleteShard(shardID)
}
// deleteShard removes a shard from disk. Callers of deleteShard need
// to handle locks appropriately.
func (s *Store) deleteShard(shardID uint64) error {
// ensure shard exists
sh, ok := s.shards[shardID]
sl, ok := s.shardLocations[shardID]
if !ok {
return nil
}
if err := sh.Close(); err != nil {
if err := sl.Shard.Close(); err != nil {
return err
}
if err := os.RemoveAll(sh.path); err != nil {
if err := os.RemoveAll(sl.Shard.path); err != nil {
return err
}
if err := os.RemoveAll(sh.walPath); err != nil {
if err := os.RemoveAll(sl.Shard.walPath); err != nil {
return err
}
delete(s.shards, shardID)
delete(s.shardLocations, shardID)
return nil
}
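
Splitting DeleteShard from deleteShard follows the usual Go locking pattern: the exported method acquires the mutex, the unexported one assumes it is already held, so DeleteDatabase and DeleteRetentionPolicy below can delete many shards under a single lock without deadlocking. A standalone illustration of the pattern, with hypothetical names:

import "sync"

type registry struct {
	mu    sync.Mutex
	items map[uint64]string
}

// Delete is the exported entry point; it owns the lock.
func (r *registry) Delete(id uint64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.delete(id)
}

// delete assumes r.mu is already held by the caller.
func (r *registry) delete(id uint64) {
	delete(r.items, id)
}

// DeleteAll locks once and reuses the unexported helper, the same
// shape as DeleteRetentionPolicy calling deleteShard per shard.
func (r *registry) DeleteAll() {
	r.mu.Lock()
	defer r.mu.Unlock()
	for id := range r.items {
		r.delete(id)
	}
}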
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error {
// DeleteDatabase will close all shards associated with a database and
// remove the directory and files from disk.
func (s *Store) DeleteDatabase(name string) error {
s.mu.Lock()
defer s.mu.Unlock()
for _, id := range shardIDs {
shard := s.shards[id]
if shard != nil {
shard.Close()
// Close and delete all shards on the database.
for shardID, location := range s.shardLocations {
if location.IsDatabase(name) {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
}
}
delete(s.shards, id)
}
if err := os.RemoveAll(filepath.Join(s.path, name)); err != nil {
@@ -306,10 +323,36 @@ func (s *Store) DeleteDatabase(name string, shardIDs []uint64) error {
}
delete(s.databaseIndexes, name)
return nil
}
// DeleteRetentionPolicy will close all shards associated with the
// provided retention policy, remove the retention policy directories on
// both the DB and WAL, and remove all shard files from disk.
func (s *Store) DeleteRetentionPolicy(database, name string) error {
s.mu.Lock()
defer s.mu.Unlock()
// Close and delete all shards under the retention policy on the
// database.
for shardID, location := range s.shardLocations {
if location.IsDatabase(database) && location.IsRetentionPolicy(name) {
// Delete the shard from disk.
if err := s.deleteShard(shardID); err != nil {
return err
}
}
}
// Remove the retention policy folder.
if err := os.RemoveAll(filepath.Join(s.path, database, name)); err != nil {
return err
}
// Remove the retention policy folder from the WAL.
return os.RemoveAll(filepath.Join(s.EngineOptions.Config.WALDir, database, name))
}
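
Two RemoveAll calls are enough because shard files live in a fixed db/rp hierarchy under both the data and WAL roots, which the test below also relies on. A small sketch of the layout this assumes:

import "path/filepath"

// shardPaths shows the assumed on-disk hierarchy: both the data and
// WAL trees nest shard directories under <root>/<database>/<rp>/.
func shardPaths(dataRoot, walRoot, db, rp, shardID string) (data, wal string) {
	data = filepath.Join(dataRoot, db, rp, shardID) // e.g. data/db0/rp0/1
	wal = filepath.Join(walRoot, db, rp, shardID)   // e.g. wal/db0/rp0/1
	return data, wal
}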
// DeleteMeasurement removes a measurement and all associated series from a database.
func (s *Store) DeleteMeasurement(database, name string) error {
s.mu.Lock()
@@ -331,11 +374,12 @@ func (s *Store) DeleteMeasurement(database, name string) error {
db.DropMeasurement(m.Name)
// Remove underlying data.
for _, sh := range s.shards {
if sh.index != db {
for _, sl := range s.shardLocations {
if !sl.IsDatabase(database) {
continue
}
if err := sh.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
if err := sl.Shard.DeleteMeasurement(m.Name, m.SeriesKeys()); err != nil {
return err
}
}
@@ -351,8 +395,8 @@ func (s *Store) ShardIDs() []uint64 {
}
func (s *Store) shardIDs() []uint64 {
a := make([]uint64, 0, len(s.shards))
for shardID := range s.shards {
a := make([]uint64, 0, len(s.shardLocations))
for shardID := range s.shardLocations {
a = append(a, shardID)
}
return a
@@ -360,9 +404,9 @@ func (s *Store) shardIDs() []uint64 {
// shardsSlice returns an ordered list of shards.
func (s *Store) shardsSlice() []*Shard {
a := make([]*Shard, 0, len(s.shards))
for _, sh := range s.shards {
a = append(a, sh)
a := make([]*Shard, 0, len(s.shardLocations))
for _, sl := range s.shardLocations {
a = append(a, sl.Shard)
}
sort.Sort(Shards(a))
return a
@@ -504,16 +548,15 @@ func (s *Store) DeleteSeries(database string, sources []influxql.Source, conditi
}
func (s *Store) deleteSeries(database string, seriesKeys []string) error {
db, ok := s.databaseIndexes[database]
if !ok {
if _, ok := s.databaseIndexes[database]; !ok {
return influxql.ErrDatabaseNotFound(database)
}
for _, sh := range s.shards {
if sh.index != db {
for _, sl := range s.shardLocations {
if !sl.IsDatabase(database) {
continue
}
if err := sh.DeleteSeries(seriesKeys); err != nil {
if err := sl.Shard.DeleteSeries(seriesKeys); err != nil {
return err
}
}
@@ -535,14 +578,14 @@ func (s *Store) periodicMaintenance() {
}
}
// performMaintenance will loop through the shars and tell them
// to perform any maintenance tasks. Those tasks should kick off
// their own goroutines if it's anything that could take time.
// performMaintenance loops through shards and executes any maintenance
// tasks. Those tasks should run in their own goroutines if they will
// take significant time.
func (s *Store) performMaintenance() {
s.mu.Lock()
defer s.mu.Unlock()
for _, sh := range s.shards {
s.performMaintenanceOnShard(sh)
for _, sl := range s.shardLocations {
s.performMaintenanceOnShard(sl.Shard)
}
}
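
The rewritten comment states the contract explicitly: performMaintenance iterates while holding the store mutex, so slow work must not run inline. A minimal sketch of a task honoring that, using the package's Shard type and a hypothetical compact helper:

// maintain returns quickly; the long-running work moves to its own
// goroutine so the store mutex is not held for its duration.
func maintain(sh *Shard) {
	go func(sh *Shard) {
		compact(sh) // hypothetical long-running task
	}(sh)
}

func compact(sh *Shard) { /* elided */ }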
@@ -625,12 +668,12 @@ func (s *Store) WriteToShard(shardID uint64, points []models.Point) error {
default:
}
sh, ok := s.shards[shardID]
sl, ok := s.shardLocations[shardID]
if !ok {
return ErrShardNotFound
}
return sh.WritePoints(points)
return sl.Shard.WritePoints(points)
}
func (s *Store) ExecuteShowFieldKeysStatement(stmt *influxql.ShowFieldKeysStatement, database string) (models.Rows, error) {
@@ -887,6 +930,30 @@ func (s *Store) ExecuteShowTagValuesStatement(stmt *influxql.ShowTagValuesStatem
return rows, nil
}
// shardLocation is a wrapper around a shard that provides extra
// information about which database and retention policy the shard
// belongs to.
//
// shardLocation is safe for use from multiple goroutines.
type shardLocation struct {
mu sync.RWMutex
Database string
RetentionPolicy string
Shard *Shard
}
func (s *shardLocation) IsDatabase(db string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.Database == db
}
func (s *shardLocation) IsRetentionPolicy(rp string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.RetentionPolicy == rp
}
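
Because the accessors take the location's read lock internally, callers can test locations from multiple goroutines without extra synchronization. A hypothetical demonstration:

import "sync"

// checkConcurrently exercises the accessors from several goroutines;
// each call takes shardLocation's internal read lock, so no
// caller-side locking is needed.
func checkConcurrently(loc *shardLocation) {
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_ = loc.IsDatabase("db0") && loc.IsRetentionPolicy("rp0")
		}()
	}
	wg.Wait()
}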
// IsRetryable returns true if this error is temporary and could be retried
func IsRetryable(err error) bool {
if err == nil {


@@ -16,6 +16,75 @@ import (
"github.com/influxdata/influxdb/tsdb"
)
// Ensure the store can delete a retention policy and all shards under
// it.
func TestStore_DeleteRetentionPolicy(t *testing.T) {
s := MustOpenStore()
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
}
// Create a new shard under the same retention policy, and verify
// that it exists.
if err := s.CreateShard("db0", "rp0", 2); err != nil {
t.Fatal(err)
} else if sh := s.Shard(2); sh == nil {
t.Fatalf("expected shard")
}
// Create a new shard under a different retention policy, and
// verify that it exists.
if err := s.CreateShard("db0", "rp1", 3); err != nil {
t.Fatal(err)
} else if sh := s.Shard(3); sh == nil {
t.Fatalf("expected shard")
}
// Deleting the rp0 retention policy does not return an error.
if err := s.DeleteRetentionPolicy("db0", "rp0"); err != nil {
t.Fatal(err)
}
// It deletes the shards under that retention policy.
if sh := s.Shard(1); sh != nil {
t.Errorf("shard 1 was not deleted")
}
if sh := s.Shard(2); sh != nil {
t.Errorf("shard 2 was not deleted")
}
// It deletes the retention policy directory.
if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp0")), false; got != exp {
t.Error("directory exists, but should have been removed")
}
// It deletes the WAL retention policy directory.
if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp0")), false; got != exp {
t.Error("directory exists, but should have been removed")
}
// Reopen the store and check that the other shard still exists.
if err := s.Reopen(); err != nil {
t.Error(err)
} else if sh := s.Shard(3); sh == nil {
t.Errorf("shard 3 does not exist")
}
// It does not delete other retention policy directories.
if got, exp := dirExists(filepath.Join(s.Path(), "db0", "rp1")), true; got != exp {
t.Error("directory does not exist, but should")
}
if got, exp := dirExists(filepath.Join(s.EngineOptions.Config.WALDir, "db0", "rp1")), true; got != exp {
t.Error("directory does not exist, but should")
}
}
// Ensure the store can create a new shard.
func TestStore_CreateShard(t *testing.T) {
s := MustOpenStore()
@@ -38,7 +107,7 @@ func TestStore_CreateShard(t *testing.T) {
}
// Reopen shard and recheck.
if s, err := ReopenStore(s); err != nil {
if err := s.Reopen(); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard(1)")
@@ -60,7 +129,7 @@ func TestStore_DeleteShard(t *testing.T) {
}
// Reopen shard and recheck.
if s, err := ReopenStore(s); err != nil {
if err := s.Reopen(); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("shard exists")
@@ -262,19 +331,13 @@ func MustOpenStore() *Store {
return s
}
// ReopenStore closes and reopens the store as a new store.
func ReopenStore(s *Store) (*Store, error) {
// Reopen closes and reopens the store as a new store.
func (s *Store) Reopen() error {
if err := s.Store.Close(); err != nil {
return nil, err
return err
}
other := &Store{Store: tsdb.NewStore(s.Path())}
other.EngineOptions = s.EngineOptions
if err := other.Open(); err != nil {
return nil, err
}
return other, nil
s.Store = tsdb.NewStore(s.Path())
return s.Open()
}
// Close closes the store and removes the underlying data.
@@ -341,3 +404,11 @@ func ParseTags(s string) influxql.Tags {
}
return influxql.NewTags(m)
}
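// dirExists reports whether path exists on disk; any error other
// than os.IsNotExist is conservatively treated as "exists".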
func dirExists(path string) bool {
var err error
if _, err = os.Stat(path); err == nil {
return true
}
return !os.IsNotExist(err)
}