From 9d82e24ca0f9ed25d7317c31c144383d3b7d4aae Mon Sep 17 00:00:00 2001 From: Jason Wilder Date: Tue, 8 Dec 2015 10:47:06 -0700 Subject: [PATCH] Fix performance of dropping large number of keys --- tsdb/engine/tsm1/compact_test.go | 4 ++-- tsdb/engine/tsm1/data_file.go | 30 +++++++++++++++++------------ tsdb/engine/tsm1/data_file_test.go | 2 +- tsdb/engine/tsm1/engine.go | 6 +++++- tsdb/engine/tsm1/file_store.go | 12 +++++------- tsdb/engine/tsm1/file_store_test.go | 4 ++-- tsdb/engine/tsm1/tombstone.go | 6 ++++-- tsdb/engine/tsm1/tombstone_test.go | 4 ++-- tsdb/engine/tsm1/wal.go | 4 ++-- 9 files changed, 41 insertions(+), 31 deletions(-) diff --git a/tsdb/engine/tsm1/compact_test.go b/tsdb/engine/tsm1/compact_test.go index 1601c97169..805138aa46 100644 --- a/tsdb/engine/tsm1/compact_test.go +++ b/tsdb/engine/tsm1/compact_test.go @@ -272,7 +272,7 @@ func TestKeyIterator_TSM_MultipleKeysDeleted(t *testing.T) { } r1 := MustTSMReader(dir, 1, points1) - r1.Delete("cpu,host=A#!~#value") + r1.Delete([]string{"cpu,host=A#!~#value"}) v2 := tsm1.NewValue(time.Unix(1, 0), float64(1)) v3 := tsm1.NewValue(time.Unix(1, 0), float64(1)) @@ -283,7 +283,7 @@ func TestKeyIterator_TSM_MultipleKeysDeleted(t *testing.T) { } r2 := MustTSMReader(dir, 1, points2) - r2.Delete("cpu,host=A#!~#count") + r2.Delete([]string{"cpu,host=A#!~#count"}) iter, err := tsm1.NewTSMKeyIterator(r1, r2) if err != nil { diff --git a/tsdb/engine/tsm1/data_file.go b/tsdb/engine/tsm1/data_file.go index d3b7a176cd..33eb3e743c 100644 --- a/tsdb/engine/tsm1/data_file.go +++ b/tsdb/engine/tsm1/data_file.go @@ -128,8 +128,8 @@ type TSMIndex interface { // Add records a new block entry for a key in the index. Add(key string, blockType byte, minTime, maxTime time.Time, offset int64, size uint32) - // Delete removes the given key from the index. - Delete(key string) + // Delete removes the given keys from the index. + Delete(keys []string) // Contains return true if the given key exists in the index. Contains(key string) bool @@ -292,11 +292,13 @@ func (d *directIndex) ContainsValue(key string, t time.Time) bool { return d.Entry(key, t) != nil } -func (d *directIndex) Delete(key string) { +func (d *directIndex) Delete(keys []string) { d.mu.Lock() defer d.mu.Unlock() - delete(d.blocks, key) + for _, k := range keys { + delete(d.blocks, k) + } } func (d *directIndex) Keys() []string { @@ -627,14 +629,20 @@ func (d *indirectIndex) KeyCount() int { return len(d.offsets) } -func (d *indirectIndex) Delete(key string) { +func (d *indirectIndex) Delete(keys []string) { d.mu.Lock() defer d.mu.Unlock() + lookup := map[string]struct{}{} + for _, k := range keys { + lookup[k] = struct{}{} + } + var offsets []int32 for _, offset := range d.offsets { _, indexKey, _ := readKey(d.b[offset:]) - if key == indexKey { + + if _, ok := lookup[indexKey]; ok { continue } offsets = append(offsets, int32(offset)) @@ -934,9 +942,7 @@ func (t *TSMReader) applyTombstones() error { } // Update our index - for _, tombstone := range tombstones { - t.index.Delete(tombstone) - } + t.index.Delete(tombstones) return nil } @@ -1015,12 +1021,12 @@ func (t *TSMReader) ContainsValue(key string, ts time.Time) bool { return t.index.ContainsValue(key, ts) } -func (t *TSMReader) Delete(key string) error { - if err := t.tombstoner.Add(key); err != nil { +func (t *TSMReader) Delete(keys []string) error { + if err := t.tombstoner.Add(keys); err != nil { return err } - t.index.Delete(key) + t.index.Delete(keys) return nil } diff --git a/tsdb/engine/tsm1/data_file_test.go b/tsdb/engine/tsm1/data_file_test.go index 3f23ec7327..4489ce5c63 100644 --- a/tsdb/engine/tsm1/data_file_test.go +++ b/tsdb/engine/tsm1/data_file_test.go @@ -794,7 +794,7 @@ func TestTSMReader_MMAP_Tombstone(t *testing.T) { t.Fatalf("unexpected error created reader: %v", err) } - if err := r.Delete("mem"); err != nil { + if err := r.Delete([]string{"mem"}); err != nil { t.Fatalf("unexpected error deleting: %v", err) } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index 210cc1baa2..a2ae4088f1 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -260,18 +260,22 @@ func (e *DevEngine) DeleteSeries(seriesKeys []string) error { e.mu.RLock() defer e.mu.RUnlock() + // keyMap is used to see if a given key should be deleted. seriesKey + // are the measurement + tagset (minus separate & field) keyMap := map[string]struct{}{} for _, k := range seriesKeys { keyMap[k] = struct{}{} } + var deleteKeys []string // go through the keys in the file store for _, k := range e.FileStore.Keys() { seriesKey, _ := seriesAndFieldFromCompositeKey(k) if _, ok := keyMap[seriesKey]; ok { - e.FileStore.Delete(k) + deleteKeys = append(deleteKeys, k) } } + e.FileStore.Delete(deleteKeys) // find the keys in the cache and remove them walKeys := make([]string, 0) diff --git a/tsdb/engine/tsm1/file_store.go b/tsdb/engine/tsm1/file_store.go index a65e3bd3a8..08e82eb666 100644 --- a/tsdb/engine/tsm1/file_store.go +++ b/tsdb/engine/tsm1/file_store.go @@ -47,8 +47,8 @@ type TSMFile interface { // an error is returned. Type(key string) (byte, error) - // Delete removes the key from the set of keys available in this file. - Delete(key string) error + // Delete removes the keys from the set of keys available in this file. + Delete(keys []string) error // HasTombstones returns true if file contains values that have been deleted. HasTombstones() bool @@ -186,17 +186,15 @@ func (f *FileStore) Type(key string) (byte, error) { return 0, fmt.Errorf("unknown type for %v", key) } -func (f *FileStore) Delete(key string) error { +func (f *FileStore) Delete(keys []string) error { f.mu.Lock() defer f.mu.Unlock() f.lastModified = time.Now() for _, file := range f.files { - if file.Contains(key) { - if err := file.Delete(key); err != nil { - return err - } + if err := file.Delete(keys); err != nil { + return err } } return nil diff --git a/tsdb/engine/tsm1/file_store_test.go b/tsdb/engine/tsm1/file_store_test.go index 7f0586a14f..eb81bab8f1 100644 --- a/tsdb/engine/tsm1/file_store_test.go +++ b/tsdb/engine/tsm1/file_store_test.go @@ -537,7 +537,7 @@ func TestFileStore_Open_Deleted(t *testing.T) { t.Fatalf("file count mismatch: got %v, exp %v", got, exp) } - if err := fs.Delete("cpu,host=server2!~#!value"); err != nil { + if err := fs.Delete([]string{"cpu,host=server2!~#!value"}); err != nil { fatal(t, "deleting", err) } @@ -574,7 +574,7 @@ func TestFileStore_Delete(t *testing.T) { t.Fatalf("key length mismatch: got %v, exp %v", got, exp) } - if err := fs.Delete("cpu,host=server2!~#!value"); err != nil { + if err := fs.Delete([]string{"cpu,host=server2!~#!value"}); err != nil { fatal(t, "deleting", err) } diff --git a/tsdb/engine/tsm1/tombstone.go b/tsdb/engine/tsm1/tombstone.go index 4a62d93c85..f775c60b2d 100644 --- a/tsdb/engine/tsm1/tombstone.go +++ b/tsdb/engine/tsm1/tombstone.go @@ -16,7 +16,7 @@ type Tombstoner struct { Path string } -func (t *Tombstoner) Add(key string) error { +func (t *Tombstoner) Add(keys []string) error { t.mu.Lock() defer t.mu.Unlock() @@ -31,7 +31,9 @@ func (t *Tombstoner) Add(key string) error { return nil } - tombstones = append(tombstones, key) + for _, k := range keys { + tombstones = append(tombstones, k) + } return t.writeTombstone(tombstones) } diff --git a/tsdb/engine/tsm1/tombstone_test.go b/tsdb/engine/tsm1/tombstone_test.go index f1668c64a6..d5cd1244d8 100644 --- a/tsdb/engine/tsm1/tombstone_test.go +++ b/tsdb/engine/tsm1/tombstone_test.go @@ -23,7 +23,7 @@ func TestTombstoner_Add(t *testing.T) { t.Fatalf("length mismatch: got %v, exp %v", got, exp) } - ts.Add("foo") + ts.Add([]string{"foo"}) entries, err = ts.ReadAll() if err != nil { @@ -61,7 +61,7 @@ func TestTombstoner_Delete(t *testing.T) { f := MustTempFile(dir) ts := &tsm1.Tombstoner{Path: f.Name()} - ts.Add("foo") + ts.Add([]string{"foo"}) // Use a new Tombstoner to verify values are persisted ts = &tsm1.Tombstoner{Path: f.Name()} diff --git a/tsdb/engine/tsm1/wal.go b/tsdb/engine/tsm1/wal.go index b6ab6f94be..29c4a7e0a6 100644 --- a/tsdb/engine/tsm1/wal.go +++ b/tsdb/engine/tsm1/wal.go @@ -519,8 +519,8 @@ func (w *DeleteWALEntry) UnmarshalBinary(b []byte) error { func (w *DeleteWALEntry) Encode(dst []byte) ([]byte, error) { var n int for _, k := range w.Keys { - if len(dst)+1 > len(dst)-n { - grow := make([]byte, defaultBufLen) + if len(dst[:n])+1+len(k) > len(dst) { + grow := make([]byte, len(dst)*2) dst = append(dst, grow...) }