From 2bd5880d7ab5b3c997a5a64f4ae2ef4b8a762216 Mon Sep 17 00:00:00 2001
From: Jason Wilder <mail@jasonwilder.com>
Date: Wed, 27 Apr 2016 14:59:00 -0600
Subject: [PATCH] Remove series from index when shard is closed

When a shard is closed and removed due to retention policy enforcement,
the series contained in the shard would still exists in the index causing
a memory leak.  Restarting the server would cause them not to be loaded.

Fixes #6457
---
 CHANGELOG.md       |  1 +
 tsdb/meta.go       | 36 ++++++++++++++++++++++++++++++++++++
 tsdb/shard.go      |  3 +++
 tsdb/shard_test.go | 42 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 82 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0abed18eb0..04a6a55ee0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -46,6 +46,7 @@
 - [#6477](https://github.com/influxdata/influxdb/pull/6477): Don't catch SIGQUIT or SIGHUP signals.
 - [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments
 - [#6491](https://github.com/influxdata/influxdb/pull/6491): Fix the CLI not to enter an infinite loop when the liner has an error.
+- [#6457](https://github.com/influxdata/influxdb/issues/6457): Retention policy cleanup does not remove series
 
 ## v0.12.2 [2016-04-20]
 
diff --git a/tsdb/meta.go b/tsdb/meta.go
index 035a8d126e..a05cfc5cda 100644
--- a/tsdb/meta.go
+++ b/tsdb/meta.go
@@ -163,6 +163,29 @@ func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
 	}
 }
 
+// RemoveShard removes all references to shardID from any series or measurements
+// in the index.  If the shard was the only owner of data for the series, the series
+// is removed from the index.
+func (d *DatabaseIndex) RemoveShard(shardID uint64) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+
+	for k, series := range d.series {
+		if series.Assigned(shardID) {
+			// Remove the shard from any series
+			series.UnassignShard(shardID)
+
+			// If this series only had one shard assign, remove the series
+			if series.ShardN() == 0 {
+				for _, measurement := range d.measurements {
+					measurement.DropSeries(series.id)
+				}
+			}
+			delete(d.series, k)
+		}
+	}
+}
+
 // TagsForSeries returns the tag map for the passed in series
 func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
 	d.mu.RLock()
@@ -1386,6 +1409,12 @@ func (s *Series) AssignShard(shardID uint64) {
 	s.mu.Unlock()
 }
 
+func (s *Series) UnassignShard(shardID uint64) {
+	s.mu.Lock()
+	delete(s.shardIDs, shardID)
+	s.mu.Unlock()
+}
+
 func (s *Series) Assigned(shardID uint64) bool {
 	s.mu.RLock()
 	b := s.shardIDs[shardID]
@@ -1393,6 +1422,13 @@ func (s *Series) Assigned(shardID uint64) bool {
 	return b
 }
 
+func (s *Series) ShardN() int {
+	s.mu.RLock()
+	n := len(s.shardIDs)
+	s.mu.RUnlock()
+	return n
+}
+
 // MarshalBinary encodes the object to a binary format.
 func (s *Series) MarshalBinary() ([]byte, error) {
 	s.mu.RLock()
diff --git a/tsdb/shard.go b/tsdb/shard.go
index d012a318e7..b710a71e6d 100644
--- a/tsdb/shard.go
+++ b/tsdb/shard.go
@@ -185,6 +185,9 @@ func (s *Shard) close() error {
 		return nil
 	}
 
+	// Don't leak our shard ID and series keys in the index
+	s.index.RemoveShard(s.id)
+
 	err := s.engine.Close()
 	if err == nil {
 		s.engine = nil
diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go
index 0c926eb1d1..27644673ea 100644
--- a/tsdb/shard_test.go
+++ b/tsdb/shard_test.go
@@ -152,6 +152,48 @@ func TestShardWriteAddNewField(t *testing.T) {
 	}
 }
 
+// Ensures that when a shard is closed, it removes any series meta-data
+// from the index.
+func TestShard_Close_RemoveIndex(t *testing.T) {
+	tmpDir, _ := ioutil.TempDir("", "shard_test")
+	defer os.RemoveAll(tmpDir)
+	tmpShard := path.Join(tmpDir, "shard")
+	tmpWal := path.Join(tmpDir, "wal")
+
+	index := tsdb.NewDatabaseIndex("db")
+	opts := tsdb.NewEngineOptions()
+	opts.Config.WALDir = filepath.Join(tmpDir, "wal")
+
+	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
+
+	if err := sh.Open(); err != nil {
+		t.Fatalf("error opening shard: %s", err.Error())
+	}
+
+	pt := models.MustNewPoint(
+		"cpu",
+		map[string]string{"host": "server"},
+		map[string]interface{}{"value": 1.0},
+		time.Unix(1, 2),
+	)
+
+	err := sh.WritePoints([]models.Point{pt})
+	if err != nil {
+		t.Fatalf(err.Error())
+	}
+
+	if got, exp := index.SeriesN(), 1; got != exp {
+		t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
+	}
+
+	// ensure the index gets loaded after closing and opening the shard
+	sh.Close()
+
+	if got, exp := index.SeriesN(), 0; got != exp {
+		t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
+	}
+}
+
 // Ensure a shard can create iterators for its underlying data.
 func TestShard_CreateIterator_Ascending(t *testing.T) {
 	sh := NewShard()