Add a BatchDelete capability to TSMReader
parent
44e782f173
commit
4ed19348fd
|
@ -39,6 +39,9 @@ type TSMReader struct {
|
|||
|
||||
// lastModified is the last time this file was modified on disk
|
||||
lastModified int64
|
||||
|
||||
// deleteMu limits concurrent deletes
|
||||
deleteMu sync.Mutex
|
||||
}
|
||||
|
||||
// TSMIndex represent the index section of a TSM file. The index records all
|
||||
|
@ -455,27 +458,14 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// If the keys can't exist in this TSM file, skip it.
|
||||
minKey, maxKey := keys[0], keys[len(keys)-1]
|
||||
if !t.index.OverlapsKeyRange(minKey, maxKey) {
|
||||
return nil
|
||||
batch := t.BatchDelete()
|
||||
for _, key := range keys {
|
||||
if err := batch.DeleteRange(key, minTime, maxTime); err != nil {
|
||||
batch.Rollback()
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// If the timerange can't exist in this TSM file, skip it.
|
||||
if !t.index.OverlapsTimeRange(minTime, maxTime) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := t.tombstoner.AddRange(keys, minTime, maxTime); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := t.tombstoner.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
t.index.DeleteRange(keys, minTime, maxTime)
|
||||
return nil
|
||||
return batch.Commit()
|
||||
}
|
||||
|
||||
// Delete deletes blocks indicated by keys.
|
||||
|
@ -591,6 +581,55 @@ func (t *TSMReader) BlockIterator() *BlockIterator {
|
|||
}
|
||||
}
|
||||
|
||||
type BatchDeleter interface {
|
||||
DeleteRange(key []byte, min, max int64) error
|
||||
Commit() error
|
||||
Rollback() error
|
||||
}
|
||||
|
||||
type batchDelete struct {
|
||||
r *TSMReader
|
||||
}
|
||||
|
||||
func (b *batchDelete) DeleteRange(key []byte, minTime, maxTime int64) error {
|
||||
// If the keys can't exist in this TSM file, skip it.
|
||||
if !b.r.index.OverlapsKeyRange(key, key) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the timerange can't exist in this TSM file, skip it.
|
||||
if !b.r.index.OverlapsTimeRange(minTime, maxTime) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := b.r.tombstoner.AddRange([][]byte{key}, minTime, maxTime); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *batchDelete) Commit() error {
|
||||
defer b.r.deleteMu.Unlock()
|
||||
if err := b.r.tombstoner.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return b.r.applyTombstones()
|
||||
}
|
||||
|
||||
func (b *batchDelete) Rollback() error {
|
||||
defer b.r.deleteMu.Unlock()
|
||||
return b.r.tombstoner.Rollback()
|
||||
}
|
||||
|
||||
// BatchDelete returns a BatchDeleter. Only a single goroutine may run a BatchDelete at a time.
|
||||
// Callers must either Commit or Rollback the operation.
|
||||
func (r *TSMReader) BatchDelete() BatchDeleter {
|
||||
r.deleteMu.Lock()
|
||||
return &batchDelete{r: r}
|
||||
}
|
||||
|
||||
// indirectIndex is a TSMIndex that uses a raw byte slice representation of an index. This
|
||||
// implementation can be used for indexes that may be MMAPed into memory.
|
||||
type indirectIndex struct {
|
||||
|
|
|
@ -124,6 +124,12 @@ func (t *Tombstoner) Flush() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (t *Tombstoner) Rollback() error {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
return t.rollback()
|
||||
}
|
||||
|
||||
// ReadAll returns all the tombstones in the Tombstoner's directory.
|
||||
func (t *Tombstoner) ReadAll() ([]Tombstone, error) {
|
||||
t.mu.RLock()
|
||||
|
|
Loading…
Reference in New Issue