diff --git a/tsdb/seriesfile/series_file.go b/tsdb/seriesfile/series_file.go
index 8b3ce35d64..ad5f9718d9 100644
--- a/tsdb/seriesfile/series_file.go
+++ b/tsdb/seriesfile/series_file.go
@@ -249,14 +249,21 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(collection *tsdb.SeriesCollecti
     return nil
 }
 
-// DeleteSeriesID flags a series as permanently deleted.
-// If the series is reintroduced later then it must create a new id.
-func (f *SeriesFile) DeleteSeriesID(id tsdb.SeriesID) error {
-    p := f.SeriesIDPartition(id)
-    if p == nil {
-        return ErrInvalidSeriesPartitionID
+// DeleteSeriesIDs flags a list of series as permanently deleted.
+// If a series is reintroduced later then it must create a new id.
+func (f *SeriesFile) DeleteSeriesIDs(ids []tsdb.SeriesID) error {
+    m := make(map[int][]tsdb.SeriesID)
+    for _, id := range ids {
+        partitionID := f.SeriesIDPartitionID(id)
+        m[partitionID] = append(m[partitionID], id)
     }
-    return p.DeleteSeriesID(id)
+
+    var g errgroup.Group
+    for partitionID, partitionIDs := range m {
+        partitionID, partitionIDs := partitionID, partitionIDs
+        g.Go(func() error { return f.partitions[partitionID].DeleteSeriesIDs(partitionIDs) })
+    }
+    return g.Wait()
 }
 
 // IsDeleted returns true if the ID has been deleted before.
diff --git a/tsdb/seriesfile/series_file_test.go b/tsdb/seriesfile/series_file_test.go
index b54ea14469..b5d5f15238 100644
--- a/tsdb/seriesfile/series_file_test.go
+++ b/tsdb/seriesfile/series_file_test.go
@@ -218,7 +218,7 @@ func TestSeriesFile_DeleteSeriesID(t *testing.T) {
     }
 
     // Delete and ensure deletion.
-    if err := sfile.DeleteSeriesID(id); err != nil {
+    if err := sfile.DeleteSeriesIDs([]tsdb.SeriesID{id}); err != nil {
         t.Fatal(err)
     } else if !sfile.IsDeleted(id) {
         t.Fatal("expected deletion before compaction")
@@ -301,7 +301,7 @@ func TestSeriesFile_Compaction(t *testing.T) {
 
         if id := sfile.SeriesID(collection.Names[i], collection.Tags[i], nil); id.IsZero() {
             t.Fatal("expected series id")
-        } else if err := sfile.DeleteSeriesID(id); err != nil {
+        } else if err := sfile.DeleteSeriesIDs([]tsdb.SeriesID{id}); err != nil {
             t.Fatal(err)
         }
     }
@@ -387,7 +387,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
 
     // Delete a subset of keys.
     for i := 0; i < len(ids); i += 10 {
-        if err := sfile.DeleteSeriesID(ids[i]); err != nil {
+        if err := sfile.DeleteSeriesIDs([]tsdb.SeriesID{ids[i]}); err != nil {
             b.Fatal(err)
         }
     }
diff --git a/tsdb/seriesfile/series_partition.go b/tsdb/seriesfile/series_partition.go
index aa6a98af74..03eaa22f82 100644
--- a/tsdb/seriesfile/series_partition.go
+++ b/tsdb/seriesfile/series_partition.go
@@ -368,9 +368,9 @@ func (p *SeriesPartition) Compacting() bool {
     return p.compacting
 }
 
-// DeleteSeriesID flags a series as permanently deleted.
-// If the series is reintroduced later then it must create a new id.
-func (p *SeriesPartition) DeleteSeriesID(id tsdb.SeriesID) error {
+// DeleteSeriesIDs flags a list of series as permanently deleted.
+// If a series is reintroduced later then it must create a new id.
+func (p *SeriesPartition) DeleteSeriesIDs(ids []tsdb.SeriesID) error {
     p.mu.Lock()
     defer p.mu.Unlock()
 
@@ -378,15 +378,19 @@ func (p *SeriesPartition) DeleteSeriesID(id tsdb.SeriesID) error {
         return ErrSeriesPartitionClosed
     }
 
-    // Already tombstoned, ignore.
-    if p.index.IsDeleted(id) {
-        return nil
-    }
+    var n uint64
+    for _, id := range ids {
+        // Already tombstoned, ignore.
+        if p.index.IsDeleted(id) {
+            continue
+        }
 
-    // Write tombstone entry. The type is ignored in tombstones.
-    _, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id.WithType(models.Empty), nil))
-    if err != nil {
-        return err
+        // Write tombstone entries. The type is ignored in tombstones.
+        _, err := p.writeLogEntry(AppendSeriesEntry(nil, SeriesEntryTombstoneFlag, id.WithType(models.Empty), nil))
+        if err != nil {
+            return err
+        }
+        n++
     }
 
     // Flush active segment write.
@@ -397,8 +401,11 @@ func (p *SeriesPartition) DeleteSeriesID(id tsdb.SeriesID) error {
     }
 
     // Mark tombstone in memory.
-    p.index.Delete(id)
-    p.tracker.SubSeries(1)
+    for _, id := range ids {
+        p.index.Delete(id)
+    }
+    p.tracker.SubSeries(n)
+
     return nil
 }
diff --git a/tsdb/seriesfile/series_verify_test.go b/tsdb/seriesfile/series_verify_test.go
index badcf9cd4e..873e4f37b4 100644
--- a/tsdb/seriesfile/series_verify_test.go
+++ b/tsdb/seriesfile/series_verify_test.go
@@ -113,7 +113,7 @@ func NewTest(t *testing.T) *Test {
     }
 
     // delete one series
-    if err := seriesFile.DeleteSeriesID(tsdb.NewSeriesID(ids[0])); err != nil {
+    if err := seriesFile.DeleteSeriesIDs([]tsdb.SeriesID{tsdb.NewSeriesID(ids[0])}); err != nil {
         return err
     }
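The series-file entry point now fans each batch out across partitions, so a caller pays one partition-level flush per partition rather than one fsync per series. A minimal caller sketch (illustrative only; sfile is an open *SeriesFile, and id1/id2 are hypothetical IDs collected elsewhere):

    // Hypothetical caller: gather the IDs first, then issue a single batched call.
    var ids []tsdb.SeriesID
    ids = append(ids, id1, id2) // id1, id2: hypothetical tsdb.SeriesID values
    if err := sfile.DeleteSeriesIDs(ids); err != nil {
        return err
    }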
diff --git a/tsdb/tsi1/index.go b/tsdb/tsi1/index.go
index 20930b3d79..4d33325776 100644
--- a/tsdb/tsi1/index.go
+++ b/tsdb/tsi1/index.go
@@ -28,6 +28,7 @@ import (
     "github.com/influxdata/influxql"
     "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
+    "golang.org/x/sync/errgroup"
 )
 
 // ErrCompactionInterrupted is returned if compactions are disabled or
@@ -373,11 +374,6 @@ func (i *Index) PartitionAt(index int) *Partition {
     return i.partitions[index]
 }
 
-// partition returns the appropriate Partition for a provided series key.
-func (i *Index) partition(key []byte) *Partition {
-    return i.partitions[int(xxhash.Sum64(key)&(i.PartitionN-1))]
-}
-
 // partitionIdx returns the index of the partition that key belongs in.
 func (i *Index) partitionIdx(key []byte) int {
     return int(xxhash.Sum64(key) & (i.PartitionN - 1))
@@ -732,11 +728,23 @@ func (i *Index) InitializeSeries(*tsdb.SeriesCollection) error {
     return nil
 }
 
-// DropSeries drops the provided series from the index. If cascade is true
+// DropSeries drops the provided set of series from the index. If cascade is true
 // and this is the last series of the measurement, the measurement will also be dropped.
-func (i *Index) DropSeries(seriesID tsdb.SeriesID, key []byte, cascade bool) error {
-    // Remove from partition.
-    if err := i.partition(key).DropSeries(seriesID); err != nil {
+func (i *Index) DropSeries(items []DropSeriesItem, cascade bool) error {
+    // Split into batches for each partition.
+    m := make(map[int][]tsdb.SeriesID)
+    for _, item := range items {
+        partitionID := i.partitionIdx(item.Key)
+        m[partitionID] = append(m[partitionID], item.SeriesID)
+    }
+
+    // Remove from all partitions in parallel.
+    var g errgroup.Group
+    for partitionID, ids := range m {
+        partitionID, ids := partitionID, ids
+        g.Go(func() error { return i.partitions[partitionID].DropSeries(ids) })
+    }
+    if err := g.Wait(); err != nil {
         return err
     }
 
@@ -744,29 +752,38 @@ func (i *Index) DropSeries(seriesID tsdb.SeriesID, key []byte, cascade bool) err
         return nil
     }
 
-    // Extract measurement name & tags.
-    name, tags := models.ParseKeyBytes(key)
+    // Clear tag value cache & determine unique set of measurement names.
+    nameSet := make(map[string]struct{})
+    for _, item := range items {
+        // Extract measurement name & tags.
+        name, tags := models.ParseKeyBytes(item.Key)
+        nameSet[string(name)] = struct{}{}
 
-    // If there are cached sets for any of the tag pairs, they will need to be
-    // updated with the series id.
-    i.tagValueCache.RLock()
-    if i.tagValueCache.measurementContainsSets(name) {
-        for _, pair := range tags {
-            i.tagValueCache.delete(name, pair.Key, pair.Value, seriesID) // Takes a lock on the series id set
+        // If there are cached sets for any of the tag pairs, they will need to be
+        // updated with the series id.
+        i.tagValueCache.RLock()
+        if i.tagValueCache.measurementContainsSets(name) {
+            for _, pair := range tags {
+                i.tagValueCache.delete(name, pair.Key, pair.Value, item.SeriesID) // Takes a lock on the series id set
+            }
         }
-    }
-    i.tagValueCache.RUnlock()
-
-    // Check if that was the last series for the measurement in the entire index.
-    if ok, err := i.MeasurementHasSeries(name); err != nil {
-        return err
-    } else if ok {
-        return nil
+        i.tagValueCache.RUnlock()
     }
 
-    // If no more series exist in the measurement then delete the measurement.
-    if err := i.DropMeasurement(name); err != nil {
-        return err
+    for name := range nameSet {
+        namebytes := []byte(name)
+
+        // Check if that was the last series for the measurement in the entire index.
+        if ok, err := i.MeasurementHasSeries(namebytes); err != nil {
+            return err
+        } else if ok {
+            continue
+        }
+
+        // If no more series exist in the measurement then delete the measurement.
+        if err := i.DropMeasurement(namebytes); err != nil {
+            return err
+        }
     }
     return nil
 }
@@ -1664,3 +1681,8 @@ func (itr *filterUndeletedSeriesIDIterator) Next() (tsdb.SeriesIDElem, error) {
         return e, nil
     }
 }
+
+type DropSeriesItem struct {
+    SeriesID tsdb.SeriesID
+    Key      []byte
+}
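DropSeries now accepts []DropSeriesItem so each series travels with its own key: the index hashes each key once via partitionIdx, groups the IDs per partition, and drops the groups concurrently. A sketch of how a caller builds the batch (hypothetical idx, with parallel ids/keys slices of equal length):

    items := make([]tsi1.DropSeriesItem, 0, len(keys))
    for i, key := range keys {
        items = append(items, tsi1.DropSeriesItem{SeriesID: ids[i], Key: key})
    }
    // cascade=true also drops any measurement whose last series was removed.
    if err := idx.DropSeries(items, true); err != nil {
        return err
    }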
diff --git a/tsdb/tsi1/index_test.go b/tsdb/tsi1/index_test.go
index da236291b5..625e3ad55b 100644
--- a/tsdb/tsi1/index_test.go
+++ b/tsdb/tsi1/index_test.go
@@ -109,7 +109,7 @@ func TestIndex_MeasurementExists(t *testing.T) {
         }
 
         // Delete one series.
-        if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
+        if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: sid, Key: models.MakeKey(name, tags)}}, true); err != nil {
             t.Fatal(err)
         }
 
@@ -128,7 +128,7 @@
         if sid.IsZero() {
             t.Fatalf("got 0 series id for %s/%v", name, tags)
         }
-        if err := idx.DropSeries(sid, models.MakeKey(name, tags), true); err != nil {
+        if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: sid, Key: models.MakeKey(name, tags)}}, true); err != nil {
             t.Fatal(err)
         }
 
@@ -378,7 +378,7 @@ func TestIndex_MeasurementCardinalityStats(t *testing.T) {
     }
 
     seriesID := idx.SeriesFile.SeriesID([]byte("cpu"), models.NewTags(map[string]string{"region": "west"}), nil)
-    if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil {
+    if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: seriesID, Key: idx.SeriesFile.SeriesKey(seriesID)}}, true); err != nil {
         t.Fatal(err)
     } else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
         t.Fatal(err)
@@ -387,7 +387,7 @@
     }
 
     seriesID = idx.SeriesFile.SeriesID([]byte("mem"), models.NewTags(map[string]string{"region": "east"}), nil)
-    if err := idx.DropSeries(seriesID, idx.SeriesFile.SeriesKey(seriesID), true); err != nil {
+    if err := idx.DropSeries([]tsi1.DropSeriesItem{{SeriesID: seriesID, Key: idx.SeriesFile.SeriesKey(seriesID)}}, true); err != nil {
         t.Fatal(err)
     } else if stats, err := idx.MeasurementCardinalityStats(); err != nil {
         t.Fatal(err)
diff --git a/tsdb/tsi1/log_file.go b/tsdb/tsi1/log_file.go
index 07ed794db2..bef122ba24 100644
--- a/tsdb/tsi1/log_file.go
+++ b/tsdb/tsi1/log_file.go
@@ -456,21 +456,33 @@ func (f *LogFile) TagValueIterator(name, key []byte) TagValueIterator {
     return tk.TagValueIterator()
 }
 
-// DeleteTagKey adds a tombstone for a tag key to the log file.
-func (f *LogFile) DeleteTagKey(name, key []byte) error {
-    f.mu.Lock()
-    defer f.mu.Unlock()
-
+// deleteTagKey adds a tombstone for a tag key to the log file. Caller must hold the lock.
+func (f *LogFile) deleteTagKey(name, key []byte) error {
     e := LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: key}
     if err := f.appendEntry(&e); err != nil {
         return err
     }
     f.execEntry(&e)
+    return nil
+}
 
-    // Flush buffer and sync to disk.
+// DeleteTagKey adds a tombstone for a tag key to the log file.
+func (f *LogFile) DeleteTagKey(name, key []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    if err := f.deleteTagKey(name, key); err != nil {
+        return err
+    }
     return f.FlushAndSync()
 }
 
+// DeleteTagKeyNoSync adds a tombstone for a tag key to the log file without a sync.
+// Caller must call FlushAndSync().
+func (f *LogFile) DeleteTagKeyNoSync(name, key []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    return f.deleteTagKey(name, key)
+}
+
 // TagValueSeriesIDSet returns a series iterator for a tag value.
 func (f *LogFile) TagValueSeriesIDSet(name, key, value []byte) (*tsdb.SeriesIDSet, error) {
     f.mu.RLock()
@@ -525,21 +537,34 @@ func (f *LogFile) TagValueN() (n uint64) {
     return n
 }
 
-// DeleteTagValue adds a tombstone for a tag value to the log file.
-func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
-    f.mu.Lock()
-    defer f.mu.Unlock()
-
+// deleteTagValue adds a tombstone for a tag value to the log file. Caller must hold the lock.
+func (f *LogFile) deleteTagValue(name, key, value []byte) error {
     e := LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: key, Value: value}
     if err := f.appendEntry(&e); err != nil {
         return err
     }
     f.execEntry(&e)
+    return nil
+}
 
-    // Flush buffer and sync to disk.
+// DeleteTagValue adds a tombstone for a tag value to the log file.
+func (f *LogFile) DeleteTagValue(name, key, value []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    if err := f.deleteTagValue(name, key, value); err != nil {
+        return err
+    }
     return f.FlushAndSync()
 }
 
+// DeleteTagValueNoSync adds a tombstone for a tag value to the log file without a sync.
+// Caller must call FlushAndSync().
+func (f *LogFile) DeleteTagValueNoSync(name, key, value []byte) error {
+    f.mu.Lock()
+    defer f.mu.Unlock()
+    return f.deleteTagValue(name, key, value)
+}
+
 // AddSeriesList adds a list of series to the log file in bulk.
 func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, collection *tsdb.SeriesCollection) ([]tsdb.SeriesID, error) {
     var writeRequired bool
@@ -608,16 +633,18 @@ func (f *LogFile) AddSeriesList(seriesSet *tsdb.SeriesIDSet, collection *tsdb.Se
     return seriesIDs, nil
 }
 
-// DeleteSeriesID adds a tombstone for a series id.
-func (f *LogFile) DeleteSeriesID(id tsdb.SeriesID) error {
+// DeleteSeriesIDs adds a tombstone for a list of series ids.
+func (f *LogFile) DeleteSeriesIDs(ids []tsdb.SeriesID) error {
     f.mu.Lock()
     defer f.mu.Unlock()
 
-    e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
-    if err := f.appendEntry(&e); err != nil {
-        return err
+    for _, id := range ids {
+        e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
+        if err := f.appendEntry(&e); err != nil {
+            return err
+        }
+        f.execEntry(&e)
     }
-    f.execEntry(&e)
 
     // Flush buffer and sync to disk.
     return f.FlushAndSync()
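The unexported helpers and the *NoSync variants split tombstoning from durability, so a caller issuing many tombstones can defer the fsync and pay for it once. The intended pattern, sketched with a hypothetical active *LogFile f and name/key/value byte slices:

    // Queue several tombstones without syncing each one...
    if err := f.DeleteTagKeyNoSync(name, key); err != nil {
        return err
    }
    if err := f.DeleteTagValueNoSync(name, key, value); err != nil {
        return err
    }
    // ...then one flush+sync covers the whole batch.
    if err := f.FlushAndSync(); err != nil {
        return err
    }

DropMeasurement in the next file follows exactly this shape, tombstoning every tag key and value before a single FlushAndSync.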
diff --git a/tsdb/tsi1/partition.go b/tsdb/tsi1/partition.go
index c3519ef0e4..23d92686b7 100644
--- a/tsdb/tsi1/partition.go
+++ b/tsdb/tsi1/partition.go
@@ -621,7 +621,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
             if err := func() error {
                 p.mu.RLock()
                 defer p.mu.RUnlock()
-                return p.activeLogFile.DeleteTagKey(name, k.Key())
+                return p.activeLogFile.DeleteTagKeyNoSync(name, k.Key())
             }(); err != nil {
                 return err
             }
@@ -634,7 +634,7 @@
             if err := func() error {
                 p.mu.RLock()
                 defer p.mu.RUnlock()
-                return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
+                return p.activeLogFile.DeleteTagValueNoSync(name, k.Key(), v.Value())
             }(); err != nil {
                 return err
             }
@@ -687,6 +687,15 @@
         return err
     }
 
+    // Ensure log is flushed & synced.
+    if err := func() error {
+        p.mu.RLock()
+        defer p.mu.RUnlock()
+        return p.activeLogFile.FlushAndSync()
+    }(); err != nil {
+        return err
+    }
+
     // Check if the log file needs to be swapped.
     if err := p.CheckLogFile(); err != nil {
         return err
@@ -750,24 +759,27 @@ func (p *Partition) createSeriesListIfNotExists(collection *tsdb.SeriesCollectio
     return ids, nil
 }
 
-// DropSeries removes the provided series id from the index.
-//
-// TODO(edd): We should support a bulk drop here.
-func (p *Partition) DropSeries(seriesID tsdb.SeriesID) error {
-    // Ignore if the series is already deleted.
-    if !p.seriesIDSet.Contains(seriesID) {
-        return nil
+// DropSeries removes the provided set of series ids from the index.
+func (p *Partition) DropSeries(ids []tsdb.SeriesID) error {
+    // Count total affected series.
+    var n uint64
+    for _, id := range ids {
+        if p.seriesIDSet.Contains(id) {
+            n++
+        }
     }
 
     // Delete series from index.
-    if err := p.activeLogFile.DeleteSeriesID(seriesID); err != nil {
+    if err := p.activeLogFile.DeleteSeriesIDs(ids); err != nil {
         return err
     }
 
     // Update series set.
-    p.seriesIDSet.Remove(seriesID)
-    p.tracker.AddSeriesDropped(1)
-    p.tracker.SubSeries(1)
+    for _, id := range ids {
+        p.seriesIDSet.Remove(id)
+    }
+    p.tracker.AddSeriesDropped(n)
+    p.tracker.SubSeries(n)
 
     // Swap log file, if necessary.
     return p.CheckLogFile()
diff --git a/tsdb/tsm1/engine_delete_prefix.go b/tsdb/tsm1/engine_delete_prefix.go
index 73c5096201..91803478c0 100644
--- a/tsdb/tsm1/engine_delete_prefix.go
+++ b/tsdb/tsm1/engine_delete_prefix.go
@@ -12,6 +12,7 @@ import (
     "github.com/influxdata/influxdb/kit/tracing"
     "github.com/influxdata/influxdb/models"
     "github.com/influxdata/influxdb/tsdb"
+    "github.com/influxdata/influxdb/tsdb/tsi1"
     "github.com/influxdata/influxql"
 )
 
@@ -245,11 +246,11 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
         // and remove from the series file.
         span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "SFile Delete Series IDs")
         span.LogKV("measurement_name", fmt.Sprintf("%x", name), "series_id_set_size", set.Cardinality())
-        set.ForEachNoLock(func(id tsdb.SeriesID) {
-            if err = e.sfile.DeleteSeriesID(id); err != nil {
-                return
-            }
-        })
+        var ids []tsdb.SeriesID
+        set.ForEachNoLock(func(id tsdb.SeriesID) { ids = append(ids, id) })
+        if err = e.sfile.DeleteSeriesIDs(ids); err != nil {
+            return err
+        }
         span.Finish()
 
         return err
     }
@@ -257,22 +258,41 @@
     // This is the slow path, when not dropping the entire bucket (measurement)
     span, _ = tracing.StartSpanFromContextWithOperationName(rootCtx, "TSI/SFile Delete keys")
     span.LogKV("measurement_name", fmt.Sprintf("%x", name), "keys_to_delete", len(possiblyDead.keys))
+
+    // Convert key map to a slice.
+    possiblyDeadKeysSlice := make([][]byte, 0, len(possiblyDead.keys))
     for key := range possiblyDead.keys {
-        // TODO(jeff): ugh reduce copies here
-        keyb := []byte(key)
-        keyb, _ = SeriesAndFieldFromCompositeKey(keyb)
+        possiblyDeadKeysSlice = append(possiblyDeadKeysSlice, []byte(key))
+    }
 
-        name, tags := models.ParseKeyBytes(keyb)
-        sid := e.sfile.SeriesID(name, tags, buf)
-        if sid.IsZero() {
-            continue
+    const batchSize = 1000
+    batch := make([]tsi1.DropSeriesItem, 0, batchSize)
+    ids := make([]tsdb.SeriesID, 0, batchSize)
+    for i := 0; i < len(possiblyDeadKeysSlice); i += batchSize {
+        isLastBatch := i+batchSize >= len(possiblyDeadKeysSlice)
+        batch, ids = batch[:0], ids[:0]
+
+        for j := 0; i+j < len(possiblyDeadKeysSlice) && j < batchSize; j++ {
+            var item tsi1.DropSeriesItem
+
+            // TODO(jeff): ugh reduce copies here
+            key := possiblyDeadKeysSlice[i+j]
+            item.Key = []byte(key)
+            item.Key, _ = SeriesAndFieldFromCompositeKey(item.Key)
+
+            name, tags := models.ParseKeyBytes(item.Key)
+            item.SeriesID = e.sfile.SeriesID(name, tags, buf)
+            if item.SeriesID.IsZero() {
+                continue
+            }
+            batch = append(batch, item)
+            ids = append(ids, item.SeriesID)
         }
 
-        if err := e.index.DropSeries(sid, keyb, true); err != nil {
+        // Remove from index & series file.
+        if err := e.index.DropSeries(batch, isLastBatch); err != nil {
             return err
-        }
-
-        if err := e.sfile.DeleteSeriesID(sid); err != nil {
+        } else if err := e.sfile.DeleteSeriesIDs(ids); err != nil {
             return err
         }
     }
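The slow path now walks the dead keys in fixed-size batches, so each DropSeries/DeleteSeriesIDs round trip amortizes its log flush across up to batchSize series, and the cascading measurement check runs only on the final batch. The batching arithmetic, reduced to a sketch (hypothetical keys slice and process helper):

    const batchSize = 1000
    for i := 0; i < len(keys); i += batchSize {
        end := i + batchSize
        if end > len(keys) {
            end = len(keys) // final, possibly short batch
        }
        process(keys[i:end]) // one index/series-file round trip per batch
    }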
diff --git a/tsdb/tsm1/engine_delete_prefix_test.go b/tsdb/tsm1/engine_delete_prefix_test.go
index b017cf21e5..ee2158824e 100644
--- a/tsdb/tsm1/engine_delete_prefix_test.go
+++ b/tsdb/tsm1/engine_delete_prefix_test.go
@@ -3,6 +3,7 @@ package tsm1_test
 import (
     "bytes"
     "context"
+    "fmt"
     "reflect"
     "testing"
 
@@ -119,3 +120,39 @@ func TestEngine_DeletePrefix(t *testing.T) {
         }
     }
 }
+
+func BenchmarkEngine_DeletePrefixRange(b *testing.B) {
+    for i := 0; i < b.N; i++ {
+        b.StopTimer()
+        e, err := NewEngine(tsm1.NewConfig(), b)
+        if err != nil {
+            b.Fatal(err)
+        } else if err := e.Open(context.Background()); err != nil {
+            b.Fatal(err)
+        }
+        defer e.Close()
+
+        const n = 100000
+        var points []models.Point
+        for i := 0; i < n; i++ {
+            points = append(points, MustParsePointString(fmt.Sprintf("cpu,host=A%d value=1", i), "mm0"))
+            points = append(points, MustParsePointString(fmt.Sprintf("cpu,host=B%d value=1", i), "mm1"))
+        }
+        if err := e.writePoints(points...); err != nil {
+            b.Fatal(err)
+        }
+
+        if err := e.WriteSnapshot(context.Background(), tsm1.CacheStatusColdNoWrites); err != nil {
+            b.Fatal(err)
+        } else if got, want := len(e.FileStore.Keys()), n*2; got != want {
+            b.Fatalf("len(Keys())=%d, want %d", got, want)
+        }
+        b.StartTimer()
+
+        if err := e.DeletePrefixRange(context.Background(), []byte("mm0"), 0, 3, nil); err != nil {
+            b.Fatal(err)
+        } else if err := e.Close(); err != nil {
+            b.Fatal(err)
+        }
+    }
+}
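Assuming a standard Go toolchain, the new benchmark can be run in isolation with:

    go test -run=^$ -bench=BenchmarkEngine_DeletePrefixRange ./tsdb/tsm1/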