Merge pull request #17338 from influxdata/tsi-bulk-delete

feat(tsdb): Bulk delete series performance improvement
Ben Johnson 2020-03-18 16:24:04 -06:00 committed by GitHub
commit e2fcb7ef9d
10 changed files with 235 additions and 103 deletions
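
This PR batches series deletion end to end: instead of tombstoning and fsyncing one series id at a time, callers hand whole slices of ids to the series file and the TSI index, which bucket them by partition, apply them in parallel, and sync once per batch. A rough caller-side sketch of the difference (identifiers as in the diffs below; the loop itself is illustrative, not code from this PR):

    // Before: one partition lookup, log append, and fsync per series.
    for _, id := range ids {
        if err := sfile.DeleteSeriesID(id); err != nil {
            return err
        }
    }

    // After: one call; ids are grouped by partition, tombstoned in bulk,
    // and flushed/synced once per partition.
    if err := sfile.DeleteSeriesIDs(ids); err != nil {
        return err
    }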

View File

@@ -249,14 +249,21 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollecti
     return nil
 }
 
-// DeleteSeriesID flags a series as permanently deleted.
-// If the series is reintroduced later then it must create a new id.
-func (f *SeriesFile) DeleteSeriesID(id tsdb.SeriesID) error {
-    p := f.SeriesIDPartition(id)
-    if p == nil {
-        return ErrInvalidSeriesPartitionID
+// DeleteSeriesIDs flags a list of series as permanently deleted.
+// If a series is reintroduced later then it must create a new id.
+func (f *SeriesFile) DeleteSeriesIDs(ids []tsdb.SeriesID) error {
+    m := make(map[int][]tsdb.SeriesID)
+    for _, id := range ids {
+        partitionID := f.SeriesIDPartitionID(id)
+        m[partitionID] = append(m[partitionID], id)
     }
-    return p.DeleteSeriesID(id)
+
+    var g errgroup.Group
+    for partitionID, partitionIDs := range m {
+        partitionID, partitionIDs := partitionID, partitionIDs
+        g.Go(func() error { return f.partitions[partitionID].DeleteSeriesIDs(partitionIDs) })
+    }
+    return g.Wait()
 }
 
 // IsDeleted returns true if the ID has been deleted before.
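
The bucket-then-fan-out shape above is a generic pattern; here is a standalone sketch under assumed names (partitionOf and handle stand in for SeriesIDPartitionID and SeriesPartition.DeleteSeriesIDs; this is not InfluxDB code):

    package sketch

    import "golang.org/x/sync/errgroup"

    // fanOut groups ids by partition and processes each group concurrently,
    // returning the first error once every goroutine has finished.
    func fanOut(ids []uint64, partitionOf func(uint64) int, handle func(int, []uint64) error) error {
        m := make(map[int][]uint64)
        for _, id := range ids {
            p := partitionOf(id)
            m[p] = append(m[p], id)
        }
        var g errgroup.Group
        for part, group := range m {
            part, group := part, group // capture loop variables (pre-Go 1.22 semantics)
            g.Go(func() error { return handle(part, group) })
        }
        return g.Wait()
    }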

View File

@@ -218,7 +218,7 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) {
     }
 
     // Delete and ensure deletion.
-    if err := sfile.DeleteSeriesID(id); err != nil {
+    if err := sfile.DeleteSeriesIDs([]tsdb.SeriesID{id}); err != nil {
         t.Fatal(err)
     } else if !sfile.IsDeleted(id) {
         t.Fatal("expected deletion before compaction")
@@ -301,7 +301,7 @@ func TestSeriesFile_Compaction(t *testing.T) {
         if id := sfile.SeriesID(collection.Names[i], collection.Tags[i], nil); id.IsZero() {
             t.Fatal("expected series id")
-        } else if err := sfile.DeleteSeriesID(id); err != nil {
+        } else if err := sfile.DeleteSeriesIDs([]tsdb.SeriesID{id}); err != nil {
             t.Fatal(err)
         }
     }
@@ -387,7 +387,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
     // Delete a subset of keys.
     for i := 0; i < len(ids); i += 10 {
-        if err := sfile.DeleteSeriesID(ids[i]); err != nil {
+        if err := sfile.DeleteSeriesIDs([]tsdb.SeriesID{ids[i]}); err != nil {
             b.Fatal(err)
         }
     }

View File

@@ -368,9 +368,9 @@ func (p *SeriesPartition) Compacting() bool {
     return p.compacting
 }
 
-// DeleteSeriesID flags a series as permanently deleted.
-// If the series is reintroduced later then it must create a new id.
-func (p *SeriesPartition) DeleteSeriesID(id tsdb.SeriesID) error {
+// DeleteSeriesIDs flags a list of series as permanently deleted.
+// If a series is reintroduced later then it must create a new id.
+func (p *SeriesPartition) DeleteSeriesIDs(ids []tsdb.SeriesID) error {
     p.mu.Lock()
     defer p.mu.Unlock()
@@ -378,15 +378,19 @@ func (p *SeriesPartition) DeleteSeriesID(id tsdb.SeriesID) error {
         return ErrSeriesPartitionClosed
     }
 
-    // Already tombstoned, ignore.
-    if p.index.IsDeleted(id) {
-        return nil
-    }
+    var n uint64
+    for _, id := range ids {
+        // Already tombstoned, ignore.
+        if p.index.IsDeleted(id) {
+            continue
+        }
 
-    // Write tombstone entry. The type is ignored in tombstones.
-    _, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id.WithType(models.Empty), nil))
-    if err != nil {
-        return err
+        // Write tombstone entries. The type is ignored in tombstones.
+        _, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id.WithType(models.Empty), nil))
+        if err != nil {
+            return err
+        }
+        n++
     }
 
     // Flush active segment write.
@@ -397,8 +401,11 @@ func (p *SeriesPartition) DeleteSeriesID(id tsdb.SeriesID) error {
     }
 
     // Mark tombstone in memory.
-    p.index.Delete(id)
-
-    p.tracker.SubSeries(1)
+    for _, id := range ids {
+        p.index.Delete(id)
+    }
+
+    p.tracker.SubSeries(n)
     return nil
 }
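
The win in the partition is write amortization: the per-id version paid a segment flush (and fsync) for every tombstone, while the batched version appends all tombstone entries first and flushes once. A minimal sketch of that cost shape, with hypothetical writeEntry/flush stand-ins:

    // tombstoneAll appends one log entry per id but pays for a single flush,
    // so the sync cost is O(1) rather than O(len(ids)).
    func tombstoneAll(ids []uint64, writeEntry func(uint64) error, flush func() error) error {
        for _, id := range ids {
            if err := writeEntry(id); err != nil {
                return err
            }
        }
        return flush()
    }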

View File

@@ -113,7 +113,7 @@ func NewTest(t *testing.T) *Test {
     }
 
     // delete one series
-    if err := seriesFile.DeleteSeriesID(tsdb.NewSeriesID(ids[0])); err != nil {
+    if err := seriesFile.DeleteSeriesIDs([]tsdb.SeriesID{tsdb.NewSeriesID(ids[0])}); err != nil {
         return err
     }

View File

@@ -28,6 +28,7 @@ import (
     "github.com/influxdata/influxql"
     "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
 )
 
 // ErrCompactionInterrupted is returned if compactions are disabled or
@@ -373,11 +374,6 @@ func (i *Index) PartitionAt(index int) *Partition {
     return i.partitions[index]
 }
 
-// partition returns the appropriate Partition for a provided series key.
-func (i *Index) partition(key []byte) *Partition {
-    return i.partitions[int(xxhash.Sum64(key)&(i.PartitionN-1))]
-}
-
 // partitionIdx returns the index of the partition that key belongs in.
 func (i *Index) partitionIdx(key []byte) int {
     return int(xxhash.Sum64(key) & (i.PartitionN - 1))
@@ -732,11 +728,23 @@ func (i *Index) InitializeSeries(*tsdb.SeriesCollection) error {
     return nil
 }
 
-// DropSeries drops the provided series from the index. If cascade is true
+// DropSeries drops the provided set of series from the index. If cascade is true
 // and this is the last series to the measurement, the measurement will also be dropped.
-func (i *Index) DropSeries(seriesID tsdb.SeriesID, key []byte, cascade bool) error {
-    // Remove from partition.
-    if err := i.partition(key).DropSeries(seriesID); err != nil {
+func (i *Index) DropSeries(items []DropSeriesItem, cascade bool) error {
+    // Split into batches for each partition.
+    m := make(map[int][]tsdb.SeriesID)
+    for _, item := range items {
+        partitionID := i.partitionIdx(item.Key)
+        m[partitionID] = append(m[partitionID], item.SeriesID)
+    }
+
+    // Remove from all partitions in parallel.
+    var g errgroup.Group
+    for partitionID, ids := range m {
+        partitionID, ids := partitionID, ids
+        g.Go(func() error { return i.partitions[partitionID].DropSeries(ids) })
+    }
+    if err := g.Wait(); err != nil {
         return err
     }
@@ -744,29 +752,38 @@ func (i *Index) DropSeries(seriesID tsdb.SeriesID, key []byte, cascade bool) err
         return nil
     }
 
-    // Extract measurement name & tags.
-    name, tags := models.ParseKeyBytes(key)
+    // Clear tag value cache & determine unique set of measurement names.
+    nameSet := make(map[string]struct{})
+    for _, item := range items {
+        // Extract measurement name & tags.
+        name, tags := models.ParseKeyBytes(item.Key)
+        nameSet[string(name)] = struct{}{}
 
-    // If there are cached sets for any of the tag pairs, they will need to be
-    // updated with the series id.
-    i.tagValueCache.RLock()
-    if i.tagValueCache.measurementContainsSets(name) {
-        for _, pair := range tags {
-            i.tagValueCache.delete(name, pair.Key, pair.Value, seriesID) // Takes a lock on the series id set
+        // If there are cached sets for any of the tag pairs, they will need to be
+        // updated with the series id.
+        i.tagValueCache.RLock()
+        if i.tagValueCache.measurementContainsSets(name) {
+            for _, pair := range tags {
+                i.tagValueCache.delete(name, pair.Key, pair.Value, item.SeriesID) // Takes a lock on the series id set
+            }
         }
+        i.tagValueCache.RUnlock()
     }
-    i.tagValueCache.RUnlock()
 
-    // Check if that was the last series for the measurement in the entire index.
-    if ok, err := i.MeasurementHasSeries(name); err != nil {
-        return err
-    } else if ok {
-        return nil
-    }
+    for name := range nameSet {
+        namebytes := []byte(name)
 
-    // If no more series exist in the measurement then delete the measurement.
-    if err := i.DropMeasurement(name); err != nil {
-        return err
+        // Check if that was the last series for the measurement in the entire index.
+        if ok, err := i.MeasurementHasSeries(namebytes); err != nil {
+            return err
+        } else if ok {
+            continue
+        }
+
+        // If no more series exist in the measurement then delete the measurement.
+        if err := i.DropMeasurement(namebytes); err != nil {
+            return err
+        }
     }
 
     return nil
 }
@@ -1664,3 +1681,8 @@ func (itr *filterUndeletedSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
         return e, nil
     }
 }
+
+type DropSeriesItem struct {
+    SeriesID tsdb.SeriesID
+    Key      []byte
+}
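
DropSeriesItem pairs a series id with its key so DropSeries can route each id to the right partition (hashed from the key) while still updating the tag value cache and per-measurement bookkeeping. A hedged sketch of a caller assembling a batch (sfile, idx, and keys are assumptions, not code from this PR):

    items := make([]tsi1.DropSeriesItem, 0, len(keys))
    for _, key := range keys {
        name, tags := models.ParseKeyBytes(key)
        id := sfile.SeriesID(name, tags, nil)
        if id.IsZero() {
            continue // unknown series; nothing to drop
        }
        items = append(items, tsi1.DropSeriesItem{SeriesID: id, Key: key})
    }
    // cascade=true: measurements left with no series are dropped as well.
    if err := idx.DropSeries(items, true); err != nil {
        return err
    }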

View File

@@ -109,7 +109,7 @@ func TestIndex_MeasurementExists(t *testing.T) {
     }
 
     // Delete one series.
-    if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
+    if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: sid, Key: models.MakeKey(name, tags)}}, true); err != nil {
         t.Fatal(err)
     }
@@ -128,7 +128,7 @@ func TestIndex_MeasurementExists(t *testing.T) {
     if sid.IsZero() {
         t.Fatalf("got 0 series id for %s/%v", name, tags)
     }
-    if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
+    if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: sid, Key: models.MakeKey(name, tags)}}, true); err != nil {
         t.Fatal(err)
     }
@@ -378,7 +378,7 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
     }
 
     seriesID := idx.SeriesFile.SeriesID([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}), nil)
-    if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil {
+    if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: seriesID, Key: idx.SeriesFile.SeriesKey(seriesID)}}, true); err != nil {
         t.Fatal(err)
     } else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
         t.Fatal(err)
@@ -387,7 +387,7 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
     }
 
     seriesID = idx.SeriesFile.SeriesID([]byte("mem"), models.NewTags(map[string]string{"region": "east"}), nil)
-    if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil {
+    if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: seriesID, Key: idx.SeriesFile.SeriesKey(seriesID)}}, true); err != nil {
         t.Fatal(err)
     } else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
         t.Fatal(err)

View File

@@ -456,21 +456,33 @@ func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
     return tk.TagValueIterator()
 }
 
-// DeleteTagKey adds a tombstone for a tag key to the log file.
-func (f *LogFile) DeleteTagKey(name, key []byte) error {
-    f.mu.Lock()
-    defer f.mu.Unlock()
-
+// deleteTagKey adds a tombstone for a tag key to the log file without a lock.
+func (f *LogFile) deleteTagKey(name, key []byte) error {
     e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
     if err := f.appendEntry(&e); err != nil {
         return err
     }
     f.execEntry(&e)
+    return nil
+}
 
-    // Flush buffer and sync to disk.
+// DeleteTagKey adds a tombstone for a tag key to the log file.
+func (f *LogFile) DeleteTagKey(name, key []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    if err := f.deleteTagKey(name, key); err != nil {
+        return err
+    }
     return f.FlushAndSync()
 }
 
+// DeleteTagKeyNoSync adds a tombstone for a tag key to the log file without a sync.
+func (f *LogFile) DeleteTagKeyNoSync(name, key []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    return f.deleteTagKey(name, key)
+}
+
 // TagValueSeriesIDSet returns a series iterator for a tag value.
 func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
     f.mu.RLock()
@@ -525,21 +537,34 @@ func (f *LogFile) TagValueN() (n uint64) {
     return n
 }
 
-// DeleteTagValue adds a tombstone for a tag value to the log file.
-func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
-    f.mu.Lock()
-    defer f.mu.Unlock()
-
+// deleteTagValue adds a tombstone for a tag value to the log file without a lock.
+func (f *LogFile) deleteTagValue(name, key, value []byte) error {
     e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
     if err := f.appendEntry(&e); err != nil {
         return err
     }
    f.execEntry(&e)
+    return nil
+}
 
-    // Flush buffer and sync to disk.
+// DeleteTagValue adds a tombstone for a tag value to the log file.
+func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    if err := f.deleteTagValue(name, key, value); err != nil {
+        return err
+    }
     return f.FlushAndSync()
 }
 
+// DeleteTagValueNoSync adds a tombstone for a tag value to the log file.
+// Caller must call FlushAndSync().
+func (f *LogFile) DeleteTagValueNoSync(name, key, value []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    return f.deleteTagValue(name, key, value)
+}
+
 // AddSeriesList adds a list of series to the log file in bulk.
 func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, collection *tsdb.SeriesCollection) ([]tsdb.SeriesID, error) {
     var writeRequired bool
@@ -608,16 +633,18 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, collection *tsdb.Se
     return seriesIDs, nil
 }
 
-// DeleteSeriesID adds a tombstone for a series id.
-func (f *LogFile) DeleteSeriesID(id tsdb.SeriesID) error {
+// DeleteSeriesIDs adds a tombstone for a list of series ids.
+func (f *LogFile) DeleteSeriesIDs(ids []tsdb.SeriesID) error {
     f.mu.Lock()
     defer f.mu.Unlock()
 
-    e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
-    if err := f.appendEntry(&e); err != nil {
-        return err
+    for _, id := range ids {
+        e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
+        if err := f.appendEntry(&e); err != nil {
+            return err
+        }
+        f.execEntry(&e)
     }
-    f.execEntry(&e)
 
     // Flush buffer and sync to disk.
     return f.FlushAndSync()
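
The refactor follows a common Go idiom: keep the mutation in an unexported helper, and let exported wrappers decide whether to pay for the fsync. A standalone sketch with simplified types (not the LogFile API):

    package sketch

    import "sync"

    type wal struct {
        mu sync.Mutex
    }

    // deleteKey holds only the mutation; callers manage syncing.
    func (w *wal) deleteKey(k string) error {
        // append + apply the tombstone entry here
        return nil
    }

    // DeleteKey tombstones one key and syncs immediately.
    func (w *wal) DeleteKey(k string) error {
        w.mu.Lock()
        defer w.mu.Unlock()
        if err := w.deleteKey(k); err != nil {
            return err
        }
        return w.sync()
    }

    // DeleteKeyNoSync lets a caller batch many tombstones and sync once at the end.
    func (w *wal) DeleteKeyNoSync(k string) error {
        w.mu.Lock()
        defer w.mu.Unlock()
        return w.deleteKey(k)
    }

    func (w *wal) sync() error {
        // fsync the active segment here
        return nil
    }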

View File

@@ -621,7 +621,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
             if err := func() error {
                 p.mu.RLock()
                 defer p.mu.RUnlock()
-                return p.activeLogFile.DeleteTagKey(name, k.Key())
+                return p.activeLogFile.DeleteTagKeyNoSync(name, k.Key())
             }(); err != nil {
                 return err
             }
@@ -634,7 +634,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
             if err := func() error {
                 p.mu.RLock()
                 defer p.mu.RUnlock()
-                return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
+                return p.activeLogFile.DeleteTagValueNoSync(name, k.Key(), v.Value())
             }(); err != nil {
                 return err
             }
@@ -687,6 +687,15 @@ func (p *Partition) DropMeasurement(name []byte) error {
         return err
     }
 
+    // Ensure log is flushed & synced.
+    if err := func() error {
+        p.mu.RLock()
+        defer p.mu.RUnlock()
+        return p.activeLogFile.FlushAndSync()
+    }(); err != nil {
+        return err
+    }
+
     // Check if the log file needs to be swapped.
     if err := p.CheckLogFile(); err != nil {
         return err
@@ -750,24 +759,27 @@ func (p *Partition) createSeriesListIfNotExists(collection *tsdb.SeriesCollectio
     return ids, nil
 }
 
-// DropSeries removes the provided series id from the index.
-//
-// TODO(edd): We should support a bulk drop here.
-func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
-    // Ignore if the series is already deleted.
-    if !p.seriesIDSet.Contains(seriesID) {
-        return nil
+// DropSeries removes the provided set of series ids from the index.
+func (p *Partition) DropSeries(ids []tsdb.SeriesID) error {
+    // Count total affected series.
+    var n uint64
+    for _, id := range ids {
+        if p.seriesIDSet.Contains(id) {
+            n++
+        }
     }
 
     // Delete series from index.
-    if err := p.activeLogFile.DeleteSeriesID(seriesID); err != nil {
+    if err := p.activeLogFile.DeleteSeriesIDs(ids); err != nil {
         return err
     }
 
     // Update series set.
-    p.seriesIDSet.Remove(seriesID)
-    p.tracker.AddSeriesDropped(1)
-    p.tracker.SubSeries(1)
+    for _, id := range ids {
+        p.seriesIDSet.Remove(id)
+    }
+    p.tracker.AddSeriesDropped(n)
+    p.tracker.SubSeries(n)
 
     // Swap log file, if necessary.
     return p.CheckLogFile()

View File

@@ -12,6 +12,7 @@ import (
     "github.com/influxdata/influxdb/kit/tracing"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/tsdb"
+    "github.com/influxdata/influxdb/tsdb/tsi1"
     "github.com/influxdata/influxql"
 )
@@ -245,11 +246,11 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
         // and remove from the series file.
         span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "SFile Delete Series IDs")
         span.LogKV("measurement_name", fmt.Sprintf("%x", name), "series_id_set_size", set.Cardinality())
-        set.ForEachNoLock(func(id tsdb.SeriesID) {
-            if err = e.sfile.DeleteSeriesID(id); err != nil {
-                return
-            }
-        })
+        var ids []tsdb.SeriesID
+        set.ForEachNoLock(func(id tsdb.SeriesID) { ids = append(ids, id) })
+        if err = e.sfile.DeleteSeriesIDs(ids); err != nil {
+            return err
+        }
         span.Finish()
 
         return err
     }
@@ -257,22 +258,41 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
     // This is the slow path, when not dropping the entire bucket (measurement)
     span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI/SFile Delete keys")
     span.LogKV("measurement_name", fmt.Sprintf("%x", name), "keys_to_delete", len(possiblyDead.keys))
 
+    // Convert key map to a slice.
+    possiblyDeadKeysSlice := make([][]byte, 0, len(possiblyDead.keys))
     for key := range possiblyDead.keys {
-        // TODO(jeff): ugh reduce copies here
-        keyb := []byte(key)
-        keyb, _ = SeriesAndFieldFromCompositeKey(keyb)
+        possiblyDeadKeysSlice = append(possiblyDeadKeysSlice, []byte(key))
+    }
 
-        name, tags := models.ParseKeyBytes(keyb)
-        sid := e.sfile.SeriesID(name, tags, buf)
-        if sid.IsZero() {
-            continue
+    const batchSize = 1000
+    batch := make([]tsi1.DropSeriesItem, 0, batchSize)
+    ids := make([]tsdb.SeriesID, 0, batchSize)
+    for i := 0; i < len(possiblyDeadKeysSlice); i += batchSize {
+        isLastBatch := i+batchSize >= len(possiblyDeadKeysSlice)
+        batch, ids = batch[:0], ids[:0]
+
+        for j := 0; i+j < len(possiblyDeadKeysSlice) && j < batchSize; j++ {
+            var item tsi1.DropSeriesItem
+
+            // TODO(jeff): ugh reduce copies here
+            key := possiblyDeadKeysSlice[i+j]
+            item.Key = []byte(key)
+            item.Key, _ = SeriesAndFieldFromCompositeKey(item.Key)
+            name, tags := models.ParseKeyBytes(item.Key)
+            item.SeriesID = e.sfile.SeriesID(name, tags, buf)
+            if item.SeriesID.IsZero() {
+                continue
+            }
+            batch = append(batch, item)
+            ids = append(ids, item.SeriesID)
         }
 
-        if err := e.index.DropSeries(sid, keyb, true); err != nil {
+        // Remove from index & series file.
+        if err := e.index.DropSeries(batch, isLastBatch); err != nil {
             return err
-        }
-
-        if err := e.sfile.DeleteSeriesID(sid); err != nil {
+        } else if err := e.sfile.DeleteSeriesIDs(ids); err != nil {
             return err
         }
     }
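
In the slow path, the dead keys are now processed in fixed windows: each window resolves up to batchSize series ids, then issues one DropSeries call and one DeleteSeriesIDs call, and only the final window passes cascade=true so the per-measurement cleanup runs exactly once. The window arithmetic in isolation (dropBatch is a hypothetical stand-in):

    const batchSize = 1000
    for i := 0; i < len(keys); i += batchSize {
        end := i + batchSize
        if end > len(keys) {
            end = len(keys)
        }
        isLast := end == len(keys)
        if err := dropBatch(keys[i:end], isLast); err != nil {
            return err
        }
    }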

View File

@@ -3,6 +3,7 @@ package tsm1_test
 import (
     "bytes"
     "context"
+    "fmt"
     "reflect"
     "testing"
@@ -119,3 +120,39 @@ func TestEngine_DeletePrefix(t *testing.T) {
         }
     }
 }
+
+func BenchmarkEngine_DeletePrefixRange(b *testing.B) {
+    for i := 0; i < b.N; i++ {
+        b.StopTimer()
+        e, err := NewEngine(tsm1.NewConfig(), b)
+        if err != nil {
+            b.Fatal(err)
+        } else if err := e.Open(context.Background()); err != nil {
+            b.Fatal(err)
+        }
+        defer e.Close()
+
+        const n = 100000
+        var points []models.Point
+        for i := 0; i < n; i++ {
+            points = append(points, MustParsePointString(fmt.Sprintf("cpu,host=A%d value=1", i), "mm0"))
+            points = append(points, MustParsePointString(fmt.Sprintf("cpu,host=B%d value=1", i), "mm1"))
+        }
+        if err := e.writePoints(points...); err != nil {
+            b.Fatal(err)
+        }
+
+        if err := e.WriteSnapshot(context.Background(), tsm1.CacheStatusColdNoWrites); err != nil {
+            b.Fatal(err)
+        } else if got, want := len(e.FileStore.Keys()), n*2; got != want {
+            b.Fatalf("len(Keys())=%d, want %d", got, want)
+        }
+
+        b.StartTimer()
+        if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
+            b.Fatal(err)
+        } else if err := e.Close(); err != nil {
+            b.Fatal(err)
+        }
+    }
+}
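
The benchmark seeds 200,000 series split across two measurements and times a whole-prefix delete of one of them. Assuming the usual package layout, it can be run on its own with something like:

    go test -run '^$' -bench BenchmarkEngine_DeletePrefixRange ./tsdb/tsm1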