diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go
index 4cabe08b43..d4239e7feb 100644
--- a/tsdb/engine/tsm1/engine.go
+++ b/tsdb/engine/tsm1/engine.go
@@ -1673,6 +1673,9 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 	ids := tsdb.NewSeriesIDSet()
 	measurements := make(map[string]struct{}, 1)
 
+	deleteIDList := make([]uint64, 0, 10000)
+	deleteKeyList := make([][]byte, 0, 10000)
+
 	for _, k := range seriesKeys {
 		if len(k) == 0 {
 			continue // This key was wiped because it shouldn't be removed from index.
@@ -1702,15 +1705,18 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
 			continue
 		}
 
+		// Queue the series ID and key so the index can drop them in one batch.
 		measurements[string(name)] = struct{}{}
-		// Remove the series from the local index.
-		if err := e.index.DropSeries(sid, k, false); err != nil {
-			return err
-		}
+		deleteIDList = append(deleteIDList, sid)
+		deleteKeyList = append(deleteKeyList, k)
 
 		// Add the id to the set of delete ids.
 		ids.Add(sid)
 	}
+	// Remove the queued series from the local index in a single call.
+	if err := e.index.DropSeriesList(deleteIDList, deleteKeyList, false); err != nil {
+		return err
+	}
 
 	fielsetChanged := false
 	for k := range measurements {
diff --git a/tsdb/index.go b/tsdb/index.go
index 60016cb585..538a734d90 100644
--- a/tsdb/index.go
+++ b/tsdb/index.go
@@ -43,6 +43,7 @@ type Index interface {
 	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
 	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
 	DropSeries(seriesID uint64, key []byte, cascade bool) error
+	DropSeriesList(seriesIDs []uint64, keys [][]byte, cascade bool) error
 	DropMeasurementIfSeriesNotExist(name []byte) (bool, error)
 
 	// Used to clean up series in inmem index that were dropped with a shard.
diff --git a/tsdb/index/inmem/inmem.go b/tsdb/index/inmem/inmem.go
index 5390639134..adb17c7a67 100644
--- a/tsdb/index/inmem/inmem.go
+++ b/tsdb/index/inmem/inmem.go
@@ -1135,6 +1135,30 @@ func (idx *ShardIndex) DropSeries(seriesID uint64, key []byte, _ bool) error {
 	return nil
 }
 
+// DropSeriesList removes the provided series IDs from the local bitset that tracks
+// series in this shard only.
+func (idx *ShardIndex) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
+	// All slices must be of equal length.
+	if len(seriesIDs) != len(keys) {
+		return errors.New("seriesIDs/keys length mismatch in index")
+	}
+	idx.seriesIDSet.Lock()
+	for i, seriesID := range seriesIDs {
+		if idx.seriesIDSet.ContainsNoLock(seriesID) {
+			idx.seriesIDSet.RemoveNoLock(seriesID)
+
+			name := models.ParseName(keys[i])
+			if curr := idx.measurements[string(name)]; curr <= 1 {
+				delete(idx.measurements, string(name))
+			} else {
+				idx.measurements[string(name)] = curr - 1
+			}
+		}
+	}
+	idx.seriesIDSet.Unlock()
+	return nil
+}
+
 // DropMeasurementIfSeriesNotExist drops a measurement only if there are no more
 // series for the measurment.
 func (idx *ShardIndex) DropMeasurementIfSeriesNotExist(name []byte) (bool, error) {
diff --git a/tsdb/index/tsi1/index.go b/tsdb/index/tsi1/index.go
index a666d635ac..04dc480443 100644
--- a/tsdb/index/tsi1/index.go
+++ b/tsdb/index/tsi1/index.go
@@ -824,6 +824,64 @@ func (i *Index) DropSeries(seriesID uint64, key []byte, cascade bool) error {
 	return nil
 }
 
+// DropSeriesList drops the provided series from the index. Unlike DropSeries,
+// the cascade argument is ignored, so measurements are never dropped here.
+func (i *Index) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
+	// All slices must be of equal length.
+	if len(seriesIDs) != len(keys) {
+		return errors.New("seriesIDs/keys length mismatch in index")
+	}
+
+	// Group the series IDs and keys into per-partition collections so that
+	// each partition can process its own subset.
+	pSeriesIDs := make([][]uint64, i.PartitionN)
+	pKeys := make([][][]byte, i.PartitionN)
+
+	for idx, key := range keys {
+		pidx := i.partitionIdx(key)
+		pSeriesIDs[pidx] = append(pSeriesIDs[pidx], seriesIDs[idx])
+		pKeys[pidx] = append(pKeys[pidx], key)
+	}
+
+	// Process each subset of series on each partition.
+	n := i.availableThreads()
+
+	// Store errors.
+	errC := make(chan error, i.PartitionN)
+
+	var pidx uint32 // Index of maximum Partition being worked on.
+	for k := 0; k < n; k++ {
+		go func() {
+			for {
+				idx := int(atomic.AddUint32(&pidx, 1) - 1) // Get next partition to work on.
+				if idx >= len(i.partitions) {
+					return // No more work.
+				}
+
+				// Drop from partition.
+				err := i.partitions[idx].DropSeriesList(pSeriesIDs[idx])
+				errC <- err
+			}
+		}()
+	}
+
+	// Check for errors.
+	for i := 0; i < cap(errC); i++ {
+		if err := <-errC; err != nil {
+			return err
+		}
+	}
+
+	// Add the dropped keys to the series tombstone sketch.
+	i.mu.Lock()
+	for _, key := range keys {
+		i.sTSketch.Add(key)
+	}
+	i.mu.Unlock()
+
+	return nil
+}
+
 // DropSeriesGlobal is a no-op on the tsi1 index.
 func (i *Index) DropSeriesGlobal(key []byte) error { return nil }
diff --git a/tsdb/index/tsi1/index_test.go b/tsdb/index/tsi1/index_test.go
index c49fa912f0..d0603e2b12 100644
--- a/tsdb/index/tsi1/index_test.go
+++ b/tsdb/index/tsi1/index_test.go
@@ -466,6 +466,69 @@ func TestIndex_TagValueSeriesIDIterator(t *testing.T) {
 	})
 }
 
+func TestIndex_DropSeriesList(t *testing.T) {
+	idx := MustOpenDefaultIndex() // Uses the single-series creation method CreateSeriesIfNotExists.
+	defer idx.Close()
+
+	// Add some series.
+	data := []struct {
+		Key  string
+		Name string
+		Tags map[string]string
+	}{
+		{"cpu,region=west,server=a", "cpu", map[string]string{"region": "west", "server": "a"}},
+		{"cpu,region=west,server=b", "cpu", map[string]string{"region": "west", "server": "b"}},
+		{"cpu,region=west,server=c", "cpu", map[string]string{"region": "west", "server": "c"}},
+		{"cpu,region=east,server=a", "cpu", map[string]string{"region": "east", "server": "a"}},
+		{"cpu,region=east,server=c", "cpu", map[string]string{"region": "east", "server": "c"}},
+		{"cpu,region=east,server=d", "cpu", map[string]string{"region": "east", "server": "d"}},
+		{"cpu,region=north,server=b", "cpu", map[string]string{"region": "north", "server": "b"}},
+		{"cpu,region=north,server=c", "cpu", map[string]string{"region": "north", "server": "c"}},
+		{"cpu,region=north,server=d", "cpu", map[string]string{"region": "north", "server": "d"}},
+		{"cpu,region=south,server=a", "cpu", map[string]string{"region": "south", "server": "a"}},
+		{"cpu,region=south,server=d", "cpu", map[string]string{"region": "south", "server": "d"}},
+	}
+
+	keys := make([][]byte, 0, 15)
+	seriesIDs := make([]uint64, 0, 15)
+	for _, pt := range data {
+		if err := idx.CreateSeriesIfNotExists([]byte(pt.Key), []byte(pt.Name), models.NewTags(pt.Tags)); err != nil {
+			t.Fatal(err)
+		}
+
+		keys = append(keys, []byte(pt.Key))
+		seriesIDs = append(seriesIDs, idx.Index.SeriesFile().SeriesID([]byte(pt.Name), models.NewTags(pt.Tags), nil))
+	}
+
+	// Drop all but the last two series.
+	if err := idx.DropSeriesList(seriesIDs[0:len(seriesIDs)-2], keys[0:len(keys)-2], false); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify the measurement still has series.
+	idx.Run(t, func(t *testing.T) {
+		if v, err := idx.MeasurementHasSeries([]byte("cpu")); err != nil {
+			t.Fatal(err)
+		} else if !v {
+			t.Fatal("expected series to still exist")
+		}
+	})
+
+	// Drop the remaining series.
+	if err := idx.DropSeriesList(seriesIDs[len(seriesIDs)-2:], keys[len(keys)-2:], false); err != nil {
+		t.Fatal(err)
+	}
+
+	// Verify all series are now deleted.
+	idx.Run(t, func(t *testing.T) {
+		if v, err := idx.MeasurementHasSeries([]byte("cpu")); err != nil {
+			t.Fatal(err)
+		} else if v {
+			t.Fatal("expected series to be deleted")
+		}
+	})
+}
+
 // Index is a test wrapper for tsi1.Index.
 type Index struct {
 	*tsi1.Index
diff --git a/tsdb/index/tsi1/log_file.go b/tsdb/index/tsi1/log_file.go
index 238128bf68..54a56e28ed 100644
--- a/tsdb/index/tsi1/log_file.go
+++ b/tsdb/index/tsi1/log_file.go
@@ -578,6 +578,23 @@ func (f *LogFile) DeleteSeriesID(id uint64) error {
 	return f.FlushAndSync()
 }
 
+// DeleteSeriesIDList adds tombstones for a list of series IDs.
+func (f *LogFile) DeleteSeriesIDList(ids []uint64) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	for _, id := range ids {
+		e := LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: id}
+		if err := f.appendEntry(&e); err != nil {
+			return err
+		}
+		f.execEntry(&e)
+	}
+
+	// Flush buffer and sync to disk.
+	return f.FlushAndSync()
+}
+
 // SeriesN returns the total number of series in the file.
 func (f *LogFile) SeriesN() (n uint64) {
 	f.mu.RLock()
@@ -1056,6 +1073,22 @@ func (f *LogFile) seriesSketches() (sketch, tSketch estimator.Sketch, err error)
 	return sketch, tSketch, nil
 }
 
+// Writes appends a batch of log entries and flushes the buffer to disk.
+func (f *LogFile) Writes(entries []LogEntry) error {
+	f.mu.RLock()
+	defer f.mu.RUnlock()
+
+	for i := range entries {
+		entry := &entries[i]
+		if err := f.appendEntry(entry); err != nil {
+			return err
+		}
+		f.execEntry(entry)
+	}
+	// Flush buffer and sync to disk.
+	return f.FlushAndSync()
+}
+
 // LogEntry represents a single log entry in the write-ahead log.
 type LogEntry struct {
 	Flag byte // flag
diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go
index 593588e649..827d72cb60 100644
--- a/tsdb/index/tsi1/partition.go
+++ b/tsdb/index/tsi1/partition.go
@@ -585,31 +585,20 @@ func (p *Partition) DropMeasurement(name []byte) error {
 	}
 	defer fs.Release()
 
+	entries := make([]LogEntry, 0, 100)
 	// Delete all keys and values.
 	if kitr := fs.TagKeyIterator(name); kitr != nil {
 		for k := kitr.Next(); k != nil; k = kitr.Next() {
 			// Delete key if not already deleted.
 			if !k.Deleted() {
-				if err := func() error {
-					p.mu.RLock()
-					defer p.mu.RUnlock()
-					return p.activeLogFile.DeleteTagKey(name, k.Key())
-				}(); err != nil {
-					return err
-				}
+				entries = append(entries, LogEntry{Flag: LogEntryTagKeyTombstoneFlag, Name: name, Key: k.Key()})
 			}
 
 			// Delete each value in key.
 			if vitr := k.TagValueIterator(); vitr != nil {
 				for v := vitr.Next(); v != nil; v = vitr.Next() {
 					if !v.Deleted() {
-						if err := func() error {
-							p.mu.RLock()
-							defer p.mu.RUnlock()
-							return p.activeLogFile.DeleteTagValue(name, k.Key(), v.Value())
-						}(); err != nil {
-							return err
-						}
+						entries = append(entries, LogEntry{Flag: LogEntryTagValueTombstoneFlag, Name: name, Key: k.Key(), Value: v.Value()})
 					}
 				}
 			}
@@ -626,13 +615,7 @@ func (p *Partition) DropMeasurement(name []byte) error {
 			} else if elem.SeriesID == 0 {
 				break
 			}
-			if err := func() error {
-				p.mu.RLock()
-				defer p.mu.RUnlock()
-				return p.activeLogFile.DeleteSeriesID(elem.SeriesID)
-			}(); err != nil {
-				return err
-			}
+			entries = append(entries, LogEntry{Flag: LogEntrySeriesTombstoneFlag, SeriesID: elem.SeriesID})
 		}
 		if err = itr.Close(); err != nil {
 			return err
 		}
@@ -640,13 +623,14 @@ func (p *Partition) DropMeasurement(name []byte) error {
 	}
 
 	// Mark measurement as deleted.
-	if err := func() error {
-		p.mu.RLock()
-		defer p.mu.RUnlock()
-		return p.activeLogFile.DeleteMeasurement(name)
-	}(); err != nil {
+	entries = append(entries, LogEntry{Flag: LogEntryMeasurementTombstoneFlag, Name: name})
+
+	p.mu.RLock()
+	if err := p.activeLogFile.Writes(entries); err != nil {
+		p.mu.RUnlock()
 		return err
 	}
+	p.mu.RUnlock()
 
 	// Check if the log file needs to be swapped.
 	if err := p.CheckLogFile(); err != nil {
@@ -705,6 +689,29 @@ func (p *Partition) DropSeries(seriesID uint64) error {
 	return p.CheckLogFile()
 }
 
+// DropSeriesList removes the provided series IDs from the partition.
+func (p *Partition) DropSeriesList(seriesIDs []uint64) error {
+	if len(seriesIDs) == 0 {
+		return nil
+	}
+
+	// Write tombstones for the series to the active log file.
+	if err := func() error {
+		p.mu.RLock()
+		defer p.mu.RUnlock()
+		return p.activeLogFile.DeleteSeriesIDList(seriesIDs)
+	}(); err != nil {
+		return err
+	}
+
+	for _, seriesID := range seriesIDs {
+		p.seriesIDSet.Remove(seriesID)
+	}
+
+	// Swap log file, if necessary.
+	return p.CheckLogFile()
+}
+
 // MeasurementsSketches returns the two sketches for the partition by merging all
 // instances of the type sketch types in all the index files.
 func (p *Partition) MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error) {
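
Reviewer note: the batching pattern this change introduces is easier to see outside the engine code. The sketch below is a minimal, self-contained illustration and is not part of this change: the seriesIndex interface mirrors the two tsdb.Index methods touched by the diff, while mockIndex, newMockIndex, and the flushes counter are hypothetical stand-ins used only to show why one DropSeriesList call is cheaper than calling DropSeries once per series.

package main

import (
	"errors"
	"fmt"
)

// seriesIndex is a trimmed-down, hypothetical view of the tsdb.Index methods
// touched by this change.
type seriesIndex interface {
	DropSeries(seriesID uint64, key []byte, cascade bool) error
	DropSeriesList(seriesIDs []uint64, keys [][]byte, cascade bool) error
}

// mockIndex counts how many times it has to "flush" so the difference between
// the per-series and batched paths is visible. It is purely illustrative.
type mockIndex struct {
	dropped map[uint64]struct{}
	flushes int
}

var _ seriesIndex = (*mockIndex)(nil)

func newMockIndex() *mockIndex {
	return &mockIndex{dropped: make(map[uint64]struct{})}
}

func (m *mockIndex) DropSeries(seriesID uint64, key []byte, _ bool) error {
	m.dropped[seriesID] = struct{}{}
	m.flushes++ // one flush per series
	return nil
}

func (m *mockIndex) DropSeriesList(seriesIDs []uint64, keys [][]byte, _ bool) error {
	if len(seriesIDs) != len(keys) {
		return errors.New("seriesIDs/keys length mismatch")
	}
	for _, id := range seriesIDs {
		m.dropped[id] = struct{}{}
	}
	m.flushes++ // one flush for the whole batch
	return nil
}

func main() {
	ids := []uint64{1, 2, 3, 4, 5}
	keys := [][]byte{
		[]byte("cpu,server=a"), []byte("cpu,server=b"), []byte("cpu,server=c"),
		[]byte("cpu,server=d"), []byte("cpu,server=e"),
	}

	// Old pattern: one index call (and one flush) per series.
	perSeries := newMockIndex()
	for i, id := range ids {
		if err := perSeries.DropSeries(id, keys[i], false); err != nil {
			panic(err)
		}
	}

	// New pattern: collect the IDs and keys, then drop them in one call.
	batched := newMockIndex()
	if err := batched.DropSeriesList(ids, keys, false); err != nil {
		panic(err)
	}

	fmt.Printf("per-series flushes: %d, batched flushes: %d\n", perSeries.flushes, batched.flushes)
}

In the diff itself the same idea appears twice: Engine.deleteSeriesRange collects deleteIDList/deleteKeyList and calls DropSeriesList once instead of calling DropSeries per series, and Partition.DropMeasurement collects LogEntry values and hands them to LogFile.Writes, so the partition lock is taken and the log file is flushed and synced once per batch rather than once per tombstone.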