Fix performance of dropping large number of keys

pull/5053/head
Jason Wilder 2015-12-08 10:47:06 -07:00
parent 7a9019cf41
commit 9d82e24ca0
9 changed files with 41 additions and 31 deletions

View File

@ -272,7 +272,7 @@ func TestKeyIterator_TSM_MultipleKeysDeleted(t *testing.T) {
} }
r1 := MustTSMReader(dir, 1, points1) r1 := MustTSMReader(dir, 1, points1)
r1.Delete("cpu,host=A#!~#value") r1.Delete([]string{"cpu,host=A#!~#value"})
v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) v2 := tsm1.NewValue(time.Unix(1, 0), float64(1))
v3 := tsm1.NewValue(time.Unix(1, 0), float64(1)) v3 := tsm1.NewValue(time.Unix(1, 0), float64(1))
@ -283,7 +283,7 @@ func TestKeyIterator_TSM_MultipleKeysDeleted(t *testing.T) {
} }
r2 := MustTSMReader(dir, 1, points2) r2 := MustTSMReader(dir, 1, points2)
r2.Delete("cpu,host=A#!~#count") r2.Delete([]string{"cpu,host=A#!~#count"})
iter, err := tsm1.NewTSMKeyIterator(r1, r2) iter, err := tsm1.NewTSMKeyIterator(r1, r2)
if err != nil { if err != nil {

View File

@ -128,8 +128,8 @@ type TSMIndex interface {
// Add records a new block entry for a key in the index. // Add records a new block entry for a key in the index.
Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32)
// Delete removes the given key from the index. // Delete removes the given keys from the index.
Delete(key string) Delete(keys []string)
// Contains return true if the given key exists in the index. // Contains return true if the given key exists in the index.
Contains(key string) bool Contains(key string) bool
@ -292,11 +292,13 @@ func (d *directIndex) ContainsValue(key string, t time.Time) bool {
return d.Entry(key, t) != nil return d.Entry(key, t) != nil
} }
func (d *directIndex) Delete(key string) { func (d *directIndex) Delete(keys []string) {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
delete(d.blocks, key) for _, k := range keys {
delete(d.blocks, k)
}
} }
func (d *directIndex) Keys() []string { func (d *directIndex) Keys() []string {
@ -627,14 +629,20 @@ func (d *indirectIndex) KeyCount() int {
return len(d.offsets) return len(d.offsets)
} }
func (d *indirectIndex) Delete(key string) { func (d *indirectIndex) Delete(keys []string) {
d.mu.Lock() d.mu.Lock()
defer d.mu.Unlock() defer d.mu.Unlock()
lookup := map[string]struct{}{}
for _, k := range keys {
lookup[k] = struct{}{}
}
var offsets []int32 var offsets []int32
for _, offset := range d.offsets { for _, offset := range d.offsets {
_, indexKey, _ := readKey(d.b[offset:]) _, indexKey, _ := readKey(d.b[offset:])
if key == indexKey {
if _, ok := lookup[indexKey]; ok {
continue continue
} }
offsets = append(offsets, int32(offset)) offsets = append(offsets, int32(offset))
@ -934,9 +942,7 @@ func (t *TSMReader) applyTombstones() error {
} }
// Update our index // Update our index
for _, tombstone := range tombstones { t.index.Delete(tombstones)
t.index.Delete(tombstone)
}
return nil return nil
} }
@ -1015,12 +1021,12 @@ func (t *TSMReader) ContainsValue(key string, ts time.Time) bool {
return t.index.ContainsValue(key, ts) return t.index.ContainsValue(key, ts)
} }
func (t *TSMReader) Delete(key string) error { func (t *TSMReader) Delete(keys []string) error {
if err := t.tombstoner.Add(key); err != nil { if err := t.tombstoner.Add(keys); err != nil {
return err return err
} }
t.index.Delete(key) t.index.Delete(keys)
return nil return nil
} }

View File

@ -794,7 +794,7 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) {
t.Fatalf("unexpected error created reader: %v", err) t.Fatalf("unexpected error created reader: %v", err)
} }
if err := r.Delete("mem"); err != nil { if err := r.Delete([]string{"mem"}); err != nil {
t.Fatalf("unexpected error deleting: %v", err) t.Fatalf("unexpected error deleting: %v", err)
} }

View File

