diff --git a/CHANGELOG.md b/CHANGELOG.md index 0abed18eb0..04a6a55ee0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ - [#6477](https://github.com/influxdata/influxdb/pull/6477): Don't catch SIGQUIT or SIGHUP signals. - [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments - [#6491](https://github.com/influxdata/influxdb/pull/6491): Fix the CLI not to enter an infinite loop when the liner has an error. +- [#6457](https://github.com/influxdata/influxdb/issues/6457): Retention policy cleanup does not remove series ## v0.12.2 [2016-04-20] diff --git a/tsdb/meta.go b/tsdb/meta.go index 035a8d126e..a05cfc5cda 100644 --- a/tsdb/meta.go +++ b/tsdb/meta.go @@ -163,6 +163,29 @@ func (d *DatabaseIndex) AssignShard(k string, shardID uint64) { } } +// RemoveShard removes all references to shardID from any series or measurements +// in the index. If the shard was the only owner of data for the series, the series +// is removed from the index. +func (d *DatabaseIndex) RemoveShard(shardID uint64) { + d.mu.Lock() + defer d.mu.Unlock() + + for k, series := range d.series { + if series.Assigned(shardID) { + // Remove the shard from any series + series.UnassignShard(shardID) + + // If this series only had one shard assign, remove the series + if series.ShardN() == 0 { + for _, measurement := range d.measurements { + measurement.DropSeries(series.id) + } + } + delete(d.series, k) + } + } +} + // TagsForSeries returns the tag map for the passed in series func (d *DatabaseIndex) TagsForSeries(key string) map[string]string { d.mu.RLock() @@ -1386,6 +1409,12 @@ func (s *Series) AssignShard(shardID uint64) { s.mu.Unlock() } +func (s *Series) UnassignShard(shardID uint64) { + s.mu.Lock() + delete(s.shardIDs, shardID) + s.mu.Unlock() +} + func (s *Series) Assigned(shardID uint64) bool { s.mu.RLock() b := s.shardIDs[shardID] @@ -1393,6 +1422,13 @@ func (s *Series) Assigned(shardID uint64) bool { return b } +func (s *Series) ShardN() int { + s.mu.RLock() + n := len(s.shardIDs) + s.mu.RUnlock() + return n +} + // MarshalBinary encodes the object to a binary format. func (s *Series) MarshalBinary() ([]byte, error) { s.mu.RLock() diff --git a/tsdb/shard.go b/tsdb/shard.go index d012a318e7..b710a71e6d 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -185,6 +185,9 @@ func (s *Shard) close() error { return nil } + // Don't leak our shard ID and series keys in the index + s.index.RemoveShard(s.id) + err := s.engine.Close() if err == nil { s.engine = nil diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 0c926eb1d1..27644673ea 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -152,6 +152,48 @@ func TestShardWriteAddNewField(t *testing.T) { } } +// Ensures that when a shard is closed, it removes any series meta-data +// from the index. +func TestShard_Close_RemoveIndex(t *testing.T) { + tmpDir, _ := ioutil.TempDir("", "shard_test") + defer os.RemoveAll(tmpDir) + tmpShard := path.Join(tmpDir, "shard") + tmpWal := path.Join(tmpDir, "wal") + + index := tsdb.NewDatabaseIndex("db") + opts := tsdb.NewEngineOptions() + opts.Config.WALDir = filepath.Join(tmpDir, "wal") + + sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts) + + if err := sh.Open(); err != nil { + t.Fatalf("error opening shard: %s", err.Error()) + } + + pt := models.MustNewPoint( + "cpu", + map[string]string{"host": "server"}, + map[string]interface{}{"value": 1.0}, + time.Unix(1, 2), + ) + + err := sh.WritePoints([]models.Point{pt}) + if err != nil { + t.Fatalf(err.Error()) + } + + if got, exp := index.SeriesN(), 1; got != exp { + t.Fatalf("series count mismatch: got %v, exp %v", got, exp) + } + + // ensure the index gets loaded after closing and opening the shard + sh.Close() + + if got, exp := index.SeriesN(), 0; got != exp { + t.Fatalf("series count mismatch: got %v, exp %v", got, exp) + } +} + // Ensure a shard can create iterators for its underlying data. func TestShard_CreateIterator_Ascending(t *testing.T) { sh := NewShard()