diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go index f7b5ac04db..f2308f651c 100644 --- a/tsdb/engine/tsm1/cache.go +++ b/tsdb/engine/tsm1/cache.go @@ -107,7 +107,7 @@ func NewCache(maxSize uint64) *Cache { } } -// WriteKey writes the set of values for the key to the cache. It associates the data with +// Write writes the set of values for the key to the cache. It associates the data with // the given checkpoint. This function is goroutine-safe. // // TODO: This function is a significant potential bottleneck. It is possible that keys could @@ -117,24 +117,35 @@ func NewCache(maxSize uint64) *Cache { func (c *Cache) Write(key string, values []Value, checkpoint uint64) error { c.mu.Lock() defer c.mu.Unlock() - if checkpoint < c.checkpoint { - return ErrCacheInvalidCheckpoint - } // Enough room in the cache? if c.size+uint64(Values(values).Size()) > c.maxSize { return ErrCacheMemoryExceeded } + return c.write(key, values, checkpoint) +} - e, ok := c.store[key] - if !ok { - e = newEntries() - c.store[key] = e +// WriteMulti writes the map of keys and associated values to the cache. It associates the +// data with the given checkpoint. This function is goroutine-safe. +func (c *Cache) WriteMulti(values map[string][]Value, checkpoint uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + + totalSz := 0 + for _, v := range values { + totalSz += Values(v).Size() } - oldSize := e.size - e.add(checkpoint, values) - c.size += e.size - oldSize + // Enough room in the cache? + if c.size+uint64(totalSz) > c.maxSize { + return ErrCacheMemoryExceeded + } + + for k, v := range values { + if err := c.write(k, v, checkpoint); err != nil { + return err + } + } return nil } @@ -219,3 +230,23 @@ func (c *Cache) evict() { } } } + +// write writes the set of values for the key to the cache. It associates the data with +// the given checkpoint. This function assumes the lock has been taken and does not +// enforce the cache size limits. +func (c *Cache) write(key string, values []Value, checkpoint uint64) error { + if checkpoint < c.checkpoint { + return ErrCacheInvalidCheckpoint + } + + e, ok := c.store[key] + if !ok { + e = newEntries() + c.store[key] = e + } + oldSize := e.size + e.add(checkpoint, values) + c.size += e.size - oldSize + + return nil +} diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go index bf57dd16fe..7d845ba777 100644 --- a/tsdb/engine/tsm1/cache_test.go +++ b/tsdb/engine/tsm1/cache_test.go @@ -142,6 +142,27 @@ func Test_CacheWrite(t *testing.T) { } } +func Test_CacheWriteMulti(t *testing.T) { + v0 := NewValue(time.Unix(1, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(2, 0).UTC(), 2.0) + v2 := NewValue(time.Unix(3, 0).UTC(), 3.0) + values := Values{v0, v1, v2} + valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) + + c := MustNewCache(3 * valuesSize) + + if err := c.WriteMulti(map[string][]Value{"foo": values, "bar": values}, 100); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + if n := c.Size(); n != 2*valuesSize { + t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n) + } + + if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) + } +} + func Test_CacheValues(t *testing.T) { v0 := NewValue(time.Unix(1, 0).UTC(), 0.0) v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)