Allow buffering tombstones before writing to disk

pull/9084/head
Jason Wilder 2017-10-20 09:40:08 -06:00
parent 7cae889b13
commit 17bae05370
4 changed files with 106 additions and 16 deletions

View File

@ -461,6 +461,10 @@ func TestCompactor_CompactFull_TombstonedSkipBlock(t *testing.T) {
}
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, math.MinInt64, math.MaxInt64)
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a3},
@ -563,6 +567,10 @@ func TestCompactor_CompactFull_TombstonedPartialBlock(t *testing.T) {
// a1 should remain after compaction
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, math.MaxInt64)
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
a3 := tsm1.NewValue(3, 1.3)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a3},
@ -670,6 +678,10 @@ func TestCompactor_CompactFull_TombstonedMultipleRanges(t *testing.T) {
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 2, 2)
ts.AddRange([][]byte{[]byte("cpu,host=A#!~#value")}, 4, 4)
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
a5 := tsm1.NewValue(5, 1.5)
writes = map[string][]tsm1.Value{
"cpu,host=A#!~#value": []tsm1.Value{a5},

View File

@ -470,6 +470,10 @@ func (t *TSMReader) DeleteRange(keys [][]byte, minTime, maxTime int64) error {
return err
}
if err := t.tombstoner.Flush(); err != nil {
return err
}
t.index.DeleteRange(keys, minTime, maxTime)
return nil
}
@ -480,6 +484,10 @@ func (t *TSMReader) Delete(keys [][]byte) error {
return err
}
if err := t.tombstoner.Flush(); err != nil {
return err
}
t.index.Delete(keys)
return nil
}

View File

@ -32,6 +32,9 @@ type Tombstoner struct {
// indicates that the stats may be out of sync with what is on disk and they
// should be refreshed.
statsLoaded bool
// Tombstones that have been written but not flushed to disk yet.
tombstones []Tombstone
}
// Tombstone represents an individual deletion.
@ -66,31 +69,58 @@ func (t *Tombstoner) AddRange(keys [][]byte, min, max int64) error {
t.statsLoaded = false
tombstones, err := t.readTombstone()
if err != nil {
return nil
}
if cap(tombstones) < len(tombstones)+len(keys) {
ts := make([]Tombstone, len(tombstones), len(tombstones)+len(keys))
copy(ts, tombstones)
tombstones = ts
if cap(t.tombstones) < len(t.tombstones)+len(keys) {
ts := make([]Tombstone, len(t.tombstones), len(t.tombstones)+len(keys))
copy(ts, t.tombstones)
t.tombstones = ts
}
for _, k := range keys {
tombstones = append(tombstones, Tombstone{
t.tombstones = append(t.tombstones, Tombstone{
Key: k,
Min: min,
Max: max,
})
}
return t.writeTombstone(tombstones)
return nil
}
func (t *Tombstoner) Flush() error {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.tombstones) == 0 {
return nil
}
existing, err := t.readTombstone()
if err != nil {
return nil
}
if err := t.writeTombstone(append(existing, t.tombstones...)); err != nil {
return err
}
t.tombstones = t.tombstones[:0]
return nil
}
// ReadAll returns all the tombstones in the Tombstoner's directory.
func (t *Tombstoner) ReadAll() ([]Tombstone, error) {
return t.readTombstone()
t.mu.RLock()
defer t.mu.RUnlock()
tombstones, err := t.readTombstone()
if err != nil {
return nil, err
}
if len(t.tombstones) == 0 {
return tombstones, nil
}
return append(tombstones, t.tombstones...), nil
}
// Delete removes all the tombstone files from disk.
@ -107,7 +137,11 @@ func (t *Tombstoner) Delete() error {
// HasTombstones return true if there are any tombstone entries recorded.
func (t *Tombstoner) HasTombstones() bool {
files := t.TombstoneFiles()
return len(files) > 0 && files[0].Size > 0
t.mu.RLock()
n := len(t.tombstones)
t.mu.RUnlock()
return len(files) > 0 && files[0].Size > 0 || n > 0
}
// TombstoneFiles returns any tombstone files associated with Tombstoner's TSM file.
@ -266,7 +300,17 @@ func (t *Tombstoner) readTombstoneV1(f *os.File, fn func(t Tombstone) error) err
return err
}
}
return r.Err()
if err := r.Err(); err != nil {
return err
}
for _, t := range t.tombstones {
if err := fn(t); err != nil {
return err
}
}
return nil
}
// readTombstoneV2 reads the second version of tombstone files that are capable
@ -292,7 +336,7 @@ func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) err
b := make([]byte, 4096)
for {
if n >= size {
return nil
break
}
if _, err = f.Read(b[:4]); err != nil {
@ -332,6 +376,13 @@ func (t *Tombstoner) readTombstoneV2(f *os.File, fn func(t Tombstone) error) err
return err
}
}
for _, t := range t.tombstones {
if err := fn(t); err != nil {
return err
}
}
return nil
}
// readTombstoneV3 reads the third version of tombstone files that are capable
@ -357,7 +408,7 @@ func (t *Tombstoner) readTombstoneV3(f *os.File, fn func(t Tombstone) error) err
b := make([]byte, 4096)
for {
if _, err = io.ReadFull(gr, b[:4]); err == io.EOF || err == io.ErrUnexpectedEOF {
return nil
break
} else if err != nil {
return err
}
@ -395,6 +446,13 @@ func (t *Tombstoner) readTombstoneV3(f *os.File, fn func(t Tombstone) error) err
return err
}
}
for _, t := range t.tombstones {
if err := fn(t); err != nil {
return err
}
}
return nil
}
func (t *Tombstoner) tombstonePath() string {

View File

@ -31,6 +31,10 @@ func TestTombstoner_Add(t *testing.T) {
ts.Add([][]byte{[]byte("foo")})
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
entries, err = ts.ReadAll()
if err != nil {
fatal(t, "ReadAll", err)
@ -95,6 +99,10 @@ func TestTombstoner_Add_Empty(t *testing.T) {
ts.Add([][]byte{})
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing tombstone: %v", err)
}
// Use a new Tombstoner to verify values are persisted
ts = &tsm1.Tombstoner{Path: f.Name()}
entries, err = ts.ReadAll()
@ -122,6 +130,10 @@ func TestTombstoner_Delete(t *testing.T) {
ts.Add([][]byte{[]byte("foo")})
if err := ts.Flush(); err != nil {
t.Fatalf("unexpected error flushing: %v", err)
}
// Use a new Tombstoner to verify values are persisted
ts = &tsm1.Tombstoner{Path: f.Name()}
entries, err := ts.ReadAll()