From 20a85283378eba2e4ae28b3bf2c983f4406362c0 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Fri, 11 Jan 2019 16:28:36 +0000
Subject: [PATCH 1/7] Ensure TSI bitset cache cleaned up on measurement drop
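
The TagValueSeriesIDCache is keyed by measurement name at the top level,
so invalidating it for a dropped measurement is a single map delete. A
condensed sketch of the new call path (simplified from the two diffs
below; everything named here is added or touched by this patch):

    // tsdb/tsi1: Index.DropMeasurement now also invalidates the cache.
    i.tagValueCache.DeleteMeasurement(name) // removes every cached
                                            // {name, key, value} bitset

Without this, a cached tag value -> SeriesIDSet entry for the dropped
measurement could keep serving stale series IDs to reads that hit the
cache.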
---
 tsdb/tsi1/cache.go | 7 +++++++
 tsdb/tsi1/index.go | 4 +++-
 2 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/tsdb/tsi1/cache.go b/tsdb/tsi1/cache.go
index 7fcb4ea2de..94fea328d8 100644
--- a/tsdb/tsi1/cache.go
+++ b/tsdb/tsi1/cache.go
@@ -153,6 +153,13 @@ func (c *TagValueSeriesIDCache) Delete(name, key, value []byte, x tsdb.SeriesID)
 	c.Unlock()
 }
 
+// DeleteMeasurement removes all cached entries for the provided measurement name.
+func (c *TagValueSeriesIDCache) DeleteMeasurement(name []byte) {
+	c.Lock()
+	delete(c.cache, string(name))
+	c.Unlock()
+}
+
 // delete removes x from the tuple {name, key, value} if it exists.
 func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x tsdb.SeriesID) {
 	if mmap, ok := c.cache[string(name)]; ok {
diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go
index 8208f0aec7..312e411e91 100644
--- a/tsdb/tsi1/index.go
+++ b/tsdb/tsi1/index.go
@@ -592,13 +592,15 @@ func (i *Index) DropMeasurement(name []byte) error {
 		}()
 	}
 
+	// Remove any cached bitmaps for the measurement.
+	i.tagValueCache.DeleteMeasurement(name)
+
 	// Check for error
 	for i := 0; i < cap(errC); i++ {
 		if err := <-errC; err != nil {
 			return err
 		}
 	}
-
 	return nil
 }
 
From c7d26d8950dc4270a4d6b918501601c2e0b42b1d Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Fri, 11 Jan 2019 19:27:08 +0000
Subject: [PATCH 2/7] Rename delete method

---
 .../{engine_delete_prefix.go => engine_delete_measurement.go}     | 0
 ...ne_delete_prefix_test.go => engine_delete_measurement_test.go} | 0
 2 files changed, 0 insertions(+), 0 deletions(-)
 rename tsdb/tsm1/{engine_delete_prefix.go => engine_delete_measurement.go} (100%)
 rename tsdb/tsm1/{engine_delete_prefix_test.go => engine_delete_measurement_test.go} (100%)

diff --git a/tsdb/tsm1/engine_delete_prefix.go b/tsdb/tsm1/engine_delete_measurement.go
similarity index 100%
rename from tsdb/tsm1/engine_delete_prefix.go
rename to tsdb/tsm1/engine_delete_measurement.go
diff --git a/tsdb/tsm1/engine_delete_prefix_test.go b/tsdb/tsm1/engine_delete_measurement_test.go
similarity index 100%
rename from tsdb/tsm1/engine_delete_prefix_test.go
rename to tsdb/tsm1/engine_delete_measurement_test.go

From 7ee4f499e164308a2dc9390caef489bc459b4fbb Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Fri, 11 Jan 2019 19:27:53 +0000
Subject: [PATCH 3/7] Clarify best method of set difference
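
AndNot conflated two use cases, so it is split into two operations whose
cost model is visible at the call site. A minimal usage sketch, built
only from functions added or kept by this patch:

    a := NewSeriesIDSet(NewSeriesID(1), NewSeriesID(2), NewSeriesID(3))
    b := NewSeriesIDSet(NewSeriesID(2))

    // Allocates a new set holding the difference; a and b are unchanged.
    diff := NewSeriesIDSetNegate(a, b) // diff == {1, 3}

    // Mutates a in place; no result set is allocated.
    a.RemoveSet(b) // a == {1, 3}

Callers that only need to shrink an existing set, such as the tombstone
removal in the TSI file set, can now use RemoveSet and skip the
allocation entirely.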
---
 tsdb/index.go           |   2 +-
 tsdb/series_set.go      |  19 ++++--
 tsdb/series_set_test.go | 129 +++++++++++++++++++++++++++++++++++++++-
 tsdb/tsi1/file_set.go   |   2 +-
 4 files changed, 143 insertions(+), 9 deletions(-)

diff --git a/tsdb/index.go b/tsdb/index.go
index d621150c51..a45df95696 100644
--- a/tsdb/index.go
+++ b/tsdb/index.go
@@ -609,7 +609,7 @@ func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
 	if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
 		itr0.Close()
 		itr1.Close()
-		return NewSeriesIDSetIterator(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()))
+		return NewSeriesIDSetIterator(NewSeriesIDSetNegate(a[0].SeriesIDSet(), a[1].SeriesIDSet()))
 	}
 
 	return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}
diff --git a/tsdb/series_set.go b/tsdb/series_set.go
index c5558ebd06..8cfac4de4e 100644
--- a/tsdb/series_set.go
+++ b/tsdb/series_set.go
@@ -27,6 +27,17 @@ func NewSeriesIDSet(a ...SeriesID) *SeriesIDSet {
 	return ss
 }
 
+// NewSeriesIDSetNegate returns a new SeriesIDSet containing all the elements in a
+// that are not present in b. That is, the set difference between a and b.
+func NewSeriesIDSetNegate(a, b *SeriesIDSet) *SeriesIDSet {
+	a.RLock()
+	defer a.RUnlock()
+	b.RLock()
+	defer b.RUnlock()
+
+	return &SeriesIDSet{bitmap: roaring.AndNot(a.bitmap, b.bitmap)}
+}
+
 // Bytes estimates the memory footprint of this SeriesIDSet, in bytes.
 func (s *SeriesIDSet) Bytes() int {
 	var b int
@@ -170,15 +181,13 @@ func (s *SeriesIDSet) And(other *SeriesIDSet) *SeriesIDSet {
 	return &SeriesIDSet{bitmap: roaring.And(s.bitmap, other.bitmap)}
 }
 
-// AndNot returns a new SeriesIDSet containing elements that were present in s,
-// but not present in other.
-func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
+// RemoveSet removes all values in other from s, if they exist.
+func (s *SeriesIDSet) RemoveSet(other *SeriesIDSet) {
 	s.RLock()
 	defer s.RUnlock()
 	other.RLock()
 	defer other.RUnlock()
-
-	return &SeriesIDSet{bitmap: roaring.AndNot(s.bitmap, other.bitmap)}
+	s.bitmap.AndNot(other.bitmap)
 }
 
 // ForEach calls f for each id in the set. The function is applied to the IDs
diff --git a/tsdb/series_set_test.go b/tsdb/series_set_test.go
index ebb1b3fb8d..10bd7887ce 100644
--- a/tsdb/series_set_test.go
+++ b/tsdb/series_set_test.go
@@ -10,7 +10,7 @@ import (
 	"testing"
 )
 
-func TestSeriesIDSet_AndNot(t *testing.T) {
+func TestSeriesIDSet_NewSeriesIDSetNegate(t *testing.T) {
 	examples := [][3][]uint64{
 		[3][]uint64{
 			{1, 10, 20, 30},
@@ -55,7 +55,7 @@
 			expected.Add(NewSeriesID(v))
 		}
 
-		got := a.AndNot(b)
+		got := NewSeriesIDSetNegate(a, b)
 		if got.String() != expected.String() {
 			t.Fatalf("got %s, expected %s", got.String(), expected.String())
 		}
@@ -63,6 +63,59 @@
 	}
 }
 
+func TestSeriesIDSet_RemoveSet(t *testing.T) {
+	examples := [][3][]uint64{
+		[3][]uint64{
+			{1, 10, 20, 30},
+			{10, 12, 13, 14, 20},
+			{1, 30},
+		},
+		[3][]uint64{
+			{},
+			{10},
+			{},
+		},
+		[3][]uint64{
+			{1, 10, 20, 30},
+			{1, 10, 20, 30},
+			{},
+		},
+		[3][]uint64{
+			{1, 10},
+			{1, 10, 100},
+			{},
+		},
+		[3][]uint64{
+			{1, 10},
+			{},
+			{1, 10},
+		},
+	}
+
+	for i, example := range examples {
+		t.Run(fmt.Sprint(i), func(t *testing.T) {
+			// Build sets.
+			a, b := NewSeriesIDSet(), NewSeriesIDSet()
+			for _, v := range example[0] {
+				a.Add(NewSeriesID(v))
+			}
+			for _, v := range example[1] {
+				b.Add(NewSeriesID(v))
+			}
+
+			expected := NewSeriesIDSet()
+			for _, v := range example[2] {
+				expected.Add(NewSeriesID(v))
+			}
+
+			a.RemoveSet(b)
+			if a.String() != expected.String() {
+				t.Fatalf("got %s, expected %s", a.String(), expected.String())
+			}
+		})
+	}
+}
+
 // Ensure that cloning is race-free.
 func TestSeriesIDSet_Clone_Race(t *testing.T) {
 	main := NewSeriesIDSet()
@@ -556,6 +609,78 @@ func BenchmarkSeriesIDSet_Remove(b *testing.B) {
 	})
 }
 
+// BenchmarkSeriesIDSet_MassRemove benchmarks the cost of removing a large set of values.
+func BenchmarkSeriesIDSet_MassRemove(b *testing.B) {
+	var size = uint64(1000000)
+	// Setup...
+	set = NewSeriesIDSet()
+	for i := uint64(0); i < size; i++ {
+		set.Add(NewSeriesID(i))
+	}
+
+	// Remove one at a time
+	b.Run(fmt.Sprint("cardinality_1000000_remove_each"), func(b *testing.B) {
+		clone := set.Clone()
+		for i := 0; i < b.N; i++ {
+			for j := uint64(0); j < size/2; j++ {
+				clone.RemoveNoLock(NewSeriesID(j))
+			}
+
+			b.StopTimer()
+			clone = set.Clone()
+			b.StartTimer()
+		}
+	})
+
+	// This is the case where a target series id set exists.
+	b.Run(fmt.Sprint("cardinality_1000000_remove_set_exists"), func(b *testing.B) {
+		clone := set.Clone()
+		other := NewSeriesIDSet()
+		for j := uint64(0); j < size/2; j++ {
+			other.AddNoLock(NewSeriesID(j))
+		}
+
+		for i := 0; i < b.N; i++ {
+			clone.RemoveSet(other)
+			b.StopTimer()
+			clone = set.Clone()
+			b.StartTimer()
+		}
+	})
+
+	// Build the target series id set inside the timed loop, then remove it.
+	b.Run(fmt.Sprint("cardinality_1000000_remove_set"), func(b *testing.B) {
+		clone := set.Clone()
+		for i := 0; i < b.N; i++ {
+			other := NewSeriesIDSet()
+			for j := uint64(0); j < size/2; j++ {
+				other.AddNoLock(NewSeriesID(j))
+			}
+
+			clone.RemoveSet(other)
+			b.StopTimer()
+			clone = set.Clone()
+			b.StartTimer()
+		}
+	})
+
+	// This is the case where a new result set is created.
+	b.Run(fmt.Sprint("cardinality_1000000_remove_set_new"), func(b *testing.B) {
+		clone := set.Clone()
+		other := NewSeriesIDSet()
+		for j := uint64(0); j < size/2; j++ {
+			other.AddNoLock(NewSeriesID(j))
+		}
+
+		for i := 0; i < b.N; i++ {
+			_ = NewSeriesIDSetNegate(clone, other)
+			b.StopTimer()
+			clone = set.Clone()
+			b.StartTimer()
+		}
+	})
+}
+
 // Typical benchmarks for a laptop:
 //
 // BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1/shards_1-4	200000	8095 ns/op	16656 B/op	11 allocs/op
diff --git a/tsdb/tsi1/file_set.go b/tsdb/tsi1/file_set.go
index 1a20f9cb58..3e47d969a3 100644
--- a/tsdb/tsi1/file_set.go
+++ b/tsdb/tsi1/file_set.go
@@ -387,7 +387,7 @@ func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.Serie
 
 		// Remove tombstones set in previous file.
 		if ftss != nil && ftss.Cardinality() > 0 {
-			ss = ss.AndNot(ftss)
+			ss.RemoveSet(ftss)
 		}
 
 		// Fetch tag value series set for this file and merge into overall set.

From 9e11602b6a3b9d3203d19a5c3019b03e2899b234 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Mon, 14 Jan 2019 11:55:38 +0000
Subject: [PATCH 4/7] Add DeleteBucket benchmark

---
 storage/engine_test.go | 149 +++++++++++++++++++++++++++++++++++++++--
 1 file changed, 144 insertions(+), 5 deletions(-)

diff --git a/storage/engine_test.go b/storage/engine_test.go
index d058ac25a3..b3387afca6 100644
--- a/storage/engine_test.go
+++ b/storage/engine_test.go
@@ -1,12 +1,14 @@
 package storage_test
 
 import (
+	"fmt"
 	"io/ioutil"
+	"math"
 	"os"
 	"testing"
 	"time"
 
-	platform "github.com/influxdata/influxdb"
+	"github.com/influxdata/influxdb"
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/storage"
 	"github.com/influxdata/influxdb/tsdb"
@@ -149,6 +151,68 @@ func TestEngine_WriteAddNewField(t *testing.T) {
 	}
 }
 
+func TestEngine_DeleteBucket(t *testing.T) {
+	engine := NewDefaultEngine()
+	defer engine.Close()
+	engine.MustOpen()
+
+	pt := models.MustNewPoint(
+		"cpu",
+		models.NewTags(map[string]string{"host": "server"}),
+		map[string]interface{}{"value": 1.0},
+		time.Unix(1, 2),
+	)
+
+	err := engine.Write1xPoints([]models.Point{pt})
+	if err != nil {
+		t.Fatalf(err.Error())
+	}
+
+	pt = models.MustNewPoint(
+		"cpu",
+		models.NewTags(map[string]string{"host": "server"}),
+		map[string]interface{}{"value": 1.0, "value2": 2.0},
+		time.Unix(1, 3),
+	)
+
+	// Same org, different bucket.
+	err = engine.Write1xPointsWithOrgBucket([]models.Point{pt}, "3131313131313131", "8888888888888888")
+	if err != nil {
+		t.Fatalf(err.Error())
+	}
+
+	if got, exp := engine.SeriesCardinality(), int64(3); got != exp {
+		t.Fatalf("got %d series, exp %d series in index", got, exp)
+	}
+
+	// Remove the original bucket.
+	if err := engine.DeleteBucket(engine.org, engine.bucket); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check only one bucket was removed.
+	if got, exp := engine.SeriesCardinality(), int64(2); got != exp {
+		t.Fatalf("got %d series, exp %d series in index", got, exp)
+	}
+}
+
+func TestEngine_OpenClose(t *testing.T) {
+	engine := NewDefaultEngine()
+	engine.MustOpen()
+
+	if err := engine.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := engine.Open(); err != nil {
+		t.Fatal(err)
+	}
+
+	if err := engine.Close(); err != nil {
+		t.Fatal(err)
+	}
+}
+
 // Ensures that when a shard is closed, it removes any series meta-data
 // from the index.
 func TestEngineClose_RemoveIndex(t *testing.T) {
@@ -201,8 +265,53 @@ func TestEngine_WALDisabled(t *testing.T) {
 	}
 }
 
+func BenchmarkDeleteBucket(b *testing.B) {
+	var engine *Engine
+	setup := func(card int) {
+		engine = NewDefaultEngine()
+		engine.MustOpen()
+
+		points := make([]models.Point, card)
+		for i := 0; i < card; i++ {
+			points[i] = models.MustNewPoint(
+				"cpu",
+				models.NewTags(map[string]string{"host": fmt.Sprintf("server-%d", i)}),
+				map[string]interface{}{"value": i},
+				time.Unix(1, 2),
+			)
+		}
+
+		if err := engine.Write1xPoints(points); err != nil {
+			panic(err)
+		}
+	}
+
+	for i := 1; i <= 5; i++ {
+		card := int(math.Pow10(i))
+
+		b.Run(fmt.Sprintf("cardinality_%d", card), func(b *testing.B) {
+			setup(card)
+			for i := 0; i < b.N; i++ {
+				if err := engine.DeleteBucket(engine.org, engine.bucket); err != nil {
+					b.Fatal(err)
+				}
+
+				b.StopTimer()
+				if err := engine.Close(); err != nil {
+					panic(err)
+				}
+				setup(card)
+				b.StartTimer()
+			}
+		})
+	}
+}
+
 type Engine struct {
-	path string
+	path        string
+	org, bucket influxdb.ID
+
 	*storage.Engine
 }
 
 func NewEngine(c storage.Config) *Engine {
 	path, _ := ioutil.TempDir("", "storage_engine_test")
 	engine := storage.NewEngine(path, c)
+
+	org, err := influxdb.IDFromString("3131313131313131")
+	if err != nil {
+		panic(err)
+	}
+
+	bucket, err := influxdb.IDFromString("3232323232323232")
+	if err != nil {
+		panic(err)
+	}
+
 	return &Engine{
 		path:   path,
+		org:    *org,
+		bucket: *bucket,
 		Engine: engine,
 	}
 }
 
 // This allows us to use the old `models` package helper functions and still write
 // the points in the correct format.
 func (e *Engine) Write1xPoints(pts []models.Point) error {
-	org, _ := platform.IDFromString("3131313131313131")
-	bucket, _ := platform.IDFromString("3232323232323232")
-	points, err := tsdb.ExplodePoints(*org, *bucket, pts)
+	points, err := tsdb.ExplodePoints(e.org, e.bucket, pts)
 	if err != nil {
 		return err
 	}
 	return e.Engine.WritePoints(points)
 }
+
+// Write1xPointsWithOrgBucket writes 1.x points with the provided org and bucket id strings.
+func (e *Engine) Write1xPointsWithOrgBucket(pts []models.Point, org, bucket string) error {
+	o, err := influxdb.IDFromString(org)
+	if err != nil {
+		return err
+	}
+
+	b, err := influxdb.IDFromString(bucket)
+	if err != nil {
+		return err
+	}
+
+	points, err := tsdb.ExplodePoints(*o, *b, pts)
+	if err != nil {
+		return err
+	}
+
+	return e.Engine.WritePoints(points)
+}

From b025d9afa9c2d8feb590aabbb2a10a1a6ebddc41 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Fri, 11 Jan 2019 19:28:46 +0000
Subject: [PATCH 5/7] Improve efficiency of TSI index series drop

This commit improves the performance of a mass delete on the TSI index
by deleting at the measurement level instead of deleting each series
individually.
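
A rough before/after sketch of the delete path in tsdb/tsm1 (error
handling elided; lookupSeriesID is a stand-in for the per-key series
file lookup, not a function in this diff):

    // Before: every dead key costs one index delete and one series
    // file delete, each acquiring locks per series.
    for key := range possiblyDead.keys {
        sid := lookupSeriesID(key)
        e.index.DropSeries(sid, []byte(key), true)
        e.sfile.DeleteSeriesID(sid)
    }

    // After, when [min, max] spans the whole int64 range, the index
    // work collapses to a single call (the series file deletes remain
    // per-id until a later patch in this series):
    e.index.DropMeasurement(name)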
---
 storage/engine.go                           |  4 +--
 tsdb/tsi1/partition.go                      |  9 +++++++
 tsdb/tsm1/engine_delete_measurement.go      | 28 +++++++++++++++------
 tsdb/tsm1/engine_delete_measurement_test.go |  4 +--
 4 files changed, 34 insertions(+), 11 deletions(-)

diff --git a/storage/engine.go b/storage/engine.go
index c1ed60ccd4..c0c7429648 100644
--- a/storage/engine.go
+++ b/storage/engine.go
@@ -377,9 +377,9 @@ func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
 	// TODO(edd): we need to clean up how we're encoding the prefix so that we
 	// don't have to remember to get it right everywhere we need to touch TSM data.
 	encoded := tsdb.EncodeName(orgID, bucketID)
-	prefix := models.EscapeMeasurement(encoded[:])
+	name := models.EscapeMeasurement(encoded[:])
 
-	return e.engine.DeletePrefix(prefix, math.MinInt64, math.MaxInt64)
+	return e.engine.DeleteBucket(name, math.MinInt64, math.MaxInt64)
 }
 
 // DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go
index 83d6c051cf..33ff039493 100644
--- a/tsdb/tsi1/partition.go
+++ b/tsdb/tsi1/partition.go
@@ -628,6 +628,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
 	// Delete all series.
 	if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
 		defer itr.Close()
+		var total uint64
 		for {
 			elem, err := itr.Next()
 			if err != nil {
@@ -638,7 +639,15 @@
 			if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
 				return err
 			}
+
+			// Update series set.
+			p.seriesIDSet.Remove(elem.SeriesID)
+			total++
 		}
+
+		p.tracker.AddSeriesDropped(total)
+		p.tracker.SubSeries(total)
+
 		if err = itr.Close(); err != nil {
 			return err
 		}
diff --git a/tsdb/tsm1/engine_delete_measurement.go b/tsdb/tsm1/engine_delete_measurement.go
index 9de91831b8..6391cb930f 100644
--- a/tsdb/tsm1/engine_delete_measurement.go
+++ b/tsdb/tsm1/engine_delete_measurement.go
@@ -10,7 +10,10 @@ import (
 	"github.com/influxdata/influxql"
 )
 
-func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
+// DeleteBucket removes all TSM data belonging to a bucket, and removes all index
+// and series file data associated with the bucket. The provided time range ensures
+// that only bucket data for that range is removed.
+func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
 	// TODO(jeff): we need to block writes to this prefix while deletes are in progress
 	// otherwise we can end up in a situation where we have staged data in the cache or
 	// WAL that was deleted from the index, or worse. This needs to happen at a higher
@@ -63,7 +66,7 @@
 	possiblyDead.keys = make(map[string]struct{})
 
 	if err := e.FileStore.Apply(func(r TSMFile) error {
-		return r.DeletePrefix(prefix, min, max, func(key []byte) {
+		return r.DeletePrefix(name, min, max, func(key []byte) {
 			possiblyDead.Lock()
 			possiblyDead.keys[string(key)] = struct{}{}
 			possiblyDead.Unlock()
@@ -79,7 +82,7 @@
 
 	// ApplySerialEntryFn cannot return an error in this invocation.
 	_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
-		if bytes.HasPrefix(k, prefix) {
+		if bytes.HasPrefix(k, name) {
 			if deleteKeys == nil {
 				deleteKeys = make([][]byte, 0, 10000)
 			}
@@ -107,10 +110,10 @@
 		possiblyDead.RLock()
 		defer possiblyDead.RUnlock()
 
-		iter := r.Iterator(prefix)
+		iter := r.Iterator(name)
 		for i := 0; iter.Next(); i++ {
 			key := iter.Key()
-			if !bytes.HasPrefix(key, prefix) {
+			if !bytes.HasPrefix(key, name) {
 				break
 			}
@@ -154,13 +157,24 @@
 			continue
 		}
 
-		if err := e.index.DropSeries(sid, keyb, true); err != nil {
-			return err
+		// If the time range is not full then the index must drop each series
+		// individually.
+		if min != math.MinInt64 || max != math.MaxInt64 {
+			if err := e.index.DropSeries(sid, keyb, true); err != nil {
+				return err
+			}
 		}
+
 		if err := e.sfile.DeleteSeriesID(sid); err != nil {
 			return err
 		}
 	}
+
+	// In this case the entire measurement (bucket) can be removed from the index.
+	if min == math.MinInt64 && max == math.MaxInt64 {
+		if err := e.index.DropMeasurement(name); err != nil {
+			return err
+		}
+	}
 
 	return nil
diff --git a/tsdb/tsm1/engine_delete_measurement_test.go b/tsdb/tsm1/engine_delete_measurement_test.go
index c0fa52df79..c0b21bb378 100644
--- a/tsdb/tsm1/engine_delete_measurement_test.go
+++ b/tsdb/tsm1/engine_delete_measurement_test.go
@@ -44,7 +44,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
 		t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
 	}
 
-	if err := e.DeletePrefix([]byte("cpu"), 0, 3); err != nil {
+	if err := e.DeleteBucket([]byte("cpu"), 0, 3); err != nil {
 		t.Fatalf("failed to delete series: %v", err)
 	}
 
@@ -90,7 +90,7 @@
 	iter.Close()
 
 	// Deleting remaining series should remove them from the series.
-	if err := e.DeletePrefix([]byte("cpu"), 0, 9); err != nil {
+	if err := e.DeleteBucket([]byte("cpu"), 0, 9); err != nil {
 		t.Fatalf("failed to delete series: %v", err)
 	}

From 9ff65f60168848b45ee8a2176692e53e67987407 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Mon, 14 Jan 2019 10:56:43 +0000
Subject: [PATCH 6/7] Track deleted series ids to remove from series file

Previously series that were being removed were tracked at the key level.
This means that when removing them from the series file, the series id
first had to be looked up. This can cause lock thrashing when there are
many series ids to look up (such as with a bulk delete), because there
are no bulk methods to do this.

This commit changes how the series file delete is done by extracting the
series ids from the index before we remove the index entries. It's then
possible to delete all those series ids from the series file without
having to look up the ids.
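
Condensed, the fast path added below now does the following (a sketch
with error handling elided; see the diff for the real code):

    // 1. Extract the series IDs straight from the index - no key lookups.
    set := tsdb.NewSeriesIDSet()
    itr, _ := e.index.MeasurementSeriesIDIterator(name)
    for elem, err := itr.Next(); err == nil && !elem.SeriesID.IsZero(); elem, err = itr.Next() {
        set.AddNoLock(elem.SeriesID)
    }

    // 2. Drop the whole measurement from the index first...
    _ = e.index.DropMeasurement(name)

    // 3. ...then delete the collected IDs from the series file directly.
    set.ForEachNoLock(func(id tsdb.SeriesID) {
        _ = e.sfile.DeleteSeriesID(id)
    })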
---
 tsdb/tsm1/engine_delete_measurement.go | 55 ++++++++++++++++++++------
 1 file changed, 43 insertions(+), 12 deletions(-)

diff --git a/tsdb/tsm1/engine_delete_measurement.go b/tsdb/tsm1/engine_delete_measurement.go
index 6391cb930f..6775b8a252 100644
--- a/tsdb/tsm1/engine_delete_measurement.go
+++ b/tsdb/tsm1/engine_delete_measurement.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/influxdata/influxdb/models"
 	"github.com/influxdata/influxdb/pkg/bytesutil"
+	"github.com/influxdata/influxdb/tsdb"
 	"github.com/influxdata/influxql"
 )
 
@@ -146,6 +147,46 @@ func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
 	// TODO(jeff): it's also important that all of the deletes happen atomically with
 	// the deletes of the data in the tsm files.
 
+	// In this case the entire measurement (bucket) can be removed from the index.
+	if min == math.MinInt64 && max == math.MaxInt64 {
+		// Build up a set of series IDs that we need to remove from the series file.
+		set := tsdb.NewSeriesIDSet()
+		itr, err := e.index.MeasurementSeriesIDIterator(name)
+		if err != nil {
+			return err
+		}
+
+		var elem tsdb.SeriesIDElem
+		for elem, err = itr.Next(); err == nil; elem, err = itr.Next() {
+			if elem.SeriesID.IsZero() {
+				break
+			}
+
+			set.AddNoLock(elem.SeriesID)
+		}
+
+		if err != nil {
+			return err
+		} else if err := itr.Close(); err != nil {
+			return err
+		}
+
+		// Remove the measurement from the index before the series file.
+		if err := e.index.DropMeasurement(name); err != nil {
+			return err
+		}
+
+		// Iterate over the series ids we previously extracted from the index
+		// and remove from the series file.
+		set.ForEachNoLock(func(id tsdb.SeriesID) {
+			if err = e.sfile.DeleteSeriesID(id); err != nil {
+				return
+			}
+		})
+		return err
+	}
+
+	// This is the slow path, when not dropping the entire bucket (measurement).
 	for key := range possiblyDead.keys {
 		// TODO(jeff): ugh reduce copies here
 		keyb := []byte(key)
@@ -157,21 +198,11 @@
 			continue
 		}
 
-		// If the time range is not full then the index must drop each series
-		// individually.
-		if min != math.MinInt64 || max != math.MaxInt64 {
-			if err := e.index.DropSeries(sid, keyb, true); err != nil {
-				return err
-			}
-		}
-		if err := e.sfile.DeleteSeriesID(sid); err != nil {
+		if err := e.index.DropSeries(sid, keyb, true); err != nil {
 			return err
 		}
-	}
 
-	// In this case the entire measurement (bucket) can be removed from the index.
-	if min == math.MinInt64 && max == math.MaxInt64 {
-		if err := e.index.DropMeasurement(name); err != nil {
+		if err := e.sfile.DeleteSeriesID(sid); err != nil {
 			return err
 		}
 	}

From 810b5d92814e1cca2dfdab001dc360c726207083 Mon Sep 17 00:00:00 2001
From: Edd Robinson
Date: Mon, 14 Jan 2019 12:16:17 +0000
Subject: [PATCH 7/7] Bulk log file delete

This commit adds a method to delete many series ids from the LogFile in
bulk, reducing the number of fsyncs required.
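
DeleteSeriesID appends a single tombstone entry and then calls
FlushAndSync, so dropping N series costs N fsyncs. The new bulk method
appends all N entries and syncs once. Roughly:

    // Before: one fsync per dropped series.
    for _, id := range ids {
        _ = f.DeleteSeriesID(id) // appendEntry + FlushAndSync each time
    }

    // After: N appendEntry calls, a single FlushAndSync.
    _ = f.DeleteSeriesIDList(ids)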
---
 tsdb/tsi1/log_file.go  | 18 ++++++++++++++++++
 tsdb/tsi1/partition.go | 20 +++++++++++++-------
 2 files changed, 31 insertions(+), 7 deletions(-)

diff --git a/tsdb/tsi1/log_file.go b/tsdb/tsi1/log_file.go
index db81b771cb..9ef2ba0889 100644
--- a/tsdb/tsi1/log_file.go
+++ b/tsdb/tsi1/log_file.go
@@ -593,6 +593,24 @@ func (f *LogFile) DeleteSeriesID(id tsdb.SeriesID) error {
 	return f.FlushAndSync()
 }
 
+// DeleteSeriesIDList marks a tombstone for all the series IDs. DeleteSeriesIDList
+// should be preferred to repeatedly calling DeleteSeriesID for many series ids.
+func (f *LogFile) DeleteSeriesIDList(ids []tsdb.SeriesID) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	for _, id := range ids {
+		e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
+		if err := f.appendEntry(&e); err != nil {
+			return err
+		}
+		f.execEntry(&e)
+	}
+
+	// Flush buffer and sync to disk.
+	return f.FlushAndSync()
+}
+
 // SeriesN returns the total number of series in the file.
 func (f *LogFile) SeriesN() (n uint64) {
 	f.mu.RLock()
diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go
index 33ff039493..6b737fbebb 100644
--- a/tsdb/tsi1/partition.go
+++ b/tsdb/tsi1/partition.go
@@ -626,9 +626,14 @@ func (p *Partition) DropMeasurement(name []byte) error {
 	}
 
 	// Delete all series.
+	// TODO(edd): it's not clear to me why we have to delete all series IDs from
+	// the index when we could just mark the measurement as deleted.
 	if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
 		defer itr.Close()
-		var total uint64
+
+		// 1024 assumes that a bucket (measurement) will typically contain at
+		// least 1024 series.
+		all := make([]tsdb.SeriesID, 0, 1024)
 		for {
 			elem, err := itr.Next()
 			if err != nil {
 				return err
 			} else if elem.SeriesID.IsZero() {
 				break
 			}
-			if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
-				return err
-			}
+			all = append(all, elem.SeriesID)
 
 			// Update series set.
 			p.seriesIDSet.Remove(elem.SeriesID)
-			total++
 		}
 
-		p.tracker.AddSeriesDropped(total)
-		p.tracker.SubSeries(total)
+		if err := p.activeLogFile.DeleteSeriesIDList(all); err != nil {
+			return err
+		}
+
+		p.tracker.AddSeriesDropped(uint64(len(all)))
+		p.tracker.SubSeries(uint64(len(all)))
 
 		if err = itr.Close(); err != nil {