From 0a2f6191a63b729b549861d21043f4dcdbd82568 Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Wed, 21 Nov 2018 17:53:22 -0700 Subject: [PATCH 1/2] tsdb: clean up fields index for every kind of delete Before this, if you deleted everything with `delete where true` for example, then you would be left with all of your measurements in the fields index. That would cause ghost fields to reappear if someone reinserted to the measurement. This fixes that by making it so the deepest most delete code checks if the measurement was removed from the index, and if so cleaning it up out of the fields index. Additionally, it fixes bugs in that cleanup code where if you had a measurement like "m1" and "m10", when iterating over the cache or file store, "m1" would match "m10" due to it only checking the prefix. This also has it check the character right after the measurement to be either a comma because tags started, or the first character of the field separator. --- tsdb/engine/tsm1/engine.go | 35 +++++++++++++++++++---------------- tsdb/index.go | 2 +- tsdb/index/inmem/inmem.go | 10 +++++----- tsdb/index/tsi1/index.go | 8 ++++---- 4 files changed, 29 insertions(+), 26 deletions(-) diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 7d83e64217..3240deceaa 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -1667,8 +1667,19 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { ids.Add(sid) } + fielsetChanged := false for k := range measurements { - if err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil { + if dropped, err := e.index.DropMeasurementIfSeriesNotExist([]byte(k)); err != nil { + return err + } else if dropped { + if err := e.cleanupMeasurement([]byte(k)); err != nil { + return err + } + fielsetChanged = true + } + } + if fielsetChanged { + if err := e.fieldset.Save(); err != nil { return err } } @@ -1688,9 +1699,6 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { name, tags := e.sfile.Series(id) if err1 := e.sfile.DeleteSeriesID(id); err1 != nil { err = err1 - } - - if err != nil { return } @@ -1711,13 +1719,7 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error { return nil } -// DeleteMeasurement deletes a measurement and all related series. -func (e *Engine) DeleteMeasurement(name []byte) error { - // Delete the bulk of data outside of the fields lock. - if err := e.deleteMeasurement(name); err != nil { - return err - } - +func (e *Engine) cleanupMeasurement(name []byte) error { // A sentinel error message to cause DeleteWithLock to not delete the measurement abortErr := fmt.Errorf("measurements still exist") @@ -1726,10 +1728,11 @@ func (e *Engine) DeleteMeasurement(name []byte) error { // were writes to the measurement while we are deleting it. if err := e.fieldset.DeleteWithLock(string(name), func() error { encodedName := models.EscapeMeasurement(name) + sep := len(encodedName) // First scan the cache to see if any series exists for this measurement. if err := e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { - if bytes.HasPrefix(k, encodedName) { + if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) { return abortErr } return nil @@ -1738,8 +1741,8 @@ func (e *Engine) DeleteMeasurement(name []byte) error { } // Check the filestore. - return e.FileStore.WalkKeys(name, func(k []byte, typ byte) error { - if bytes.HasPrefix(k, encodedName) { + return e.FileStore.WalkKeys(name, func(k []byte, _ byte) error { + if bytes.HasPrefix(k, encodedName) && (k[sep] == ',' || k[sep] == keyFieldSeparator[0]) { return abortErr } return nil @@ -1750,11 +1753,11 @@ func (e *Engine) DeleteMeasurement(name []byte) error { return err } - return e.fieldset.Save() + return nil } // DeleteMeasurement deletes a measurement and all related series. -func (e *Engine) deleteMeasurement(name []byte) error { +func (e *Engine) DeleteMeasurement(name []byte) error { // Attempt to find the series keys. indexSet := tsdb.IndexSet{Indexes: []tsdb.Index{e.index}, SeriesFile: e.sfile} itr, err := indexSet.MeasurementSeriesByExprIterator(name, nil) diff --git a/tsdb/index.go b/tsdb/index.go index eff3292d23..da6ca6d673 100644 --- a/tsdb/index.go +++ b/tsdb/index.go @@ -42,7 +42,7 @@ type Index interface { CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error DropSeries(seriesID uint64, key []byte, cascade bool) error - DropMeasurementIfSeriesNotExist(name []byte) error + DropMeasurementIfSeriesNotExist(name []byte) (bool, error) // Used to clean up series in inmem index that were dropped with a shard. DropSeriesGlobal(key []byte) error diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 0afbb8ebd5..024dfaac0e 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -756,20 +756,20 @@ func (i *Index) dropMeasurement(name string) error { // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more // series for the measurment. -func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error { +func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { i.mu.Lock() defer i.mu.Unlock() m := i.measurements[string(name)] if m == nil { - return nil + return false, nil } if m.HasSeries() { - return nil + return false, nil } - return i.dropMeasurement(string(name)) + return true, i.dropMeasurement(string(name)) } // DropSeriesGlobal removes the series key and its tags from the index. @@ -1110,7 +1110,7 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, _ []byte, _ bool) error { // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more // series for the measurment. -func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) error { +func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { return idx.Index.DropMeasurementIfSeriesNotExist(name) } diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go index 033585c601..7c59bb1dec 100644 --- a/tsdb/index/tsi1/index.go +++ b/tsdb/index/tsi1/index.go @@ -832,16 +832,16 @@ func (i *Index) DropSeriesGlobal(key []byte) error { return nil } // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more // series for the measurment. -func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) error { +func (i *Index) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { // Check if that was the last series for the measurement in the entire index. if ok, err := i.MeasurementHasSeries(name); err != nil { - return err + return false, err } else if ok { - return nil + return false, nil } // If no more series exist in the measurement then delete the measurement. - return i.DropMeasurement(name) + return true, i.DropMeasurement(name) } // MeasurementsSketches returns the two measurement sketches for the index. From 259f3fe6e56f1652c4a080d42fcbaab7054969cb Mon Sep 17 00:00:00 2001 From: Jeff Wendling Date: Tue, 27 Nov 2018 16:59:17 -0700 Subject: [PATCH 2/2] tsdb: consider measurement drops per shard on inmem --- tsdb/index/inmem/inmem.go | 60 ++++++++++++++++++++++++++-------- tsdb/index/inmem/inmem_test.go | 41 +++++++++++++++++++++++ 2 files changed, 87 insertions(+), 14 deletions(-) diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go index 024dfaac0e..bad6580215 100644 --- a/tsdb/index/inmem/inmem.go +++ b/tsdb/index/inmem/inmem.go @@ -186,7 +186,9 @@ func (i *Index) MeasurementIterator() (tsdb.MeasurementIterator, error) { // CreateSeriesListIfNotExists adds the series for the given measurement to the // index and sets its ID or returns the existing series object -func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys, names [][]byte, tagsSlice []models.Tags, opt *tsdb.EngineOptions, ignoreLimits bool) error { +func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, measurements map[string]int, + keys, names [][]byte, tagsSlice []models.Tags, opt *tsdb.EngineOptions, ignoreLimits bool) error { + seriesIDs, err := i.sfile.CreateSeriesListIfNotExists(names, tagsSlice) if err != nil { return err @@ -212,6 +214,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys, seriesIDSet.Lock() if !seriesIDSet.ContainsNoLock(ss.ID) { seriesIDSet.AddNoLock(ss.ID) + measurements[ss.Measurement.Name]++ } seriesIDSet.Unlock() } @@ -247,6 +250,7 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys, seriesIDSet.Lock() if !seriesIDSet.ContainsNoLock(ss.ID) { seriesIDSet.AddNoLock(ss.ID) + measurements[ss.Measurement.Name]++ } seriesIDSet.Unlock() } @@ -278,7 +282,10 @@ func (i *Index) CreateSeriesListIfNotExists(seriesIDSet *tsdb.SeriesIDSet, keys, i.seriesSketch.Add(key) // This series needs to be added to the bitset tracking undeleted series IDs. - seriesIDSet.Add(seriesIDs[j]) + seriesIDSet.Lock() + seriesIDSet.AddNoLock(seriesIDs[j]) + measurements[mms[j].Name]++ + seriesIDSet.Unlock() } return nil @@ -1054,7 +1061,9 @@ func (i *Index) Rebuild() { // assignExistingSeries assigns the existing series to shardID and returns the series, names and tags that // do not exists yet. -func (i *Index) assignExistingSeries(shardID uint64, seriesIDSet *tsdb.SeriesIDSet, keys, names [][]byte, tagsSlice []models.Tags) ([][]byte, [][]byte, []models.Tags) { +func (i *Index) assignExistingSeries(shardID uint64, seriesIDSet *tsdb.SeriesIDSet, measurements map[string]int, + keys, names [][]byte, tagsSlice []models.Tags) ([][]byte, [][]byte, []models.Tags) { + i.mu.RLock() var n int for j, key := range keys { @@ -1070,6 +1079,7 @@ func (i *Index) assignExistingSeries(shardID uint64, seriesIDSet *tsdb.SeriesIDS seriesIDSet.Lock() if !seriesIDSet.ContainsNoLock(ss.ID) { seriesIDSet.AddNoLock(ss.ID) + measurements[string(names[j])]++ } seriesIDSet.Unlock() } @@ -1093,16 +1103,27 @@ type ShardIndex struct { // Bitset storing all undeleted series IDs associated with this shard. seriesIDSet *tsdb.SeriesIDSet + // mapping of measurements to the count of series ids in the set. protected + // by the seriesIDSet lock. + measurements map[string]int + opt tsdb.EngineOptions } // DropSeries removes the provided series id from the local bitset that tracks // series in this shard only. -func (idx *ShardIndex) DropSeries(seriesID uint64, _ []byte, _ bool) error { +func (idx *ShardIndex) DropSeries(seriesID uint64, key []byte, _ bool) error { // Remove from shard-local bitset if it exists. idx.seriesIDSet.Lock() if idx.seriesIDSet.ContainsNoLock(seriesID) { idx.seriesIDSet.RemoveNoLock(seriesID) + + name := models.ParseName(key) + if curr := idx.measurements[string(name)]; curr <= 1 { + delete(idx.measurements, string(name)) + } else { + idx.measurements[string(name)] = curr - 1 + } } idx.seriesIDSet.Unlock() return nil @@ -1111,12 +1132,22 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, _ []byte, _ bool) error { // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more // series for the measurment. func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) { - return idx.Index.DropMeasurementIfSeriesNotExist(name) + idx.seriesIDSet.Lock() + curr := idx.measurements[string(name)] + idx.seriesIDSet.Unlock() + if curr > 0 { + return false, nil + } + + // we always report the measurement was dropped if it does not exist in our + // measurements mapping. + _, err := idx.Index.DropMeasurementIfSeriesNotExist(name) + return err == nil, err } // CreateSeriesListIfNotExists creates a list of series if they doesn't exist in bulk. func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSlice []models.Tags) error { - keys, names, tagsSlice = idx.assignExistingSeries(idx.id, idx.seriesIDSet, keys, names, tagsSlice) + keys, names, tagsSlice = idx.assignExistingSeries(idx.id, idx.seriesIDSet, idx.measurements, keys, names, tagsSlice) if len(keys) == 0 { return nil } @@ -1165,7 +1196,7 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli keys, names, tagsSlice = keys[:n], names[:n], tagsSlice[:n] } - if err := idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, keys, names, tagsSlice, &idx.opt, idx.opt.Config.MaxSeriesPerDatabase == 0); err != nil { + if err := idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, idx.measurements, keys, names, tagsSlice, &idx.opt, idx.opt.Config.MaxSeriesPerDatabase == 0); err != nil { reason = err.Error() droppedKeys = append(droppedKeys, keys...) } @@ -1174,7 +1205,7 @@ func (idx *ShardIndex) CreateSeriesListIfNotExists(keys, names [][]byte, tagsSli if len(droppedKeys) > 0 { dropped := len(droppedKeys) // number dropped before deduping bytesutil.SortDedup(droppedKeys) - return &tsdb.PartialWriteError{ + return tsdb.PartialWriteError{ Reason: reason, Dropped: dropped, DroppedKeys: droppedKeys, @@ -1194,13 +1225,13 @@ func (idx *ShardIndex) SeriesN() int64 { // InitializeSeries is called during start-up. // This works the same as CreateSeriesListIfNotExists except it ignore limit errors. func (idx *ShardIndex) InitializeSeries(keys, names [][]byte, tags []models.Tags) error { - return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, keys, names, tags, &idx.opt, true) + return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, idx.measurements, keys, names, tags, &idx.opt, true) } // CreateSeriesIfNotExists creates the provided series on the index if it is not // already present. func (idx *ShardIndex) CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error { - return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, false) + return idx.Index.CreateSeriesListIfNotExists(idx.seriesIDSet, idx.measurements, [][]byte{key}, [][]byte{name}, []models.Tags{tags}, &idx.opt, false) } // TagSets returns a list of tag sets based on series filtering. @@ -1216,10 +1247,11 @@ func (idx *ShardIndex) SeriesIDSet() *tsdb.SeriesIDSet { // NewShardIndex returns a new index for a shard. func NewShardIndex(id uint64, seriesIDSet *tsdb.SeriesIDSet, opt tsdb.EngineOptions) tsdb.Index { return &ShardIndex{ - Index: opt.InmemIndex.(*Index), - id: id, - seriesIDSet: seriesIDSet, - opt: opt, + Index: opt.InmemIndex.(*Index), + id: id, + seriesIDSet: seriesIDSet, + measurements: make(map[string]int), + opt: opt, } } diff --git a/tsdb/index/inmem/inmem_test.go b/tsdb/index/inmem/inmem_test.go index 0ae2e3ccdd..c4a8ff097b 100644 --- a/tsdb/index/inmem/inmem_test.go +++ b/tsdb/index/inmem/inmem_test.go @@ -115,6 +115,47 @@ func TestIndex_Bytes(t *testing.T) { } } +func TestIndex_MeasurementTracking(t *testing.T) { + sfile := mustOpenSeriesFile() + defer sfile.Close() + opt := tsdb.EngineOptions{InmemIndex: inmem.NewIndex("foo", sfile.SeriesFile)} + s1 := inmem.NewShardIndex(1, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex) + s2 := inmem.NewShardIndex(2, tsdb.NewSeriesIDSet(), opt).(*inmem.ShardIndex) + b := func(s string) []byte { return []byte(s) } + mt := func(k, v string) models.Tag { return models.Tag{Key: b(k), Value: b(v)} } + + s1.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")}) + s1.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")}) + s2.CreateSeriesIfNotExists(b("m,t=t1"), b("m"), models.Tags{mt("t", "t1")}) + s2.CreateSeriesIfNotExists(b("m,t=t2"), b("m"), models.Tags{mt("t", "t2")}) + series1, _ := s1.Series(b("m,t=t1")) + series2, _ := s1.Series(b("m,t=t2")) + + if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok { + t.Fatal("invalid drop") + } + if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok { + t.Fatal("invalid drop") + } + + s1.DropSeries(series1.ID, b(series1.Key), false) + s1.DropSeries(series2.ID, b(series2.Key), false) + + if ok, err := s1.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok { + t.Fatal("invalid drop") + } + if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || ok { + t.Fatal("invalid drop") + } + + s2.DropSeries(series1.ID, b(series1.Key), false) + s2.DropSeries(series2.ID, b(series2.Key), false) + + if ok, err := s2.DropMeasurementIfSeriesNotExist(b("m")); err != nil || !ok { + t.Fatal("invalid drop") + } +} + // seriesFileWrapper is a test wrapper for tsdb.seriesFileWrapper. type seriesFileWrapper struct { *tsdb.SeriesFile