Remove series when shard rolls over

Series should only be removed from the series file when they're no
longer present in any shard. This commit ensures that during a shard
rollover, the series local to the shard are checked against all other
series in the database.

Series that are no longer present in any other shards' bitsets, are then
marked as deleted in the series file.
pull/9319/head
Edd Robinson 2018-01-15 14:30:02 +00:00
parent ba3fcea53e
commit ceb3abd118
4 changed files with 137 additions and 38 deletions

View File

@ -59,6 +59,13 @@ func (s *SeriesIDSet) RemoveNoLock(id uint64) {
s.bitmap.Remove(uint32(id))
}
// Cardinality returns the cardinality of the SeriesIDSet.
func (s *SeriesIDSet) Cardinality() uint64 {
s.RLock()
defer s.RUnlock()
return s.bitmap.GetCardinality()
}
// Merge merged the contents of others into s. The caller does not need to
// provide s as an argument, and the contents of s will always be present in s
// after Merge returns.
@ -96,7 +103,8 @@ func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool {
return s.bitmap.Equals(other.bitmap)
}
// AndNot returns the set of elements that only exist in s.
// AndNot returns a new SeriesIDSet containing elements that were present in s,
// but not present in other.
func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
s.RLock()
defer s.RUnlock()
@ -122,7 +130,7 @@ func (s *SeriesIDSet) String() string {
return s.bitmap.String()
}
// Diff deletes any bits set in other.
// Diff removes from s any elements also present in other.
func (s *SeriesIDSet) Diff(other *SeriesIDSet) {
other.RLock()
defer other.RUnlock()

View File

@ -411,15 +411,6 @@ func (s *Shard) LastModified() time.Time {
return engine.LastModified()
}
// UnloadIndex removes all references to this shard from the DatabaseIndex
func (s *Shard) UnloadIndex() {
s.mu.RLock()
defer s.mu.RUnlock()
if err := s.ready(); err != nil {
return
}
}
// Index returns a reference to the underlying index. It returns an error if
// the index is nil.
func (s *Shard) Index() (Index, error) {

View File

@ -534,28 +534,67 @@ func (s *Store) DeleteShard(shardID uint64) error {
return nil
}
// Remove the shard from the database indexes before closing the shard.
// Closing the shard will do this as well, but it will unload it while
// the shard is locked which can block stats collection and other calls.
sh.UnloadIndex()
if err := sh.Close(); err != nil {
return err
}
if err := os.RemoveAll(sh.path); err != nil {
return err
}
if err := os.RemoveAll(sh.walPath); err != nil {
return err
}
// Remove the shard from Store so it's not returned to callers requesting
// shards.
s.mu.Lock()
delete(s.shards, shardID)
s.mu.Unlock()
return nil
// Get the shard's local bitset of series IDs.
index, err := sh.Index()
if err != nil {
return err
}
var ss *SeriesIDSet
if i, ok := index.(interface {
SeriesIDSet() *SeriesIDSet
}); ok {
ss = i.SeriesIDSet()
}
db := sh.Database()
if err := sh.Close(); err != nil {
return err
}
// Determine if the shard contained any series that are not present in any
// other shards in the database.
shards := s.filterShards(byDatabase(db))
s.walkShards(shards, func(sh *Shard) error {
index, err := sh.Index()
if err != nil {
return err
}
if i, ok := index.(interface {
SeriesIDSet() *SeriesIDSet
}); ok {
ss.Diff(i.SeriesIDSet())
} else {
return fmt.Errorf("unable to get series id set for index in shard at %s", sh.Path())
}
return nil
})
// Remove any remaining series in the set from the series file, as they don't
// exist in any of the database's remaining shards.
if ss.Cardinality() > 0 {
sfile := s.seriesFile(db)
if sfile != nil {
ss.ForEach(func(id uint64) {
sfile.DeleteSeriesID(id)
})
}
}
// Remove the on-disk shard data.
if err := os.RemoveAll(sh.path); err != nil {
return err
}
return os.RemoveAll(sh.walPath)
}
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.

View File

@ -162,27 +162,88 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
func TestStore_DeleteShard(t *testing.T) {
t.Parallel()
test := func(index string) {
test := func(index string) error {
s := MustOpenStore(index)
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
t.Fatal(err)
return err
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
return fmt.Errorf("expected shard")
}
// Reopen shard and recheck.
if err := s.Reopen(); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("shard exists")
// Create another shard.
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
return err
} else if sh := s.Shard(2); sh == nil {
return fmt.Errorf("expected shard")
}
// and another, but in a different db.
if err := s.CreateShard("db1", "rp0", 3, true); err != nil {
return err
} else if sh := s.Shard(3); sh == nil {
return fmt.Errorf("expected shard")
}
// Write series data to the db0 shards.
s.MustWriteToShardString(1, "cpu,servera=a v=1", "cpu,serverb=b v=1", "mem,serverc=a v=1")
s.MustWriteToShardString(2, "cpu,servera=a v=1", "mem,serverc=a v=1")
// Write similar data to db1 database
s.MustWriteToShardString(3, "cpu,serverb=b v=1")
// Reopen the store and check all shards still exist
if err := s.Reopen(); err != nil {
return err
}
for i := uint64(1); i <= 3; i++ {
if sh := s.Shard(i); sh == nil {
return fmt.Errorf("shard %d missing", i)
}
}
// Remove the first shard from the store.
if err := s.DeleteShard(1); err != nil {
return err
}
// cpu,serverb=b should be removed from the series file for db0 because
// shard 1 was the only owner of that series.
// Verify by getting all tag keys.
keys, err := s.TagKeys(nil, []uint64{2}, nil)
if err != nil {
return err
}
expKeys := []tsdb.TagKeys{
{Measurement: "cpu", Keys: []string{"servera"}},
{Measurement: "mem", Keys: []string{"serverc"}},
}
if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
return fmt.Errorf("got keys %v, expected %v", got, exp)
}
// Verify that the same series was not removed from other databases'
// series files.
if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil {
return err
}
expKeys = []tsdb.TagKeys{{Measurement: "cpu", Keys: []string{"serverb"}}}
if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
return fmt.Errorf("got keys %v, expected %v", got, exp)
}
return nil
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
t.Error(err)
}
})
}
}