Remove series from index when shard is closed

When a shard is closed and removed due to retention policy enforcement,
the series contained in the shard would still exist in the index, causing
a memory leak.  They would only be cleared by a server restart, since the
removed shard would no longer be loaded.

Fixes #6457
pull/6485/head
Jason Wilder 2016-04-27 14:59:00 -06:00
parent 337d49bea3
commit 2bd5880d7a
4 changed files with 82 additions and 0 deletions
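For context, here is a minimal runnable sketch of the call path this change covers. The Index and Shard types below are illustrative stand-ins, not the real tsdb types, and the retention-enforcement caller is assumed; only the idea of Close unregistering the shard's series comes from this commit.

package main

import "fmt"

// Index is a stand-in for the per-database series index.
type Index struct {
	series map[string]map[uint64]struct{} // series key -> owning shard IDs
}

// RemoveShard drops shardID from every series and deletes series that no
// longer belong to any shard (mirrors what DatabaseIndex.RemoveShard does).
func (i *Index) RemoveShard(shardID uint64) {
	for key, shards := range i.series {
		delete(shards, shardID)
		if len(shards) == 0 {
			delete(i.series, key)
		}
	}
}

// Shard is a stand-in for tsdb.Shard.
type Shard struct {
	id    uint64
	index *Index
}

// Close unregisters the shard's series before the shard goes away.
func (s *Shard) Close() error {
	s.index.RemoveShard(s.id)
	return nil
}

func main() {
	idx := &Index{series: map[string]map[uint64]struct{}{
		"cpu,host=server": {1: {}},
	}}
	sh := &Shard{id: 1, index: idx}

	// Retention enforcement would close and delete the expired shard.
	sh.Close()
	fmt.Println(len(idx.series)) // 0: no series leaked for the closed shard
}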

@ -46,6 +46,7 @@
- [#6477](https://github.com/influxdata/influxdb/pull/6477): Don't catch SIGQUIT or SIGHUP signals.
- [#6468](https://github.com/influxdata/influxdb/issues/6468): Panic with truncated wal segments
- [#6491](https://github.com/influxdata/influxdb/pull/6491): Fix the CLI not to enter an infinite loop when the liner has an error.
- [#6457](https://github.com/influxdata/influxdb/issues/6457): Retention policy cleanup does not remove series

## v0.12.2 [2016-04-20]

@ -163,6 +163,29 @@ func (d *DatabaseIndex) AssignShard(k string, shardID uint64) {
	}
}

// RemoveShard removes all references to shardID from any series or measurements
// in the index. If the shard was the only owner of data for the series, the series
// is removed from the index.
func (d *DatabaseIndex) RemoveShard(shardID uint64) {
	d.mu.Lock()
	defer d.mu.Unlock()

	for k, series := range d.series {
		if series.Assigned(shardID) {
			// Remove the shard from the series
			series.UnassignShard(shardID)

			// If this was the series' only assigned shard, remove the series
			if series.ShardN() == 0 {
				// Remove the series from its measurements
				for _, measurement := range d.measurements {
					measurement.DropSeries(series.id)
				}

				// Remove the series key from the index
				delete(d.series, k)
			}
		}
	}
}

// TagsForSeries returns the tag map for the passed in series
func (d *DatabaseIndex) TagsForSeries(key string) map[string]string {
	d.mu.RLock()
@ -1386,6 +1409,12 @@ func (s *Series) AssignShard(shardID uint64) {
	s.mu.Unlock()
}

func (s *Series) UnassignShard(shardID uint64) {
	s.mu.Lock()
	delete(s.shardIDs, shardID)
	s.mu.Unlock()
}

func (s *Series) Assigned(shardID uint64) bool {
	s.mu.RLock()
	b := s.shardIDs[shardID]
@ -1393,6 +1422,13 @@ func (s *Series) Assigned(shardID uint64) bool {
	return b
}

func (s *Series) ShardN() int {
	s.mu.RLock()
	n := len(s.shardIDs)
	s.mu.RUnlock()
	return n
}

// MarshalBinary encodes the object to a binary format.
func (s *Series) MarshalBinary() ([]byte, error) {
	s.mu.RLock()

@ -185,6 +185,9 @@ func (s *Shard) close() error {
		return nil
	}

	// Don't leak our shard ID and series keys in the index
	s.index.RemoveShard(s.id)

	err := s.engine.Close()
	if err == nil {
		s.engine = nil

@ -152,6 +152,48 @@ func TestShardWriteAddNewField(t *testing.T) {
	}
}

// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestShard_Close_RemoveIndex(t *testing.T) {
	tmpDir, _ := ioutil.TempDir("", "shard_test")
	defer os.RemoveAll(tmpDir)
	tmpShard := path.Join(tmpDir, "shard")
	tmpWal := path.Join(tmpDir, "wal")

	index := tsdb.NewDatabaseIndex("db")
	opts := tsdb.NewEngineOptions()
	opts.Config.WALDir = filepath.Join(tmpDir, "wal")

	sh := tsdb.NewShard(1, index, tmpShard, tmpWal, opts)
	if err := sh.Open(); err != nil {
		t.Fatalf("error opening shard: %s", err.Error())
	}

	pt := models.MustNewPoint(
		"cpu",
		map[string]string{"host": "server"},
		map[string]interface{}{"value": 1.0},
		time.Unix(1, 2),
	)

	err := sh.WritePoints([]models.Point{pt})
	if err != nil {
		t.Fatalf(err.Error())
	}

	if got, exp := index.SeriesN(), 1; got != exp {
		t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
	}

	// Closing the shard should remove its series from the index
	sh.Close()

	if got, exp := index.SeriesN(), 0; got != exp {
		t.Fatalf("series count mismatch: got %v, exp %v", got, exp)
	}
}

// Ensure a shard can create iterators for its underlying data.
func TestShard_CreateIterator_Ascending(t *testing.T) {
	sh := NewShard()