diff --git a/tsdb/engine.go b/tsdb/engine.go index 9fe90b4c23..9fcbffb968 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -81,6 +81,11 @@ type Engine interface { io.WriterTo } +// SeriesIDSets provides access to the total set of series IDs +type SeriesIDSets interface { + ForEach(f func(ids *SeriesIDSet)) error +} + // EngineFormat represents the format for an engine. type EngineFormat int @@ -150,7 +155,8 @@ type EngineOptions struct { CompactionLimiter limiter.Fixed CompactionThroughputLimiter limiter.Rate - Config Config + Config Config + SeriesIDSets SeriesIDSets } // NewEngineOptions returns the default options. diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 42fe92274c..4482ee5408 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -178,6 +178,9 @@ type Engine struct { compactionLimiter limiter.Fixed scheduler *scheduler + + // provides access to the total set of series IDs + seriesIDSets tsdb.SeriesIDSets } // NewEngine returns a new instance of Engine. @@ -219,6 +222,7 @@ func NewEngine(id uint64, idx tsdb.Index, database, path string, walPath string, stats: stats, compactionLimiter: opt.CompactionLimiter, scheduler: newScheduler(stats, opt.CompactionLimiter.Capacity()), + seriesIDSets: opt.SeriesIDSets, } if e.traceLogging { @@ -1359,7 +1363,15 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { // the series from the index. if len(seriesKeys) > 0 { buf := make([]byte, 1024) // For use when accessing series file. + ids := tsdb.NewSeriesIDSet() for _, k := range seriesKeys { + name, tags := models.ParseKey(k) + sid := e.sfile.SeriesID([]byte(name), tags, buf) + if sid == 0 { + return fmt.Errorf("unable to find id for series key %s during deletion", k) + } + id := (sid << 32) | e.id + // This key was crossed out earlier, skip it if k == nil { continue @@ -1379,30 +1391,38 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { i++ } - // Some cache values still exists, leave the series in the index. if hasCacheValues { continue } - // Remove the series from the series file and index. - - // TODO(edd): we need to first check with all other shards if it's - // OK to tombstone the series in the series file. - // - // Further, in the case of the inmem index, we should only remove - // the series from the index if we also tombstone it in the series - // file. - name, tags := models.ParseKey(k) - sid := e.sfile.SeriesID([]byte(name), tags, buf) - if sid == 0 { - return fmt.Errorf("unable to find id for series key %s during deletion", k) - } - // Remove the series from the index for this shard - id := (sid << 32) | e.id if err := e.index.UnassignShard(string(k), id, ts); err != nil { return err } + + // Add the id to the set of delete ids. + ids.Add(sid) + + } + + // Remove any series IDs for our set that still exist in other shards. + // We cannot remove these from the series file yet. + if err := e.seriesIDSets.ForEach(func(s *tsdb.SeriesIDSet) { + ids = ids.AndNot(s) + }); err != nil { + return err + } + + // Remove the remaining ids from the series file as they no longer exist + // in any shard. + var err error + ids.ForEach(func(id uint64) { + if err1 := e.sfile.DeleteSeriesID(id); err1 != nil { + err = err1 + } + }) + if err != nil { + return err } } diff --git a/tsdb/engine/tsm1/engine_test.go b/tsdb/engine/tsm1/engine_test.go index 534c398fe1..53929c45ec 100644 --- a/tsdb/engine/tsm1/engine_test.go +++ b/tsdb/engine/tsm1/engine_test.go @@ -1817,15 +1817,16 @@ func NewEngine(index string) (*Engine, error) { if err = sfile.Open(); err != nil { return nil, err } + seriesIDs := tsdb.NewSeriesIDSet() opt := tsdb.NewEngineOptions() opt.IndexVersion = index if index == "inmem" { opt.InmemIndex = inmem.NewIndex(db, sfile) } + opt.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{seriesIDs}) idxPath := filepath.Join(dbPath, "index") - seriesIDs := tsdb.NewSeriesIDSet() idx := tsdb.MustOpenIndex(1, db, idxPath, seriesIDs, sfile, opt) tsm1Engine := tsm1.NewEngine(1, idx, db, filepath.Join(root, "data"), filepath.Join(root, "wal"), sfile, opt).(*tsm1.Engine) @@ -2053,3 +2054,12 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) { itr.keys = itr.keys[1:] return s, nil } + +type seriesIDSets []*tsdb.SeriesIDSet + +func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error { + for _, v := range a { + f(v) + } + return nil +} diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index f5b5f4905b..f30c2f59a4 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -1050,6 +1050,7 @@ func (idx *ShardIndex) UnassignShard(key string, id uint64, ts int64) error { // TODO(edd): temporarily munging series id and shard id into same value, // to test prototype without having to change Index API. sid, shardID := id>>32, id&0xFFFFFFFF + idx.seriesIDSet.Remove(sid) return idx.Index.UnassignShard(key, shardID, ts) } diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index f8f6043e29..14eff6c8c4 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -528,17 +528,17 @@ func (i *Index) InitializeSeries(key, name []byte, tags models.Tags) error { // DropSeries drops the provided series from the index. func (i *Index) DropSeries(key []byte, ts int64) error { + // Extract measurement name. + name, tags := models.ParseKeyBytes(key) + partitionKey := tsdb.AppendSeriesKey(nil, name, tags) + // Remove from partition. - if err := i.partition(key).DropSeries(key, ts); err != nil { + if err := i.partition(partitionKey).DropSeries(key, ts); err != nil { return err } - // Extract measurement name. - name, _ := models.ParseKey(key) - mname := []byte(name) - // Check if that was the last series for the measurement in the entire index. - itr, err := i.MeasurementSeriesIDIterator(mname) + itr, err := i.MeasurementSeriesIDIterator(name) if err != nil { return err } else if itr == nil { @@ -554,7 +554,7 @@ func (i *Index) DropSeries(key []byte, ts int64) error { } // If no more series exist in the measurement then delete the measurement. - if err := i.DropMeasurement(mname); err != nil { + if err := i.DropMeasurement(name); err != nil { return err } return nil diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 0e7e59d71e..49d646d5c8 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -295,6 +295,10 @@ func (i *Partition) buildSeriesSet() error { return nil } + if i.sfile.IsDeleted(elem.SeriesID) { + continue + } + // Add id to series set. i.seriesSet.Add(elem.SeriesID) } @@ -605,19 +609,12 @@ func (i *Partition) DropSeries(key []byte, ts int64) error { i.mu.RLock() defer i.mu.RUnlock() - name, tags := models.ParseKey(key) - mname := []byte(name) - seriesID := i.sfile.SeriesID(mname, tags, nil) + name, tags := models.ParseKeyBytes(key) + seriesID := i.sfile.SeriesID(name, tags, nil) // Remove from series id set. i.seriesSet.Remove(seriesID) - // TODO(edd): this should only happen when there are no shards containing - // this series. - if err := i.sfile.DeleteSeriesID(seriesID); err != nil { - return err - } - return nil }(); err != nil { return err diff --git a/tsdb/series_set.go b/tsdb/series_set.go index 92f5d06d54..1322d4692c 100644 --- a/tsdb/series_set.go +++ b/tsdb/series_set.go @@ -71,3 +71,29 @@ func (s *SeriesIDSet) Merge(others ...*SeriesIDSet) { defer s.Unlock() s.bitmap = roaring.FastOr(bms...) } + +// AndNot returns the set of elements that only exist in s. +func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet { + s.RLock() + defer s.RUnlock() + other.RLock() + defer other.RUnlock() + + return &SeriesIDSet{bitmap: roaring.AndNot(s.bitmap, other.bitmap)} +} + +// ForEach calls f for each id in the set. +func (s *SeriesIDSet) ForEach(f func(id uint64)) { + s.RLock() + defer s.RUnlock() + itr := s.bitmap.Iterator() + for itr.HasNext() { + f(uint64(itr.Next())) + } +} + +func (s *SeriesIDSet) String() string { + s.RLock() + defer s.RUnlock() + return s.bitmap.String() +} diff --git a/tsdb/shard_test.go b/tsdb/shard_test.go index 226a87f1a8..9e7c90aa70 100644 --- a/tsdb/shard_test.go +++ b/tsdb/shard_test.go @@ -365,6 +365,7 @@ func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) { opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir), sfile.SeriesFile) + opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{}) sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts) if err := sh.Open(); err != nil { @@ -452,6 +453,7 @@ func TestShard_WritePoints_FieldConflictConcurrentQuery(t *testing.T) { opts := tsdb.NewEngineOptions() opts.Config.WALDir = filepath.Join(tmpDir, "wal") opts.InmemIndex = inmem.NewIndex(path.Base(tmpDir), sfile.SeriesFile) + opts.SeriesIDSets = seriesIDSets([]*tsdb.SeriesIDSet{}) sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts) if err := sh.Open(); err != nil { @@ -1924,3 +1926,12 @@ func (itr *seriesIterator) Next() (tsdb.SeriesElem, error) { itr.keys = itr.keys[1:] return s, nil } + +type seriesIDSets []*tsdb.SeriesIDSet + +func (a seriesIDSets) ForEach(f func(ids *tsdb.SeriesIDSet)) error { + for _, v := range a { + f(v) + } + return nil +} diff --git a/tsdb/store.go b/tsdb/store.go index 2fd18e4576..612933dd84 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -267,6 +267,9 @@ func (s *Store) loadShards() error { opt := s.EngineOptions opt.InmemIndex = idx + // Provide an implementation of the ShardIDSets + opt.SeriesIDSets = shardSet{store: s, db: db} + // Existing shards should continue to use inmem index. if _, err := os.Stat(filepath.Join(path, "index")); os.IsNotExist(err) { opt.IndexVersion = "inmem" @@ -486,6 +489,7 @@ func (s *Store) CreateShard(database, retentionPolicy string, shardID uint64, en // Copy index options and pass in shared index. opt := s.EngineOptions opt.InmemIndex = idx + opt.SeriesIDSets = shardSet{store: s, db: database} path := filepath.Join(s.path, database, retentionPolicy, strconv.FormatUint(shardID, 10)) shard := NewShard(shardID, path, walPath, sfile, opt) @@ -1731,3 +1735,28 @@ func relativePath(storePath, shardPath string) (string, error) { return name, nil } + +type shardSet struct { + store *Store + db string +} + +func (s shardSet) ForEach(f func(ids *SeriesIDSet)) error { + s.store.mu.RLock() + shards := s.store.filterShards(byDatabase(s.db)) + s.store.mu.RUnlock() + + for _, sh := range shards { + idx, err := sh.Index() + if err != nil { + return err + } + + if t, ok := idx.(interface { + SeriesIDSet() *SeriesIDSet + }); ok { + f(t.SeriesIDSet()) + } + } + return nil +}