Improve efficiency of TSI index series drop
This commit improves the performance of a mass delete on the TSI index by deleting at the measurement level instead of deleting each series individually.pull/11042/head
parent
9e11602b6a
commit
b025d9afa9
|
@ -377,9 +377,9 @@ func (e *Engine) DeleteBucket(orgID, bucketID platform.ID) error {
|
|||
// TODO(edd): we need to clean up how we're encoding the prefix so that we
|
||||
// don't have to remember to get it right everywhere we need to touch TSM data.
|
||||
encoded := tsdb.EncodeName(orgID, bucketID)
|
||||
prefix := models.EscapeMeasurement(encoded[:])
|
||||
name := models.EscapeMeasurement(encoded[:])
|
||||
|
||||
return e.engine.DeletePrefix(prefix, math.MinInt64, math.MaxInt64)
|
||||
return e.engine.DeleteBucket(name, math.MinInt64, math.MaxInt64)
|
||||
}
|
||||
|
||||
// DeleteSeriesRangeWithPredicate deletes all series data iterated over if fn returns
|
||||
|
|
|
@ -628,6 +628,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
|
|||
// Delete all series.
|
||||
if itr := fs.MeasurementSeriesIDIterator(name); itr != nil {
|
||||
defer itr.Close()
|
||||
var total uint64
|
||||
for {
|
||||
elem, err := itr.Next()
|
||||
if err != nil {
|
||||
|
@ -638,7 +639,15 @@ func (p *Partition) DropMeasurement(name []byte) error {
|
|||
if err := p.activeLogFile.DeleteSeriesID(elem.SeriesID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update series set.
|
||||
p.seriesIDSet.Remove(elem.SeriesID)
|
||||
total++
|
||||
}
|
||||
|
||||
p.tracker.AddSeriesDropped(total)
|
||||
p.tracker.SubSeries(total)
|
||||
|
||||
if err = itr.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -10,7 +10,10 @@ import (
|
|||
"github.com/influxdata/influxql"
|
||||
)
|
||||
|
||||
func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
|
||||
// DeleteBucket removes all TSM data belonging to a bucket, and removes all index
|
||||
// and series file data associated with the bucket. The provided time range ensures
|
||||
// that only bucket data for that range is removed.
|
||||
func (e *Engine) DeleteBucket(name []byte, min, max int64) error {
|
||||
// TODO(jeff): we need to block writes to this prefix while deletes are in progress
|
||||
// otherwise we can end up in a situation where we have staged data in the cache or
|
||||
// WAL that was deleted from the index, or worse. This needs to happen at a higher
|
||||
|
@ -63,7 +66,7 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
|
|||
possiblyDead.keys = make(map[string]struct{})
|
||||
|
||||
if err := e.FileStore.Apply(func(r TSMFile) error {
|
||||
return r.DeletePrefix(prefix, min, max, func(key []byte) {
|
||||
return r.DeletePrefix(name, min, max, func(key []byte) {
|
||||
possiblyDead.Lock()
|
||||
possiblyDead.keys[string(key)] = struct{}{}
|
||||
possiblyDead.Unlock()
|
||||
|
@ -79,7 +82,7 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
|
|||
|
||||
// ApplySerialEntryFn cannot return an error in this invocation.
|
||||
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
||||
if bytes.HasPrefix(k, prefix) {
|
||||
if bytes.HasPrefix(k, name) {
|
||||
if deleteKeys == nil {
|
||||
deleteKeys = make([][]byte, 0, 10000)
|
||||
}
|
||||
|
@ -107,10 +110,10 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
|
|||
possiblyDead.RLock()
|
||||
defer possiblyDead.RUnlock()
|
||||
|
||||
iter := r.Iterator(prefix)
|
||||
iter := r.Iterator(name)
|
||||
for i := 0; iter.Next(); i++ {
|
||||
key := iter.Key()
|
||||
if !bytes.HasPrefix(key, prefix) {
|
||||
if !bytes.HasPrefix(key, name) {
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -154,13 +157,24 @@ func (e *Engine) DeletePrefix(prefix []byte, min, max int64) error {
|
|||
continue
|
||||
}
|
||||
|
||||
if err := e.index.DropSeries(sid, keyb, true); err != nil {
|
||||
return err
|
||||
// If the time range is not full then the index must drop each series
|
||||
// individually.
|
||||
if min != math.MinInt64 || max != math.MaxInt64 {
|
||||
if err := e.index.DropSeries(sid, keyb, true); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := e.sfile.DeleteSeriesID(sid); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// In this case the entire measurement (bucket) can be removed from the index.
|
||||
if min == math.MinInt64 && max == math.MaxInt64 {
|
||||
if err := e.index.DropMeasurement(name); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -44,7 +44,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
t.Fatalf("series count mismatch: exp %v, got %v", exp, got)
|
||||
}
|
||||
|
||||
if err := e.DeletePrefix([]byte("cpu"), 0, 3); err != nil {
|
||||
if err := e.DeleteBucket([]byte("cpu"), 0, 3); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
|
@ -90,7 +90,7 @@ func TestEngine_DeletePrefix(t *testing.T) {
|
|||
iter.Close()
|
||||
|
||||
// Deleting remaining series should remove them from the series.
|
||||
if err := e.DeletePrefix([]byte("cpu"), 0, 9); err != nil {
|
||||
if err := e.DeleteBucket([]byte("cpu"), 0, 9); err != nil {
|
||||
t.Fatalf("failed to delete series: %v", err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue