Merge pull request #11042 from influxdata/er-drop-bucket-perf

Improve delete bucket performance
Edd Robinson 2019-01-15 11:57:02 +00:00 committed by GitHub
commit 98abd940f6
12 changed files with 387 additions and 27 deletions

View File

@@ -377,9 +377,9 @@ func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
// TODO(edd): we need to clean up how we're encoding the prefix so that we
// don't have to remember to get it right everywhere we need to touch TSM data.
encoded := tsdb.EncodeName(orgID, bucketID)
-prefix := models.EscapeMeasurement(encoded[:])
+name := models.EscapeMeasurement(encoded[:])
-return e.engine.DeletePrefix(prefix, math.MinInt64, math.MaxInt64)
+return e.engine.DeleteBucket(name, math.MinInt64, math.MaxInt64)
}
// DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
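For orientation, the name built here is the fixed 16-byte measurement key that 2.x storage derives from the org and bucket IDs, which is what lets a whole bucket be deleted as a single prefix operation. Below is a minimal sketch of the assumed encoding; the real logic lives in tsdb.EncodeName plus models.EscapeMeasurement, which additionally escapes reserved bytes.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodeName mirrors what tsdb.EncodeName is assumed to do: pack the org and
// bucket IDs (each a uint64) into a fixed 16-byte measurement name, so every
// series key in a bucket shares the same 16-byte prefix.
func encodeName(org, bucket uint64) [16]byte {
	var name [16]byte
	binary.BigEndian.PutUint64(name[:8], org)
	binary.BigEndian.PutUint64(name[8:], bucket)
	return name
}

func main() {
	name := encodeName(0x3131313131313131, 0x3232323232323232)
	fmt.Printf("%x\n", name) // 31313131313131313232323232323232
}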

View File

@@ -1,12 +1,14 @@
package storage_test
import (
"fmt"
"io/ioutil"
"math"
"os"
"testing"
"time"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/storage"
"github.com/influxdata/influxdb/tsdb"
@@ -149,6 +151,68 @@ func TestEngine_WriteAddNewField(t *testing.T) {
}
}
func TestEngine_DeleteBucket(t *testing.T) {
engine := NewDefaultEngine()
defer engine.Close()
engine.MustOpen()
pt := models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
err := engine.Write1xPoints([]models.Point{pt})
if err != nil {
t.Fatalf(err.Error())
}
pt = models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": 1.0, "value2": 2.0},
time.Unix(1, 3),
)
// Same org, different bucket.
err = engine.Write1xPointsWithOrgBucket([]models.Point{pt}, "3131313131313131", "8888888888888888")
if err != nil {
t.Fatalf(err.Error())
}
if got, exp := engine.SeriesCardinality(), int64(3); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
// Remove the original bucket.
if err := engine.DeleteBucket(engine.org, engine.bucket); err != nil {
t.Fatal(err)
}
// Check only one bucket was removed.
if got, exp := engine.SeriesCardinality(), int64(2); got != exp {
t.Fatalf("got %d series, exp %d series in index", got, exp)
}
}
func TestEngine_OpenClose(t *testing.T) {
engine := NewDefaultEngine()
engine.MustOpen()
if err := engine.Close(); err != nil {
t.Fatal(err)
}
if err := engine.Open(); err != nil {
t.Fatal(err)
}
if err := engine.Close(); err != nil {
t.Fatal(err)
}
}
// Ensures that when a shard is closed, it removes any series meta-data
// from the index.
func TestEngineClose_RemoveIndex(t *testing.T) {
@@ -201,8 +265,53 @@ func TestEngine_WALDisabled(t *testing.T) {
}
}
func BenchmarkDeleteBucket(b *testing.B) {
var engine *Engine
setup := func(card int) {
engine = NewDefaultEngine()
engine.MustOpen()
points := make([]models.Point, card)
for i := 0; i < card; i++ {
points[i] = models.MustNewPoint(
"cpu",
models.NewTags(map[string]string{"host": "server"}),
map[string]interface{}{"value": i},
time.Unix(1, 2),
)
}
if err := engine.Write1xPoints(points); err != nil {
panic(err)
}
}
for i := 1; i <= 5; i++ {
card := int(math.Pow10(i))
b.Run(fmt.Sprintf("cardinality_%d", card), func(b *testing.B) {
setup(card)
for i := 0; i < b.N; i++ {
if err := engine.DeleteBucket(engine.org, engine.bucket); err != nil {
b.Fatal(err)
}
b.StopTimer()
if err := engine.Close(); err != nil {
panic(err)
}
setup(card)
b.StartTimer()
}
})
}
}
type Engine struct {
path string
org, bucket influxdb.ID
*storage.Engine
}
@@ -211,8 +320,21 @@ func NewEngine(c storage.Config) *Engine {
path, _ := ioutil.TempDir("", "storage_engine_test")
engine := storage.NewEngine(path, c)
org, err := influxdb.IDFromString("3131313131313131")
if err != nil {
panic(err)
}
bucket, err := influxdb.IDFromString("3232323232323232")
if err != nil {
panic(err)
}
return &Engine{
path: path,
org: *org,
bucket: *bucket,
Engine: engine,
}
}
@@ -233,9 +355,26 @@ func (e *Engine) MustOpen() {
// This allows us to use the old `models` package helper functions and still write
// the points in the correct format.
func (e *Engine) Write1xPoints(pts []models.Point) error {
-org, _ := platform.IDFromString("3131313131313131")
-bucket, _ := platform.IDFromString("3232323232323232")
-points, err := tsdb.ExplodePoints(*org, *bucket, pts)
+points, err := tsdb.ExplodePoints(e.org, e.bucket, pts)
if err != nil {
return err
}
return e.Engine.WritePoints(points)
}
// Write1xPointsWithOrgBucket writes 1.x points with the provided org and bucket id strings.
func (e *Engine) Write1xPointsWithOrgBucket(pts []models.Point, org, bucket string) error {
o, err := influxdb.IDFromString(org)
if err != nil {
return err
}
b, err := influxdb.IDFromString(bucket)
if err != nil {
return err
}
points, err := tsdb.ExplodePoints(*o, *b, pts)
if err != nil {
return err
}
return e.Engine.WritePoints(points)
}
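tsdb.ExplodePoints is where 1.x-style points are translated into the 2.x storage layout. The sketch below is an assumption-heavy illustration, not the real implementation: the reserved \x00/\xff tag keys and escaping details are internal. The essential behaviour is that each field becomes its own series under the encoded org+bucket measurement, which is why the two-field write in TestEngine_DeleteBucket above contributes two of the three counted series.

package main

import "fmt"

// explodePoint sketches the assumed 1.x -> 2.x mapping: the encoded org+bucket
// becomes the measurement, the original measurement rides in a reserved "\x00"
// tag, and each field key becomes a separate series via a reserved "\xff" tag.
func explodePoint(name, measurement string, tags map[string]string, fields []string) []string {
	var keys []string
	for _, field := range fields {
		key := name + ",\x00=" + measurement
		for k, v := range tags {
			key += "," + k + "=" + v
		}
		keys = append(keys, key+",\xff="+field)
	}
	return keys
}

func main() {
	keys := explodePoint("31313131313131318888888888888888", "cpu",
		map[string]string{"host": "server"}, []string{"value", "value2"})
	for _, k := range keys {
		fmt.Printf("%q\n", k) // two series keys for one two-field point
	}
}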

View File

@@ -609,7 +609,7 @@ func DifferenceSeriesIDIterators(itr0, itr1 SeriesIDIterator) SeriesIDIterator {
if a := NewSeriesIDSetIterators([]SeriesIDIterator{itr0, itr1}); a != nil {
itr0.Close()
itr1.Close()
-return NewSeriesIDSetIterator(a[0].SeriesIDSet().AndNot(a[1].SeriesIDSet()))
+return NewSeriesIDSetIterator(NewSeriesIDSetNegate(a[0].SeriesIDSet(), a[1].SeriesIDSet()))
}
return &seriesIDDifferenceIterator{itrs: [2]SeriesIDIterator{itr0, itr1}}

View File

@@ -27,6 +27,17 @@ func NewSeriesIDSet(a ...SeriesID) *SeriesIDSet {
return ss
}
// NewSeriesIDSetNegate returns a new SeriesIDSet containing all the elements in a
// that are not present in b. That is, the set difference between a and b.
func NewSeriesIDSetNegate(a, b *SeriesIDSet) *SeriesIDSet {
a.RLock()
defer a.RUnlock()
b.RLock()
defer b.RUnlock()
return &SeriesIDSet{bitmap: roaring.AndNot(a.bitmap, b.bitmap)}
}
// Bytes estimates the memory footprint of this SeriesIDSet, in bytes.
func (s *SeriesIDSet) Bytes() int {
var b int
@@ -170,15 +181,13 @@ func (s *SeriesIDSet) And(other *SeriesIDSet) *SeriesIDSet {
return &SeriesIDSet{bitmap: roaring.And(s.bitmap, other.bitmap)}
}
-// AndNot returns a new SeriesIDSet containing elements that were present in s,
-// but not present in other.
-func (s *SeriesIDSet) AndNot(other *SeriesIDSet) *SeriesIDSet {
+// RemoveSet removes all values in other from s, if they exist.
+func (s *SeriesIDSet) RemoveSet(other *SeriesIDSet) {
s.RLock()
defer s.RUnlock()
other.RLock()
defer other.RUnlock()
-return &SeriesIDSet{bitmap: roaring.AndNot(s.bitmap, other.bitmap)}
+s.bitmap.AndNot(other.bitmap)
}
// ForEach calls f for each id in the set. The function is applied to the IDs
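Together the pair gives callers a choice between an allocating and an in-place set difference. Here is a short usage sketch against the API shown in these hunks (the variadic NewSeriesIDSet constructor, NewSeriesID, and String): NewSeriesIDSetNegate leaves both inputs intact, while RemoveSet mutates the receiver, which is the cheaper option on a delete path where the original set is no longer needed.

package main

import (
	"fmt"

	"github.com/influxdata/influxdb/tsdb"
)

func main() {
	a := tsdb.NewSeriesIDSet(tsdb.NewSeriesID(1), tsdb.NewSeriesID(10), tsdb.NewSeriesID(20))
	b := tsdb.NewSeriesIDSet(tsdb.NewSeriesID(10), tsdb.NewSeriesID(20))

	// Allocating form: a and b are untouched.
	fmt.Println(tsdb.NewSeriesIDSetNegate(a, b).String()) // the difference, {1}

	// In-place form: a itself shrinks to the same result, with no new set.
	a.RemoveSet(b)
	fmt.Println(a.String())
}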

View File

@@ -10,7 +10,7 @@ import (
"testing"
)
-func TestSeriesIDSet_AndNot(t *testing.T) {
+func TestSeriesIDSet_NewSeriesIDSetNegate(t *testing.T) {
examples := [][3][]uint64{
[3][]uint64{
{1, 10, 20, 30},
@@ -55,7 +55,7 @@ func TestSeriesIDSet_AndNot(t *testing.T) {
expected.Add(NewSeriesID(v))
}
-got := a.AndNot(b)
+got := NewSeriesIDSetNegate(a, b)
if got.String() != expected.String() {
t.Fatalf("got %s, expected %s", got.String(), expected.String())
}
@@ -63,6 +63,59 @@ func TestSeriesIDSet_AndNot(t *testing.T) {
}
}
func TestSeriesIDSet_RemoveSet(t *testing.T) {
examples := [][3][]uint64{
[3][]uint64{
{1, 10, 20, 30},
{10, 12, 13, 14, 20},
{1, 30},
},
[3][]uint64{
{},
{10},
{},
},
[3][]uint64{
{1, 10, 20, 30},
{1, 10, 20, 30},
{},
},
[3][]uint64{
{1, 10},
{1, 10, 100},
{},
},
[3][]uint64{
{1, 10},
{},
{1, 10},
},
}
for i, example := range examples {
t.Run(fmt.Sprint(i), func(t *testing.T) {
// Build sets.
a, b := NewSeriesIDSet(), NewSeriesIDSet()
for _, v := range example[0] {
a.Add(NewSeriesID(v))
}
for _, v := range example[1] {
b.Add(NewSeriesID(v))
}
expected := NewSeriesIDSet()
for _, v := range example[2] {
expected.Add(NewSeriesID(v))
}
a.RemoveSet(b)
if a.String() != expected.String() {
t.Fatalf("got %s, expected %s", a.String(), expected.String())
}
})
}
}
// Ensure that cloning is race-free.
func TestSeriesIDSet_Clone_Race(t *testing.T) {
main := NewSeriesIDSet()
@@ -556,6 +609,78 @@ func BenchmarkSeriesIDSet_Remove(b *testing.B) {
})
}
// BenchmarkSeriesIDSet_MassRemove benchmarks the cost of removing a large set of values.
func BenchmarkSeriesIDSet_MassRemove(b *testing.B) {
var size = uint64(1000000)
// Setup...
set = NewSeriesIDSet()
for i := uint64(0); i < size; i++ {
set.Add(NewSeriesID(i))
}
// Remove one at a time
b.Run(fmt.Sprint("cardinality_1000000_remove_each"), func(b *testing.B) {
clone := set.Clone()
for i := 0; i < b.N; i++ {
for j := uint64(0); j < size/2; j++ {
clone.RemoveNoLock(NewSeriesID(j))
}
b.StopTimer()
clone = set.Clone()
b.StartTimer()
}
})
// This is the case where a target series id set exists.
b.Run(fmt.Sprint("cardinality_1000000_remove_set_exists"), func(b *testing.B) {
clone := set.Clone()
other := NewSeriesIDSet()
for j := uint64(0); j < size/2; j++ {
other.AddNoLock(NewSeriesID(j))
}
for i := 0; i < b.N; i++ {
clone.RemoveSet(other)
b.StopTimer()
clone = set.Clone()
b.StartTimer()
}
})
// Make a target series id set and negate it
b.Run(fmt.Sprint("cardinality_1000000_remove_set"), func(b *testing.B) {
clone := set.Clone()
for i := 0; i < b.N; i++ {
other := NewSeriesIDSet()
for j := uint64(0); j < size/2; j++ {
other.AddNoLock(NewSeriesID(j))
}
clone.RemoveSet(other)
b.StopTimer()
clone = set.Clone()
b.StartTimer()
}
})
// This is the case where a new result set is created.
b.Run(fmt.Sprint("cardinality_1000000_remove_set_new"), func(b *testing.B) {
clone := set.Clone()
other := NewSeriesIDSet()
for j := uint64(0); j < size/2; j++ {
other.AddNoLock(NewSeriesID(j))
}
for i := 0; i < b.N; i++ {
_ = NewSeriesIDSetNegate(clone, other)
b.StopTimer()
clone = set.Clone()
b.StartTimer()
}
})
}
// Typical benchmarks for a laptop:
//
// BenchmarkSeriesIDSet_Merge_Duplicates/cardinality_1/shards_1-4 200000 8095 ns/op 16656 B/op 11 allocs/op

View File

@@ -153,6 +153,13 @@ func (c *TagValueSeriesIDCache) Delete(name, key, value []byte, x tsdb.SeriesID)
c.Unlock()
}
// DeleteMeasurement removes all cached entries for the provided measurement name.
func (c *TagValueSeriesIDCache) DeleteMeasurement(name []byte) {
c.Lock()
delete(c.cache, string(name))
c.Unlock()
}
// delete removes x from the tuple {name, key, value} if it exists.
func (c *TagValueSeriesIDCache) delete(name, key, value []byte, x tsdb.SeriesID) {
if mmap, ok := c.cache[string(name)]; ok {
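Since the outer level of the cache map is keyed by measurement name, invalidating an entire measurement is a single map delete under the write lock. A minimal sketch of that shape with a hypothetical nameCache type (the real TagValueSeriesIDCache also maintains LRU eviction state):

package main

import (
	"fmt"
	"sync"
)

// nameCache sketches a two-level cache keyed by measurement name, then by
// tag key/value pair. Deleting the outer key drops every nested entry at once.
type nameCache struct {
	mu    sync.Mutex
	cache map[string]map[string][]uint64 // name -> "key=value" -> series IDs
}

func (c *nameCache) DeleteMeasurement(name []byte) {
	c.mu.Lock()
	delete(c.cache, string(name)) // one delete invalidates the whole measurement
	c.mu.Unlock()
}

func main() {
	c := &nameCache{cache: map[string]map[string][]uint64{
		"cpu": {"host=server": {1, 2, 3}},
	}}
	c.DeleteMeasurement([]byte("cpu"))
	fmt.Println(len(c.cache)) // 0
}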

View File

@@ -387,7 +387,7 @@ func (fs *FileSet) TagValueSeriesIDIterator(name, key, value []byte) (tsdb.SeriesIDIterator, error) {
// Remove tombstones set in previous file.
if ftss != nil && ftss.Cardinality() > 0 {
-ss = ss.AndNot(ftss)
+ss.RemoveSet(ftss)
}
// Fetch tag value series set for this file and merge into overall set.

View File

@@ -592,13 +592,15 @@ func (i *Index) DropMeasurement(name []byte) error {
}()
}
// Remove any cached bitmaps for the measurement.
i.tagValueCache.DeleteMeasurement(name)
// Check for error
for i := 0; i < cap(errC); i++ {
if err := <-errC; err != nil {
return err
}
}
return nil
}

View File

@@ -593,6 +593,24 @@ func (f *LogFile) DeleteSeriesID(id tsdb.SeriesID) error {
return f.FlushAndSync()
}
// DeleteSeriesIDList marks a tombstone for all the series IDs. DeleteSeriesIDList
// should be preferred to repeatedly calling DeleteSeriesID for many series IDs.
func (f *LogFile) DeleteSeriesIDList(ids []tsdb.SeriesID) error {
f.mu.Lock()
defer f.mu.Unlock()
for _, id := range ids {
e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
if err := f.appendEntry(&e); err != nil {
return err
}
f.execEntry(&e)
}
// Flush buffer and sync to disk.
return f.FlushAndSync()
}
// SeriesN returns the total number of series in the file.
func (f *LogFile) SeriesN() (n uint64) {
f.mu.RLock()
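The batched variant exists to amortize the two per-call costs visible above: DeleteSeriesID takes f.mu and ends with FlushAndSync for every single ID, whereas DeleteSeriesIDList pays for one lock acquisition and one flush for the whole batch. A toy sketch of why that matters, using a hypothetical logFile type with a counter standing in for the fsync:

package main

import (
	"fmt"
	"sync"
)

// logFile sketches the costs in play: each exported call takes the mutex and
// ends with a sync, so batching N tombstones pays those costs once, not N times.
type logFile struct {
	mu     sync.Mutex
	syncs  int
	buffer []uint64
}

func (f *logFile) DeleteSeriesID(id uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.buffer = append(f.buffer, id)
	f.syncs++ // stands in for FlushAndSync
}

func (f *logFile) DeleteSeriesIDList(ids []uint64) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.buffer = append(f.buffer, ids...)
	f.syncs++ // a single flush covers the whole batch
}

func main() {
	ids := []uint64{1, 2, 3, 4}

	a := &logFile{}
	for _, id := range ids {
		a.DeleteSeriesID(id)
	}

	b := &logFile{}
	b.DeleteSeriesIDList(ids)

	fmt.Println(a.syncs, b.syncs) // 4 1
}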

View File

@@ -626,8 +626,14 @@ func (p *Partition) DropMeasurement(name []byte) error {
}
// Delete all series.
// TODO(edd): it's not clear to me why we have to delete all series IDs from
// the index when we could just mark the measurement as deleted.
if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
defer itr.Close()
// The initial capacity of 1024 assumes that a typical bucket (measurement)
// will have at least 1024 series in it.
all := make([]tsdb.SeriesID, 0, 1024)
for {
elem, err := itr.Next()
if err != nil {
@@ -635,10 +641,19 @@ func (p *Partition) DropMeasurement(name []byte) error {
} else if elem.SeriesID.IsZero() {
break
}
-if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
-return err
-}
+all = append(all, elem.SeriesID)
// Update series set.
p.seriesIDSet.Remove(elem.SeriesID)
}
if err := p.activeLogFile.DeleteSeriesIDList(all); err != nil {
return err
}
p.tracker.AddSeriesDropped(uint64(len(all)))
p.tracker.SubSeries(uint64(len(all)))
if err = itr.Close(); err != nil {
return err
}

View File

@@ -7,10 +7,14 @@ import (
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/pkg/bytesutil"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxql"
)
-func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
+// DeleteBucket removes all TSM data belonging to a bucket, and removes all index
+// and series file data associated with the bucket. The provided time range ensures
+// that only bucket data for that range is removed.
+func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
// TODO(jeff): we need to block writes to this prefix while deletes are in progress
// otherwise we can end up in a situation where we have staged data in the cache or
// WAL that was deleted from the index, or worse. This needs to happen at a higher
@@ -63,7 +67,7 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
possiblyDead.keys = make(map[string]struct{})
if err := e.FileStore.Apply(func(r TSMFile) error {
-return r.DeletePrefix(prefix, min, max, func(key []byte) {
+return r.DeletePrefix(name, min, max, func(key []byte) {
possiblyDead.Lock()
possiblyDead.keys[string(key)] = struct{}{}
possiblyDead.Unlock()
@@ -79,7 +83,7 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
// ApplySerialEntryFn cannot return an error in this invocation.
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
-if bytes.HasPrefix(k, prefix) {
+if bytes.HasPrefix(k, name) {
if deleteKeys == nil {
deleteKeys = make([][]byte, 0, 10000)
}
@@ -107,10 +111,10 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
possiblyDead.RLock()
defer possiblyDead.RUnlock()
-iter := r.Iterator(prefix)
+iter := r.Iterator(name)
for i := 0; iter.Next(); i++ {
key := iter.Key()
-if !bytes.HasPrefix(key, prefix) {
+if !bytes.HasPrefix(key, name) {
break
}
@@ -143,6 +147,46 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
// TODO(jeff): it's also important that all of the deletes happen atomically with
// the deletes of the data in the tsm files.
// In this case the entire measurement (bucket) can be removed from the index.
if min == math.MinInt64 && max == math.MaxInt64 {
// Build up a set of series IDs that we need to remove from the series file.
set := tsdb.NewSeriesIDSet()
itr, err := e.index.MeasurementSeriesIDIterator(name)
if err != nil {
return err
}
var elem tsdb.SeriesIDElem
for elem, err = itr.Next(); err == nil; elem, err = itr.Next() {
if elem.SeriesID.IsZero() {
break
}
set.AddNoLock(elem.SeriesID)
}
if err != nil {
return err
} else if err := itr.Close(); err != nil {
return err
}
// Remove the measurement from the index before the series file.
if err := e.index.DropMeasurement(name); err != nil {
return err
}
// Iterate over the series ids we previously extracted from the index
// and remove from the series file.
set.ForEachNoLock(func(id tsdb.SeriesID) {
if err = e.sfile.DeleteSeriesID(id); err != nil {
return
}
})
return err
}
// This is the slow path, when not dropping the entire bucket (measurement)
for key := range possiblyDead.keys {
// TODO(jeff): ugh reduce copies here
keyb := []byte(key)
@@ -157,6 +201,7 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
if err := e.index.DropSeries(sid, keyb, true); err != nil {
return err
}
if err := e.sfile.DeleteSeriesID(sid); err != nil {
return err
}
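From the caller's perspective the fast path is selected purely by the time range: the storage-layer DeleteBucket wrapper in the first hunk passes math.MinInt64/math.MaxInt64, which this function interprets as permission to drop the whole measurement from the index rather than walking series one by one. A sketch of that dispatch, assuming the range check is the only trigger:

package main

import (
	"fmt"
	"math"
)

// deleteBucket sketches the dispatch in Engine.DeleteBucket: a full-range
// delete drops the whole measurement from the index in one operation, while
// a partial range must walk and delete individual series keys.
func deleteBucket(name []byte, min, max int64) string {
	if min == math.MinInt64 && max == math.MaxInt64 {
		return fmt.Sprintf("fast path: drop measurement %q from index", name)
	}
	return fmt.Sprintf("slow path: walk series under %q and delete keys in [%d, %d]", name, min, max)
}

func main() {
	fmt.Println(deleteBucket([]byte("cpu"), math.MinInt64, math.MaxInt64))
	fmt.Println(deleteBucket([]byte("cpu"), 0, 3)) // the range the test below exercises
}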

View File

@@ -44,7 +44,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
}
-if err := e.DeletePrefix([]byte("cpu"), 0, 3); err != nil {
+if err := e.DeleteBucket([]byte("cpu"), 0, 3); err != nil {
t.Fatalf("failed to delete series: %v", err)
}
@@ -90,7 +90,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
iter.Close()
// Deleting remaining series should remove them from the series.
-if err := e.DeletePrefix([]byte("cpu"), 0, 9); err != nil {
+if err := e.DeleteBucket([]byte("cpu"), 0, 9); err != nil {
t.Fatalf("failed to delete series: %v", err)
}