diff --git a/tsdb/series_set.go b/tsdb/series_set.go index 2b9d28ad4f..6e774700c1 100644 --- a/tsdb/series_set.go +++ b/tsdb/series_set.go @@ -59,6 +59,13 @@ func (s *SeriesIDSet) RemoveNoLock(id uint64) { s.bitmap.Remove(uint32(id)) } +// Cardinality returns the cardinality of the SeriesIDSet. +func (s *SeriesIDSet) Cardinality() uint64 { + s.RLock() + defer s.RUnlock() + return s.bitmap.GetCardinality() +} + // Merge merged the contents of others into s. The caller does not need to // provide s as an argument, and the contents of s will always be present in s // after Merge returns. @@ -96,7 +103,8 @@ func (s *SeriesIDSet) Equals(other *SeriesIDSet) bool { return s.bitmap.Equals(other.bitmap) } -// AndNot returns the set of elements that only exist in s. +// AndNot returns a new SeriesIDSet containing elements that were present in s, +// but not present in other. func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet { s.RLock() defer s.RUnlock() @@ -122,7 +130,7 @@ func (s *SeriesIDSet) String() string { return s.bitmap.String() } -// Diff deletes any bits set in other. +// Diff removes from s any elements also present in other. func (s *SeriesIDSet) Diff(other *SeriesIDSet) { other.RLock() defer other.RUnlock() diff --git a/tsdb/shard.go b/tsdb/shard.go index 633313e345..77ec2bd692 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -411,15 +411,6 @@ func (s *Shard) LastModified() time.Time { return engine.LastModified() } -// UnloadIndex removes all references to this shard from the DatabaseIndex -func (s *Shard) UnloadIndex() { - s.mu.RLock() - defer s.mu.RUnlock() - if err := s.ready(); err != nil { - return - } -} - // Index returns a reference to the underlying index. It returns an error if // the index is nil. func (s *Shard) Index() (Index, error) { diff --git a/tsdb/store.go b/tsdb/store.go index 6640fa7740..6aea703c65 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -534,28 +534,67 @@ func (s *Store) DeleteShard(shardID uint64) error { return nil } - // Remove the shard from the database indexes before closing the shard. - // Closing the shard will do this as well, but it will unload it while - // the shard is locked which can block stats collection and other calls. - sh.UnloadIndex() - - if err := sh.Close(); err != nil { - return err - } - - if err := os.RemoveAll(sh.path); err != nil { - return err - } - - if err := os.RemoveAll(sh.walPath); err != nil { - return err - } - + // Remove the shard from Store so it's not returned to callers requesting + // shards. s.mu.Lock() delete(s.shards, shardID) s.mu.Unlock() - return nil + // Get the shard's local bitset of series IDs. + index, err := sh.Index() + if err != nil { + return err + } + + var ss *SeriesIDSet + if i, ok := index.(interface { + SeriesIDSet() *SeriesIDSet + }); ok { + ss = i.SeriesIDSet() + } + + db := sh.Database() + if err := sh.Close(); err != nil { + return err + } + + // Determine if the shard contained any series that are not present in any + // other shards in the database. + shards := s.filterShards(byDatabase(db)) + + s.walkShards(shards, func(sh *Shard) error { + index, err := sh.Index() + if err != nil { + return err + } + + if i, ok := index.(interface { + SeriesIDSet() *SeriesIDSet + }); ok { + ss.Diff(i.SeriesIDSet()) + } else { + return fmt.Errorf("unable to get series id set for index in shard at %s", sh.Path()) + } + return nil + }) + + // Remove any remaining series in the set from the series file, as they don't + // exist in any of the database's remaining shards. + if ss.Cardinality() > 0 { + sfile := s.seriesFile(db) + if sfile != nil { + ss.ForEach(func(id uint64) { + sfile.DeleteSeriesID(id) + }) + } + } + + // Remove the on-disk shard data. + if err := os.RemoveAll(sh.path); err != nil { + return err + } + + return os.RemoveAll(sh.walPath) } // DeleteDatabase will close all shards associated with a database and remove the directory and files from disk. diff --git a/tsdb/store_test.go b/tsdb/store_test.go index 1ddd913d04..a55ad5ac4c 100644 --- a/tsdb/store_test.go +++ b/tsdb/store_test.go @@ -162,27 +162,88 @@ func TestStore_DeleteSeries_NonExistentDB(t *testing.T) { func TestStore_DeleteShard(t *testing.T) { t.Parallel() - test := func(index string) { + test := func(index string) error { s := MustOpenStore(index) defer s.Close() // Create a new shard and verify that it exists. if err := s.CreateShard("db0", "rp0", 1, true); err != nil { - t.Fatal(err) + return err } else if sh := s.Shard(1); sh == nil { - t.Fatalf("expected shard") + return fmt.Errorf("expected shard") } - // Reopen shard and recheck. - if err := s.Reopen(); err != nil { - t.Fatal(err) - } else if sh := s.Shard(1); sh == nil { - t.Fatalf("shard exists") + // Create another shard. + if err := s.CreateShard("db0", "rp0", 2, true); err != nil { + return err + } else if sh := s.Shard(2); sh == nil { + return fmt.Errorf("expected shard") } + + // and another, but in a different db. + if err := s.CreateShard("db1", "rp0", 3, true); err != nil { + return err + } else if sh := s.Shard(3); sh == nil { + return fmt.Errorf("expected shard") + } + + // Write series data to the db0 shards. + s.MustWriteToShardString(1, "cpu,servera=a v=1", "cpu,serverb=b v=1", "mem,serverc=a v=1") + s.MustWriteToShardString(2, "cpu,servera=a v=1", "mem,serverc=a v=1") + + // Write similar data to db1 database + s.MustWriteToShardString(3, "cpu,serverb=b v=1") + + // Reopen the store and check all shards still exist + if err := s.Reopen(); err != nil { + return err + } + for i := uint64(1); i <= 3; i++ { + if sh := s.Shard(i); sh == nil { + return fmt.Errorf("shard %d missing", i) + } + } + + // Remove the first shard from the store. + if err := s.DeleteShard(1); err != nil { + return err + } + + // cpu,serverb=b should be removed from the series file for db0 because + // shard 1 was the only owner of that series. + // Verify by getting all tag keys. + keys, err := s.TagKeys(nil, []uint64{2}, nil) + if err != nil { + return err + } + + expKeys := []tsdb.TagKeys{ + {Measurement: "cpu", Keys: []string{"servera"}}, + {Measurement: "mem", Keys: []string{"serverc"}}, + } + if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) { + return fmt.Errorf("got keys %v, expected %v", got, exp) + } + + // Verify that the same series was not removed from other databases' + // series files. + if keys, err = s.TagKeys(nil, []uint64{3}, nil); err != nil { + return err + } + + expKeys = []tsdb.TagKeys{{Measurement: "cpu", Keys: []string{"serverb"}}} + if got, exp := keys, expKeys; !reflect.DeepEqual(got, exp) { + return fmt.Errorf("got keys %v, expected %v", got, exp) + } + return nil } for _, index := range tsdb.RegisteredIndexes() { - t.Run(index, func(t *testing.T) { test(index) }) + t.Run(index, func(t *testing.T) { + if err := test(index); err != nil { + t.Error(err) + } + }) } }