Merge pull request #9319 from influxdata/er-shard-rollover

Remove series when shard rolls over
pull/9323/head
Edd Robinson 2018-01-16 16:23:48 +00:00 committed by GitHub
commit fd80af3117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 137 additions and 38 deletions

View File

@ -59,6 +59,13 @@ func (s *SeriesIDSet) RemoveNoLock(id uint64) {
s.bitmap.Remove(uint32(id))
}
// Cardinality returns the cardinality of the SeriesIDSet.
func (s *SeriesIDSet) Cardinality() uint64 {
s.RLock()
defer s.RUnlock()
return s.bitmap.GetCardinality()
}
// Merge merged the contents of others into s. The caller does not need to
// provide s as an argument, and the contents of s will always be present in s
// after Merge returns.
@ -96,7 +103,8 @@ func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool {
return s.bitmap.Equals(other.bitmap)
}
// AndNot returns the set of elements that only exist in s.
// AndNot returns a new SeriesIDSet containing elements that were present in s,
// but not present in other.
func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
s.RLock()
defer s.RUnlock()
@ -122,7 +130,7 @@ func (s *SeriesIDSet) String() string {
return s.bitmap.String()
}
// Diff deletes any bits set in other.
// Diff removes from s any elements also present in other.
func (s *SeriesIDSet) Diff(other *SeriesIDSet) {
other.RLock()
defer other.RUnlock()

View File

@ -411,15 +411,6 @@ func (s *Shard) LastModified() time.Time {
return engine.LastModified()
}
// UnloadIndex removes all references to this shard from the DatabaseIndex
func (s *Shard) UnloadIndex() {
s.mu.RLock()
defer s.mu.RUnlock()
if err := s.ready(); err != nil {
return
}
}
// Index returns a reference to the underlying index. It returns an error if
// the index is nil.
func (s *Shard) Index() (Index, error) {

View File

@ -534,28 +534,67 @@ func (s *Store) DeleteShard(shardID uint64) error {
return nil
}
// Remove the shard from the database indexes before closing the shard.
// Closing the shard will do this as well, but it will unload it while
// the shard is locked which can block stats collection and other calls.
sh.UnloadIndex()
if err := sh.Close(); err != nil {
return err
}
if err := os.RemoveAll(sh.path); err != nil {
return err
}
if err := os.RemoveAll(sh.walPath); err != nil {
return err
}
// Remove the shard from Store so it's not returned to callers requesting
// shards.
s.mu.Lock()
delete(s.shards, shardID)
s.mu.Unlock()
return nil
// Get the shard's local bitset of series IDs.
index, err := sh.Index()
if err != nil {
return err
}
var ss *SeriesIDSet
if i, ok := index.(interface {
SeriesIDSet() *SeriesIDSet
}); ok {
ss = i.SeriesIDSet()
}
db := sh.Database()
if err := sh.Close(); err != nil {
return err
}
// Determine if the shard contained any series that are not present in any
// other shards in the database.
shards := s.filterShards(byDatabase(db))
s.walkShards(shards, func(sh *Shard) error {
index, err := sh.Index()
if err != nil {
return err
}
if i, ok := index.(interface {
SeriesIDSet() *SeriesIDSet
}); ok {
ss.Diff(i.SeriesIDSet())
} else {
return fmt.Errorf("unable to get series id set for index in shard at %s", sh.Path())
}
return nil
})
// Remove any remaining series in the set from the series file, as they don't
// exist in any of the database's remaining shards.
if ss.Cardinality() > 0 {
sfile := s.seriesFile(db)
if sfile != nil {
ss.ForEach(func(id uint64) {
sfile.DeleteSeriesID(id)
})
}
}
// Remove the on-disk shard data.
if err := os.RemoveAll(sh.path); err != nil {
return err
}
return os.RemoveAll(sh.walPath)
}
// DeleteDatabase will close all shards associated with a database and remove the directory and files from disk.

View File

@ -162,27 +162,88 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) {
func TestStore_DeleteShard(t *testing.T) {
t.Parallel()
test := func(index string) {
test := func(index string) error {
s := MustOpenStore(index)
defer s.Close()
// Create a new shard and verify that it exists.
if err := s.CreateShard("db0", "rp0", 1, true); err != nil {
t.Fatal(err)
return err
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("expected shard")
return fmt.Errorf("expected shard")
}
// Reopen shard and recheck.
if err := s.Reopen(); err != nil {
t.Fatal(err)
} else if sh := s.Shard(1); sh == nil {
t.Fatalf("shard exists")
// Create another shard.
if err := s.CreateShard("db0", "rp0", 2, true); err != nil {
return err
} else if sh := s.Shard(2); sh == nil {
return fmt.Errorf("expected shard")
}
// and another, but in a different db.
if err := s.CreateShard("db1", "rp0", 3, true); err != nil {
return err
} else if sh := s.Shard(3); sh == nil {
return fmt.Errorf("expected shard")
}
// Write series data to the db0 shards.
s.MustWriteToShardString(1, "cpu,servera=a v=1", "cpu,serverb=b v=1", "mem,serverc=a v=1")
s.MustWriteToShardString(2, "cpu,servera=a v=1", "mem,serverc=a v=1")
// Write similar data to db1 database
s.MustWriteToShardString(3, "cpu,serverb=b v=1")
// Reopen the store and check all shards still exist
if err := s.Reopen(); err != nil {
return err
}
for i := uint64(1); i <= 3; i++ {
if sh := s.Shard(i); sh == nil {
return fmt.Errorf("shard %d missing", i)
}
}
// Remove the first shard from the store.
if err := s.DeleteShard(1); err != nil {
return err
}
// cpu,serverb=b should be removed from the series file for db0 because
// shard 1 was the only owner of that series.
// Verify by getting all tag keys.
keys, err := s.TagKeys(nil, []uint64{2}, nil)
if err != nil {
return err
}
expKeys := []tsdb.TagKeys{
{Measurement: "cpu", Keys: []string{"servera"}},
{Measurement: "mem", Keys: []string{"serverc"}},
}
if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
return fmt.Errorf("got keys %v, expected %v", got, exp)
}
// Verify that the same series was not removed from other databases'
// series files.
if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil {
return err
}
expKeys = []tsdb.TagKeys{{Measurement: "cpu", Keys: []string{"serverb"}}}
if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) {
return fmt.Errorf("got keys %v, expected %v", got, exp)
}
return nil
}
for _, index := range tsdb.RegisteredIndexes() {
t.Run(index, func(t *testing.T) { test(index) })
t.Run(index, func(t *testing.T) {
if err := test(index); err != nil {
t.Error(err)
}
})
}
}