diff --git a/tsdb/engine/tsm1/reader.go b/tsdb/engine/tsm1/reader.go index 9de6714ccb..e7865d3a49 100644 --- a/tsdb/engine/tsm1/reader.go +++ b/tsdb/engine/tsm1/reader.go @@ -39,6 +39,9 @@ type TSMReader struct { // lastModified is the last time this file was modified on disk lastModified int64 + + // deleteMu limits concurrent deletes + deleteMu sync.Mutex } // TSMIndex represent the index section of a TSM file. The index records all @@ -455,27 +458,14 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error { return nil } - // If the keys can't exist in this TSM file, skip it. - minKey, maxKey := keys[0], keys[len(keys)-1] - if !t.index.OverlapsKeyRange(minKey, maxKey) { - return nil + batch := t.BatchDelete() + for _, key := range keys { + if err := batch.DeleteRange(key, minTime, maxTime); err != nil { + batch.Rollback() + return err + } } - - // If the timerange can't exist in this TSM file, skip it. - if !t.index.OverlapsTimeRange(minTime, maxTime) { - return nil - } - - if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil { - return err - } - - if err := t.tombstoner.Flush(); err != nil { - return err - } - - t.index.DeleteRange(keys, minTime, maxTime) - return nil + return batch.Commit() } // Delete deletes blocks indicated by keys. @@ -591,6 +581,55 @@ func (t *TSMReader) BlockIterator() *BlockIterator { } } +type BatchDeleter interface { + DeleteRange(key []byte, min, max int64) error + Commit() error + Rollback() error +} + +type batchDelete struct { + r *TSMReader +} + +func (b *batchDelete) DeleteRange(key []byte, minTime, maxTime int64) error { + // If the keys can't exist in this TSM file, skip it. + if !b.r.index.OverlapsKeyRange(key, key) { + return nil + } + + // If the timerange can't exist in this TSM file, skip it. + if !b.r.index.OverlapsTimeRange(minTime, maxTime) { + return nil + } + + if err := b.r.tombstoner.AddRange([][]byte{key}, minTime, maxTime); err != nil { + return err + } + + return nil +} + +func (b *batchDelete) Commit() error { + defer b.r.deleteMu.Unlock() + if err := b.r.tombstoner.Flush(); err != nil { + return err + } + + return b.r.applyTombstones() +} + +func (b *batchDelete) Rollback() error { + defer b.r.deleteMu.Unlock() + return b.r.tombstoner.Rollback() +} + +// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time. +// Callers must either Commit or Rollback the operation. +func (r *TSMReader) BatchDelete() BatchDeleter { + r.deleteMu.Lock() + return &batchDelete{r: r} +} + // indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This // implementation can be used for indexes that may be MMAPed into memory. type indirectIndex struct { diff --git a/tsdb/engine/tsm1/tombstone.go b/tsdb/engine/tsm1/tombstone.go index c8e41ee6c0..9e540ebedf 100644 --- a/tsdb/engine/tsm1/tombstone.go +++ b/tsdb/engine/tsm1/tombstone.go @@ -124,6 +124,12 @@ func (t *Tombstoner) Flush() error { return nil } +func (t *Tombstoner) Rollback() error { + t.mu.Lock() + defer t.mu.Unlock() + return t.rollback() +} + // ReadAll returns all the tombstones in the Tombstoner's directory. func (t *Tombstoner) ReadAll() ([]Tombstone, error) { t.mu.RLock()