diff --git a/tsdb/tsm1/cache.go b/tsdb/tsm1/cache.go index 9e63bf83a5..8b67ee84b4 100644 --- a/tsdb/tsm1/cache.go +++ b/tsdb/tsm1/cache.go @@ -1,10 +1,10 @@ package tsm1 import ( - "bytes" "context" "fmt" "math" + "strings" "sync" "sync/atomic" "time" @@ -388,7 +388,7 @@ func (c *Cache) Values(key []byte) Values { // DeleteBucketRange removes values for all keys containing points // with timestamps between min and max contained in the bucket identified // by name from the cache. -func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int64, pred Predicate) { +func (c *Cache) DeleteBucketRange(ctx context.Context, name string, min, max int64, pred Predicate) { span, _ := tracing.StartSpanFromContext(ctx) defer span.Finish() @@ -396,15 +396,17 @@ func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int c.mu.Lock() defer c.mu.Unlock() - var toDelete [][]byte + var toDelete []string var total uint64 // applySerial only errors if the closure returns an error. - _ = c.store.applySerial(func(k []byte, e *entry) error { - if !bytes.HasPrefix(k, name) { + _ = c.store.applySerial(func(k string, e *entry) error { + if !strings.HasPrefix(k, name) { return nil } - if pred != nil && !pred.Matches(k) { + // TODO(edd): either use an unsafe conversion to []byte, or add a MatchesString + // method to tsm1.Predicate. + if pred != nil && !pred.Matches([]byte(k)) { return nil } @@ -430,7 +432,9 @@ func (c *Cache) DeleteBucketRange(ctx context.Context, name []byte, min, max int for _, k := range toDelete { total += uint64(len(k)) - c.store.remove(k) + // TODO(edd): either use unsafe conversion to []byte or add a removeString + // method. + c.store.remove([]byte(k)) } c.tracker.DecCacheSize(total) @@ -461,7 +465,7 @@ func (c *Cache) values(key []byte) Values { // ApplyEntryFn applies the function f to each entry in the Cache. // ApplyEntryFn calls f on each entry in turn, within the same goroutine. // It is safe for use by multiple goroutines. -func (c *Cache) ApplyEntryFn(f func(key []byte, entry *entry) error) error { +func (c *Cache) ApplyEntryFn(f func(key string, entry *entry) error) error { c.mu.RLock() store := c.store c.mu.RUnlock() @@ -503,7 +507,7 @@ func (cl *CacheLoader) Load(cache *Cache) error { encoded := tsdb.EncodeName(en.OrgID, en.BucketID) name := models.EscapeMeasurement(encoded[:]) - cache.DeleteBucketRange(context.Background(), name, en.Min, en.Max, pred) + cache.DeleteBucketRange(context.Background(), string(name), en.Min, en.Max, pred) return nil } diff --git a/tsdb/tsm1/cache_test.go b/tsdb/tsm1/cache_test.go index be4c045605..dda161c2ab 100644 --- a/tsdb/tsm1/cache_test.go +++ b/tsdb/tsm1/cache_test.go @@ -177,7 +177,7 @@ func TestCache_Cache_DeleteBucketRange(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange(context.Background(), []byte("bar"), 2, math.MaxInt64, nil) + c.DeleteBucketRange(context.Background(), "bar", 2, math.MaxInt64, nil) if exp, keys := [][]byte{[]byte("bar"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) @@ -216,7 +216,7 @@ func TestCache_DeleteBucketRange_NoValues(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange(context.Background(), []byte("foo"), math.MinInt64, math.MaxInt64, nil) + c.DeleteBucketRange(context.Background(), "foo", math.MinInt64, math.MaxInt64, nil) if exp, keys := 0, len(c.Keys()); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) @@ -251,7 +251,7 @@ func TestCache_DeleteBucketRange_NotSorted(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange(context.Background(), []byte("foo"), 1, 3, nil) + c.DeleteBucketRange(context.Background(), "foo", 1, 3, nil) if exp, keys := 0, len(c.Keys()); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) @@ -269,7 +269,7 @@ func TestCache_DeleteBucketRange_NotSorted(t *testing.T) { func TestCache_DeleteBucketRange_NonExistent(t *testing.T) { c := NewCache(1024) - c.DeleteBucketRange(context.Background(), []byte("bar"), math.MinInt64, math.MaxInt64, nil) + c.DeleteBucketRange(context.Background(), "bar", math.MinInt64, math.MaxInt64, nil) if got, exp := c.Size(), uint64(0); exp != got { t.Fatalf("cache size incorrect exp %d, got %d", exp, got) @@ -301,7 +301,7 @@ func TestCache_Cache_DeleteBucketRange_WithPredicate(t *testing.T) { t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) } - c.DeleteBucketRange(context.Background(), []byte("f"), 2, math.MaxInt64, stringPredicate("fee")) + c.DeleteBucketRange(context.Background(), "f", 2, math.MaxInt64, stringPredicate("fee")) if exp, keys := [][]byte{[]byte("fee"), []byte("foo")}, c.Keys(); !reflect.DeepEqual(keys, exp) { t.Fatalf("cache keys incorrect after delete, exp %v, got %v", exp, keys) diff --git a/tsdb/tsm1/engine_delete_prefix.go b/tsdb/tsm1/engine_delete_prefix.go index eeb7aecc2f..4ea512d777 100644 --- a/tsdb/tsm1/engine_delete_prefix.go +++ b/tsdb/tsm1/engine_delete_prefix.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "math" + "strings" "sync" "github.com/influxdata/influxdb/kit/tracing" @@ -92,18 +93,21 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma span.LogKV("cache_size", e.Cache.Size()) var keysChecked int // For tracing information. // ApplySerialEntryFn cannot return an error in this invocation. - _ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { + nameStr := string(name) + _ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error { keysChecked++ - if !bytes.HasPrefix(k, name) { + if !strings.HasPrefix(k, nameStr) { return nil } - if pred != nil && !pred.Matches(k) { + // TODO(edd): either use an unsafe conversion to []byte, or add a MatchesString + // method to tsm1.Predicate. + if pred != nil && !pred.Matches([]byte(k)) { return nil } // we have to double check every key in the cache because maybe // it exists in the index but not yet on disk. - possiblyDead.keys[string(k)] = struct{}{} + possiblyDead.keys[k] = struct{}{} return nil }) @@ -111,7 +115,7 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma span.Finish() // Delete from the cache (traced in cache). - e.Cache.DeleteBucketRange(ctx, name, min, max, pred) + e.Cache.DeleteBucketRange(ctx, nameStr, min, max, pred) // Now that all of the data is purged, we need to find if some keys are fully deleted // and if so, remove them from the index. @@ -160,16 +164,18 @@ func (e *Engine) DeletePrefixRange(rootCtx context.Context, name []byte, min, ma span.LogKV("cache_size", e.Cache.Size()) keysChecked = 0 // ApplySerialEntryFn cannot return an error in this invocation. - _ = e.Cache.ApplyEntryFn(func(k []byte, _ *entry) error { + _ = e.Cache.ApplyEntryFn(func(k string, _ *entry) error { keysChecked++ - if !bytes.HasPrefix(k, name) { + if !strings.HasPrefix(k, nameStr) { return nil } - if pred != nil && !pred.Matches(k) { + // TODO(edd): either use an unsafe conversion to []byte, or add a MatchesString + // method to tsm1.Predicate. + if pred != nil && !pred.Matches([]byte(k)) { return nil } - delete(possiblyDead.keys, string(k)) + delete(possiblyDead.keys, k) return nil }) span.LogKV("cache_cardinality", keysChecked) diff --git a/tsdb/tsm1/engine_schema.go b/tsdb/tsm1/engine_schema.go index afcbfeb28f..f125cd7110 100644 --- a/tsdb/tsm1/engine_schema.go +++ b/tsdb/tsm1/engine_schema.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "sort" + "strings" "github.com/influxdata/influxdb" "github.com/influxdata/influxdb/models" @@ -95,12 +96,14 @@ func (e *Engine) tagValuesNoPredicate(ctx context.Context, orgBucket, tagKeyByte // With performance in mind, we explicitly do not check the context // while scanning the entries in the cache. - _ = e.Cache.ApplyEntryFn(func(sfkey []byte, entry *entry) error { - if !bytes.HasPrefix(sfkey, prefix) { + prefixStr := string(prefix) + _ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error { + if !strings.HasPrefix(sfkey, prefixStr) { return nil } - key, _ := SeriesAndFieldFromCompositeKey(sfkey) + // TODO(edd): consider the []byte() conversion here. + key, _ := SeriesAndFieldFromCompositeKey([]byte(sfkey)) tags = models.ParseTagsWithTags(key, tags[:0]) curVal := tags.Get(tagKeyBytes) if len(curVal) == 0 { @@ -353,12 +356,13 @@ func (e *Engine) tagKeysNoPredicate(ctx context.Context, orgBucket []byte, start // With performance in mind, we explicitly do not check the context // while scanning the entries in the cache. - _ = e.Cache.ApplyEntryFn(func(sfkey []byte, entry *entry) error { - if !bytes.HasPrefix(sfkey, prefix) { + _ = e.Cache.ApplyEntryFn(func(sfkey string, entry *entry) error { + if !strings.HasPrefix(sfkey, string(prefix)) { return nil } - key, _ := SeriesAndFieldFromCompositeKey(sfkey) + // TODO(edd): consider []byte conversion here. + key, _ := SeriesAndFieldFromCompositeKey([]byte(sfkey)) tags = models.ParseTagsWithTags(key, tags[:0]) if keyset.IsSupersetKeys(tags) { return nil diff --git a/tsdb/tsm1/ring.go b/tsdb/tsm1/ring.go index a95c74f795..0ec2ee3cc7 100644 --- a/tsdb/tsm1/ring.go +++ b/tsdb/tsm1/ring.go @@ -165,14 +165,14 @@ func (r *ring) apply(f func([]byte, *entry) error) error { // applySerial is similar to apply, but invokes f on each partition in the same // goroutine. // apply is safe for use by multiple goroutines. -func (r *ring) applySerial(f func([]byte, *entry) error) error { +func (r *ring) applySerial(f func(string, *entry) error) error { for _, p := range r.partitions { p.mu.RLock() for k, e := range p.store { if e.count() == 0 { continue } - if err := f([]byte(k), e); err != nil { + if err := f(k, e); err != nil { p.mu.RUnlock() return err }