@ -260,18 +260,22 @@ func (e *DevEngine) DeleteSeries(seriesKeys []string) error {
e.mu.RLock() e.mu.RLock()
defer e.mu.RUnlock() defer e.mu.RUnlock()
// keyMap is used to see if a given key should be deleted. seriesKey
// are the measurement + tagset (minus separate & field)
keyMap := map[string]struct{}{} keyMap := map[string]struct{}{}
for _, k := range seriesKeys { for _, k := range seriesKeys {
keyMap[k] = struct{}{} keyMap[k] = struct{}{}
} }
var deleteKeys []string
// go through the keys in the file store // go through the keys in the file store
for _, k := range e.FileStore.Keys() { for _, k := range e.FileStore.Keys() {
seriesKey, _ := seriesAndFieldFromCompositeKey(k) seriesKey, _ := seriesAndFieldFromCompositeKey(k)
if _, ok := keyMap[seriesKey]; ok { if _, ok := keyMap[seriesKey]; ok {
e.FileStore.Delete(k) deleteKeys = append(deleteKeys, k)
} }
} }
e.FileStore.Delete(deleteKeys)
// find the keys in the cache and remove them // find the keys in the cache and remove them
walKeys := make([]string, 0) walKeys := make([]string, 0)

View File

@ -47,8 +47,8 @@ type TSMFile interface {
// an error is returned. // an error is returned.
Type(key string) (byte, error) Type(key string) (byte, error)
// Delete removes the key from the set of keys available in this file. // Delete removes the keys from the set of keys available in this file.
Delete(key string) error Delete(keys []string) error
// HasTombstones returns true if file contains values that have been deleted. // HasTombstones returns true if file contains values that have been deleted.
HasTombstones() bool HasTombstones() bool
@ -186,17 +186,15 @@ func (f *FileStore) Type(key string) (byte, error) {
return 0, fmt.Errorf("unknown type for %v", key) return 0, fmt.Errorf("unknown type for %v", key)
} }
func (f *FileStore) Delete(key string) error { func (f *FileStore) Delete(keys []string) error {
f.mu.Lock() f.mu.Lock()
defer f.mu.Unlock() defer f.mu.Unlock()
f.lastModified = time.Now() f.lastModified = time.Now()
for _, file := range f.files { for _, file := range f.files {
if file.Contains(key) { if err := file.Delete(keys); err != nil {
if err := file.Delete(key); err != nil { return err
return err
}
} }
} }
return nil return nil

View File

@ -537,7 +537,7 @@ func TestFileStore_Open_Deleted(t *testing.T) {
t.Fatalf("file count mismatch: got %v, exp %v", got, exp) t.Fatalf("file count mismatch: got %v, exp %v", got, exp)
} }
if err := fs.Delete("cpu,host=server2!~#!value"); err != nil { if err := fs.Delete([]string{"cpu,host=server2!~#!value"}); err != nil {
fatal(t, "deleting", err) fatal(t, "deleting", err)
} }
@ -574,7 +574,7 @@ func TestFileStore_Delete(t *testing.T) {
t.Fatalf("key length mismatch: got %v, exp %v", got, exp) t.Fatalf("key length mismatch: got %v, exp %v", got, exp)
} }
if err := fs.Delete("cpu,host=server2!~#!value"); err != nil { if err := fs.Delete([]string{"cpu,host=server2!~#!value"}); err != nil {
fatal(t, "deleting", err) fatal(t, "deleting", err)
} }

View File

@ -16,7 +16,7 @@ type Tombstoner struct {
Path string Path string
} }
func (t *Tombstoner) Add(key string) error { func (t *Tombstoner) Add(keys []string) error {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
@ -31,7 +31,9 @@ func (t *Tombstoner) Add(key string) error {
return nil return nil
} }
tombstones = append(tombstones, key) for _, k := range keys {
tombstones = append(tombstones, k)
}
return t.writeTombstone(tombstones) return t.writeTombstone(tombstones)
} }

View File

@ -23,7 +23,7 @@ func TestTombstoner_Add(t *testing.T) {
t.Fatalf("length mismatch: got %v, exp %v", got, exp) t.Fatalf("length mismatch: got %v, exp %v", got, exp)
} }
ts.Add("foo") ts.Add([]string{"foo"})
entries, err = ts.ReadAll() entries, err = ts.ReadAll()
if err != nil { if err != nil {
@ -61,7 +61,7 @@ func TestTombstoner_Delete(t *testing.T) {
f := MustTempFile(dir) f := MustTempFile(dir)
ts := &tsm1.Tombstoner{Path: f.Name()} ts := &tsm1.Tombstoner{Path: f.Name()}
ts.Add("foo") ts.Add([]string{"foo"})
// Use a new Tombstoner to verify values are persisted // Use a new Tombstoner to verify values are persisted
ts = &tsm1.Tombstoner{Path: f.Name()} ts = &tsm1.Tombstoner{Path: f.Name()}

View File

@ -519,8 +519,8 @@ func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error {
func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) { func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) {
var n int var n int
for _, k := range w.Keys { for _, k := range w.Keys {
if len(dst)+1 > len(dst)-n { if len(dst[:n])+1+len(k) > len(dst) {
grow := make([]byte, defaultBufLen) grow := make([]byte, len(dst)*2)
dst = append(dst, grow...) dst = append(dst, grow...)
} }