perf(storage): reduce allocations when deleting from cache
When deleting from the cache, each cache key must be checked to determine if it matches the prefix we're deleting. Since the keys are stored as strings in the cache (map keys) there were a lot of allocations happening because `applySerial` expects `[]byte` keys. It's beneficial to reduce allocations by refacting `applySerial` to work on strings. Whilst some allocations now have to happen the other way (string -> []byte), they only happen if we actually need to delete the key from the cache. Most of the keys don't get deleted so it's better doing it this way. Performance on the benchmark from the previous commit improved by ~40-50%. name old time/op new time/op delta Engine_DeletePrefixRange_Cache/exists-24 102ms ±11% 59ms ± 3% -41.95% (p=0.000 n=10+8) Engine_DeletePrefixRange_Cache/not_exists-24 97.1ms ± 4% 45.0ms ± 1% -53.66% (p=0.000 n=10+10) name old alloc/op new alloc/op delta Engine_DeletePrefixRange_Cache/exists-24 25.5MB ± 1% 3.1MB ± 2% -87.83% (p=0.000 n=10+10) Engine_DeletePrefixRange_Cache/not_exists-24 23.9MB ± 1% 0.1MB ±86% -99.65% (p=0.000 n=10+10) name old allocs/op new allocs/op delta Engine_DeletePrefixRange_Cache/exists-24 305k ± 1% 28k ± 1% -90.77% (p=0.000 n=10+10) Engine_DeletePrefixRange_Cache/not_exists-24 299k ± 1% 1k ±63% -99.74% (p=0.000 n=9+10) Raw benchmarks on a 24T/32GB/NVME machine are as follows: goos: linux goarch: amd64 pkg: github.com/influxdata/influxdb/tsdb/tsm1 BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 50379720 ns/op 3054106 B/op 27859 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 57326032 ns/op 3124764 B/op 28217 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 58943855 ns/op 3162146 B/op 28527 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 60565115 ns/op 3138811 B/op 28176 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 200 59775969 ns/op 3087910 B/op 27921 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 59530451 ns/op 3120986 B/op 28207 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 59185532 ns/op 3113066 B/op 28302 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 59295867 ns/op 3100832 B/op 28108 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 300 59599776 ns/op 3100686 B/op 28113 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/exists-24 200 62065907 ns/op 3048527 B/op 27879 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 44979062 ns/op 123026 B/op 1244 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 44733344 ns/op 52650 B/op 479 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 44534180 ns/op 35119 B/op 398 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 45179881 ns/op 105256 B/op 706 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 44918964 ns/op 47426 B/op 621 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 45000465 ns/op 63164 B/op 564 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 45332999 ns/op 117008 B/op 1146 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 45652342 ns/op 66221 B/op 616 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 45083957 ns/op 154354 B/op 1143 allocs/op BenchmarkEngine_DeletePrefixRange_Cache/not_exists-24 300 44560228 ns/op 65024 B/op 724 allocs/op PASS ok github.com/influxdata/influxdb/tsdb/tsm1 1690.583spull/14892/head
parent
eba4dec7e6
commit
2e5ebbe251
|
@ -1,10 +1,10 @@
|
|||
package tsm1
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -388,7 +388,7 @@ func (c *Cache) Values(key []byte) Values {
|
|||
// DeleteBucketRange removes values for all keys containing points
|
||||
// with timestamps between min and max contained in the bucket identified
|
||||
// by name from the cache.
|
||||
func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int64, pred Predicate) {
|
||||
func (c *Cache) DeleteBucketRange(ctx context.Context, name string, min, max int64, pred Predicate) {
|
||||
span, _ := tracing.StartSpanFromContext(ctx)
|
||||
defer span.Finish()
|
||||
|
||||
|
@ -396,15 +396,17 @@ func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int
|
|||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
var toDelete [][]byte
|
||||
var toDelete []string
|
||||
var total uint64
|
||||
|
||||
// applySerial only errors if the closure returns an error.
|
||||
_ = c.store.applySerial(func(k []byte, e *entry) error {
|
||||
if !bytes.HasPrefix(k, name) {
|
||||
_ = c.store.applySerial(func(k string, e *entry) error {
|
||||
if !strings.HasPrefix(k, name) {
|
||||
return nil
|
||||
}
|
||||
if pred != nil && !pred.Matches(k) {
|
||||
// TODO(edd): either use an unsafe conversion to []byte, or add a MatchesString
|
||||
// method to tsm1.Predicate.
|
||||
if pred != nil && !pred.Matches([]byte(k)) {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -430,7 +432,9 @@ func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int
|
|||
|
||||
for _, k := range toDelete {
|
||||
total += uint64(len(k))
|
||||
c.store.remove(k)
|
||||
// TODO(edd): either use unsafe conversion to []byte or add a removeString
|
||||
// method.
|
||||
c.store.remove([]byte(k))
|
||||
}
|
||||
|
||||
c.tracker.DecCacheSize(total)
|
||||
|
@ -461,7 +465,7 @@ func (c *Cache) values(key []byte) Values {
|
|||
// ApplyEntryFn applies the function f to each entry in the Cache.
|
||||
// ApplyEntryFn calls f on each entry in turn, within the same goroutine.
|
||||
// It is safe for use by multiple goroutines.
|
||||
func (c *Cache) ApplyEntryFn(f func(key []byte, entry *entry) error) error {
|
||||
func (c *Cache) ApplyEntryFn(f func(key string, entry *entry) error) error {
|
||||
c.mu.RLock()
|
||||
store := c.store
|
||||
c.mu.RUnlock()
|
||||
|
@ -503,7 +507,7 @@ func (cl *CacheLoader) Load(cache *Cache) error {
|
|||
encoded := tsdb.EncodeName(en.OrgID, en.BucketID)
|
||||
name := models.EscapeMeasurement(encoded[:])
|
||||
|
||||
cache.DeleteBucketRange(context.Background(), name, en.Min, en.Max, pred)
|
||||
cache.DeleteBucketRange(context.Background(), string(name), en.Min, en.Max, pred)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ func TestCache_Cache_DeleteBucketRange(t *testing.T) {
|
|||
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
||||
}
|
||||
|
||||
c.DeleteBucketRange(context.Background(), []byte("bar"), 2, math.MaxInt64, nil)
|
||||
c.DeleteBucketRange(context.Background(), "bar", 2, math.MaxInt64, nil)
|
||||
|
||||
if exp, keys := [][]byte{[]byte("bar"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) {
|
||||
t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys)
|
||||
|
@ -216,7 +216,7 @@ func TestCache_DeleteBucketRange_NoValues(t *testing.T) {
|
|||
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
||||
}
|
||||
|
||||
c.DeleteBucketRange(context.Background(), []byte("foo"), math.MinInt64, math.MaxInt64, nil)
|
||||
c.DeleteBucketRange(context.Background(), "foo", math.MinInt64, math.MaxInt64, nil)
|
||||
|
||||
if exp, keys := 0, len(c.Keys()); !reflect.DeepEqual(keys, exp) {
|
||||
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
||||
|
@ -251,7 +251,7 @@ func TestCache_DeleteBucketRange_NotSorted(t *testing.T) {
|
|||
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
||||
}
|
||||
|
||||
c.DeleteBucketRange(context.Background(), []byte("foo"), 1, 3, nil)
|
||||
c.DeleteBucketRange(context.Background(), "foo", 1, 3, nil)
|
||||
|
||||
if exp, keys := 0, len(c.Keys()); !reflect.DeepEqual(keys, exp) {
|
||||
t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys)
|
||||
|
@ -269,7 +269,7 @@ func TestCache_DeleteBucketRange_NotSorted(t *testing.T) {
|
|||
func TestCache_DeleteBucketRange_NonExistent(t *testing.T) {
|
||||
c := NewCache(1024)
|
||||
|
||||
c.DeleteBucketRange(context.Background(), []byte("bar"), math.MinInt64, math.MaxInt64, nil)
|
||||
c.DeleteBucketRange(context.Background(), "bar", math.MinInt64, math.MaxInt64, nil)
|
||||
|
||||
if got, exp := c.Size(), uint64(0); exp != got {
|
||||
t.Fatalf("cache size incorrect exp %d, got %d", exp, got)
|
||||
|
@ -301,7 +301,7 @@ func TestCache_Cache_DeleteBucketRange_WithPredicate(t *testing.T) {
|
|||
t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys)
|
||||
}
|
||||
|
||||
c.DeleteBucketRange(context.Background(), []byte("f"), 2, math.MaxInt64, stringPredicate("fee"))
|
||||
c.DeleteBucketRange(context.Background(), "f", 2, math.MaxInt64, stringPredicate("fee"))
|
||||
|
||||
if exp, keys := [][]byte{[]byte("fee"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) {
|
||||
t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys)
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"math"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/influxdata/influxdb/kit/tracing"
|
||||
|
@ -92,18 +93,21 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
span.LogKV("cache_size", e.Cache.Size())
|
||||
var keysChecked int // For tracing information.
|
||||
// ApplySerialEntryFn cannot return an error in this invocation.
|
||||
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
||||
nameStr := string(name)
|
||||
_ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error {
|
||||
keysChecked++
|
||||
if !bytes.HasPrefix(k, name) {
|
||||
if !strings.HasPrefix(k, nameStr) {
|
||||
return nil
|
||||
}
|
||||
if pred != nil && !pred.Matches(k) {
|
||||
// TODO(edd): either use an unsafe conversion to []byte, or add a MatchesString
|
||||
// method to tsm1.Predicate.
|
||||
if pred != nil && !pred.Matches([]byte(k)) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// we have to double check every key in the cache because maybe
|
||||
// it exists in the index but not yet on disk.
|
||||
possiblyDead.keys[string(k)] = struct{}{}
|
||||
possiblyDead.keys[k] = struct{}{}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
@ -111,7 +115,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
span.Finish()
|
||||
|
||||
// Delete from the cache (traced in cache).
|
||||
e.Cache.DeleteBucketRange(ctx, name, min, max, pred)
|
||||
e.Cache.DeleteBucketRange(ctx, nameStr, min, max, pred)
|
||||
|
||||
// Now that all of the data is purged, we need to find if some keys are fully deleted
|
||||
// and if so, remove them from the index.
|
||||
|
@ -160,16 +164,18 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma
|
|||
span.LogKV("cache_size", e.Cache.Size())
|
||||
keysChecked = 0
|
||||
// ApplySerialEntryFn cannot return an error in this invocation.
|
||||
_ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error {
|
||||
_ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error {
|
||||
keysChecked++
|
||||
if !bytes.HasPrefix(k, name) {
|
||||
if !strings.HasPrefix(k, nameStr) {
|
||||
return nil
|
||||
}
|
||||
if pred != nil && !pred.Matches(k) {
|
||||
// TODO(edd): either use an unsafe conversion to []byte, or add a MatchesString
|
||||
// method to tsm1.Predicate.
|
||||
if pred != nil && !pred.Matches([]byte(k)) {
|
||||
return nil
|
||||
}
|
||||
|
||||
delete(possiblyDead.keys, string(k))
|
||||
delete(possiblyDead.keys, k)
|
||||
return nil
|
||||
})
|
||||
span.LogKV("cache_cardinality", keysChecked)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
|
||||
"github.com/influxdata/influxdb"
|
||||
"github.com/influxdata/influxdb/models"
|
||||
|
@ -95,12 +96,14 @@ func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgBucket, tagKeyByte
|
|||
|
||||
// With performance in mind, we explicitly do not check the context
|
||||
// while scanning the entries in the cache.
|
||||
_ = e.Cache.ApplyEntryFn(func(sfkey []byte, entry *entry) error {
|
||||
if !bytes.HasPrefix(sfkey, prefix) {
|
||||
prefixStr := string(prefix)
|
||||
_ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error {
|
||||
if !strings.HasPrefix(sfkey, prefixStr) {
|
||||
return nil
|
||||
}
|
||||
|
||||
key, _ := SeriesAndFieldFromCompositeKey(sfkey)
|
||||
// TODO(edd): consider the []byte() conversion here.
|
||||
key, _ := SeriesAndFieldFromCompositeKey([]byte(sfkey))
|
||||
tags = models.ParseTagsWithTags(key, tags[:0])
|
||||
curVal := tags.Get(tagKeyBytes)
|
||||
if len(curVal) == 0 {
|
||||
|
@ -353,12 +356,13 @@ func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgBucket []byte, start
|
|||
|
||||
// With performance in mind, we explicitly do not check the context
|
||||
// while scanning the entries in the cache.
|
||||
_ = e.Cache.ApplyEntryFn(func(sfkey []byte, entry *entry) error {
|
||||
if !bytes.HasPrefix(sfkey, prefix) {
|
||||
_ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error {
|
||||
if !strings.HasPrefix(sfkey, string(prefix)) {
|
||||
return nil
|
||||
}
|
||||
|
||||
key, _ := SeriesAndFieldFromCompositeKey(sfkey)
|
||||
// TODO(edd): consider []byte conversion here.
|
||||
key, _ := SeriesAndFieldFromCompositeKey([]byte(sfkey))
|
||||
tags = models.ParseTagsWithTags(key, tags[:0])
|
||||
if keyset.IsSupersetKeys(tags) {
|
||||
return nil
|
||||
|
|
|
@ -165,14 +165,14 @@ func (r *ring) apply(f func([]byte, *entry) error) error {
|
|||
// applySerial is similar to apply, but invokes f on each partition in the same
|
||||
// goroutine.
|
||||
// apply is safe for use by multiple goroutines.
|
||||
func (r *ring) applySerial(f func([]byte, *entry) error) error {
|
||||
func (r *ring) applySerial(f func(string, *entry) error) error {
|
||||
for _, p := range r.partitions {
|
||||
p.mu.RLock()
|
||||
for k, e := range p.store {
|
||||
if e.count() == 0 {
|
||||
continue
|
||||
}
|
||||
if err := f([]byte(k), e); err != nil {
|
||||
if err := f(k, e); err != nil {
|
||||
p.mu.RUnlock()
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue