Move to simpler cache

This cache simply evicts as much as possible whenever a checkpoint is
set.
pull/4863/head
Philip O'Toole 2015-11-20 19:28:16 -08:00
parent 298b149836
commit 5b573b9248
3 changed files with 129 additions and 333 deletions

View File

@ -123,11 +123,9 @@ Currently, we are moving towards a Single WAL implementation.
# Cache
The primary purpose of the cache is so that data in the WAL is queryable. The client code writes values to the cache, associating a key and checkpoint with each write. The checkpoint must be a monotonically increasing value, but does not have to increase with every write operation. The cache in turn organises all writes first by key (the cache places no constraints on the key as long as it is non-empty) and then by checkpoint. At a time of its choosing, the client also notifies the cache when previously added data has been drained from the WAL. This allows the cache to evict entries associated with all checkpoints up to and including that checkpoint. Specifically when the cache needs to evict data it first chooses the least-recently-used key ("used" is defined as a write or query of that key) and then all data up-to the checkpoint associated with that key is deleted from memory.
The purpose of the cache is so that data in the WAL is queryable. The client code writes values to the cache, associating a key and checkpoint with each write. The checkpoint must be a monotonically increasing value, but does not have to increase with every write operation. The cache in turn organises all writes first by key (the cache places no constraints on the key as long as it is non-empty) and then by checkpoint. At a time of its choosing, the client also notifies the cache when previously added data has been drained from the WAL. This allows the cache to evict entries associated with all checkpoints up to and including that checkpoint.
The purpose of checkpointing is to allow the cache to keep recently written data in memory, even if the client code has indicated that it has been drained from the WAL. If a query can be satisfied entirely by accessing the cache, the engine can return data for the query much more quickly than if it accessed the disk. This is the secondary purpose of the cache.
The cache tracks it size on a "point-calculated" basis. "Point-calculated" means that the RAM storage footprint for a point in the determined by calling its `Size()` method. While this does not correspond directly to the actual RAM footprint in the cache, the two values are sufficiently correlated for the purpose of controlling RAM.
The cache tracks its size on a "point-calculated" basis. "Point-calculated" means that the RAM storage footprint for a point is determined by calling its `Size()` method. While this does not correspond directly to the actual RAM footprint in the cache, the two values are sufficiently correlated for the purpose of controlling RAM.
# TSM File Index

View File

@ -1,12 +1,9 @@
package tsm1
import (
"container/list"
"fmt"
"sort"
"sync"
"github.com/influxdb/influxdb/tsdb"
)
// ErrCacheMemoryExceeded is the error reported when the cache's maximum
// memory size is exceeded.
var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
@ -18,70 +15,10 @@ func (a checkpoints) Len() int { return len(a) }
// Swap exchanges the checkpoints stored at indexes i and j.
func (a checkpoints) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}
// Less reports whether the checkpoint at index i is smaller than the
// checkpoint at index j.
func (a checkpoints) Less(i, j int) bool {
	lhs, rhs := a[i], a[j]
	return lhs < rhs
}
// lru orders string keys from least-recently used to most-recently used. It is not
// goroutine safe.
type lru struct {
	// list holds the keys; the front is the most-recently used key and the
	// back is the least-recently used key.
	list *list.List
	// elements maps each tracked key to its element in list.
	elements map[string]*list.Element
}

// newLRU returns an initialized LRU.
func newLRU() *lru {
	return &lru{
		list:     list.New(),
		elements: make(map[string]*list.Element),
	}
}

// MoveToFront marks key as the most recently used key. A key that is not yet
// tracked is inserted at the front.
func (l *lru) MoveToFront(key string) {
	if e, ok := l.elements[key]; ok {
		l.list.MoveToFront(e)
		return
	}
	l.elements[key] = l.list.PushFront(key)
}

// Remove removes key from the LRU. If the key does not exist nothing happens.
func (l *lru) Remove(key string) {
	e, ok := l.elements[key]
	if !ok {
		return
	}
	l.list.Remove(e)
	delete(l.elements, key)
}

// Front returns the most-recently used key. If there is no such key, then "" is returned.
func (l *lru) Front() string {
	e := l.list.Front()
	if e == nil {
		return ""
	}
	return e.Value.(string)
}

// Back returns the least-recently used key. If there is no such key, then "" is returned.
func (l *lru) Back() string {
	e := l.list.Back()
	if e == nil {
		return ""
	}
	return e.Value.(string)
}

// DoFromLeast iterates through the LRU, from least-recently used to most-recently used,
// calling the given function with each key. The function may remove the key it is
// called with without cutting the iteration short.
func (l *lru) DoFromLeast(f func(key string)) {
	for e := l.list.Back(); e != nil; {
		// Look up the predecessor before invoking f: once an element has been
		// removed from the list its Prev() is nil, which would otherwise end
		// the walk after the first removal.
		prev := e.Prev()
		f(e.Value.(string))
		e = prev
	}
}
// entry is the set of all values received for a given key.
// entry is a set of values and some metadata.
type entry struct {
values Values // All stored values.
unsorted bool // Whether the data requires sorting and deduping before query.
size uint64 // Total Number of point-calculated bytes stored by this entry.
values Values // All stored values.
size uint64 // Total Number of point-calculated bytes stored by this entry.
}
// newEntry returns a new instance of entry.
@ -91,109 +28,82 @@ func newEntry() *entry {
// add adds the given values to the entry.
func (e *entry) add(values []Value) {
for _, v := range values {
// Only mark unsorted if not already marked.
if !e.unsorted && len(e.values) > 1 {
e.unsorted = e.values[len(e.values)-1].Time().UnixNano() >= v.Time().UnixNano()
}
e.values = append(e.values, v)
e.size += uint64(v.Size())
}
}
// dedupe removes duplicate entries for the same timestamp and sorts the entries.
func (e *entry) dedupe() {
if !e.unsorted {
return
}
e.values = e.values.Deduplicate()
e.unsorted = false
// Update size.
e.size = 0
for _, v := range e.values {
e.size += uint64(v.Size())
}
e.values = append(e.values, values...)
e.size += uint64(Values(values).Size())
}
// entries maps checkpoints to entry objects.
type entries struct {
ee map[uint64]*entry
m map[uint64]*entry
size uint64
}
func newEntries() entries {
return entries{
ee: make(map[uint64]*entry),
// newEntries returns an instance of entries.
func newEntries() *entries {
return &entries{
m: make(map[uint64]*entry),
}
}
func (a entries) add(values []Value, checkpoint uint64) {
e, ok := a.ee[checkpoint]
// add adds checkpointed values to the entries object.
func (a *entries) add(checkpoint uint64, values []Value) {
e, ok := a.m[checkpoint]
if !ok {
e = newEntry()
a.ee[checkpoint] = e
a.m[checkpoint] = e
}
oldSize := e.size
e.add(values)
a.size += e.size - oldSize
}
// purge deletes all data that is as old as the checkpoint. Returns point-calculated
// space freed-up.
func (a entries) purge(checkpoint uint64) uint64 {
var size uint64
for k, v := range a.ee {
// evict evicts all data associated with checkpoints up to and including
// the given checkpoint.
func (a *entries) evict(checkpoint uint64) {
for k, v := range a.m {
if k > checkpoint {
continue
}
size += v.size
delete(a.ee, k)
a.size -= v.size
delete(a.m, k)
}
return size
}
// size returns point-calculated storage size.
func (a entries) size() uint64 {
var size uint64
for _, v := range a.ee {
size += v.size
// clone returns a copy of all underlying Values. Values are not sorted, nor deduped.
func (a *entries) clone() Values {
var checkpoints checkpoints
for k, _ := range a.m {
checkpoints = append(checkpoints, k)
}
return size
}
sort.Sort(checkpoints)
// clone returns the values for all entries under management, deduped and ordered by time.
func (a entries) clone() Values {
var keys []uint64
var values Values
for k, _ := range a.ee {
keys = append(keys, k)
for _, k := range checkpoints {
values = append(values, a.m[k].values...)
}
sort.Sort(checkpoints(keys))
return values
}
for _, k := range keys {
v := a.ee[k]
v.dedupe()
values = append(values, v.values...)
}
// XXX TO CONSIDER: it might be worth memoizing this.
return values.Deduplicate()
// dedupe returns a copy of all underlying Values. Values are deduped and sorted.
func (a *entries) dedupe() Values {
return a.clone().Deduplicate()
}
// Cache maintains an in-memory store of Values for a set of keys. As data is added to the cache
// it will evict older data as necessary to make room for the new entries.
type Cache struct {
mu sync.RWMutex
store map[string]entries
store map[string]*entries
checkpoint uint64
size uint64
maxSize uint64
lru *lru // List of entry keys from most recently accessed to least.
}
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
func NewCache(maxSize uint64) *Cache {
return &Cache{
maxSize: maxSize,
store: make(map[string]entries),
lru: newLRU(),
store: make(map[string]*entries),
}
}
@ -211,13 +121,8 @@ func (c *Cache) Write(key string, values []Value, checkpoint uint64) error {
return ErrCacheInvalidCheckpoint
}
newSize := c.size + uint64(Values(values).Size())
if newSize >= c.maxSize {
c.evict(newSize - c.maxSize)
}
// Size OK now?
if c.size >= c.maxSize {
// Enough room in the cache?
if c.size+uint64(Values(values).Size()) > c.maxSize {
return ErrCacheMemoryExceeded
}
@ -226,18 +131,15 @@ func (c *Cache) Write(key string, values []Value, checkpoint uint64) error {
e = newEntries()
c.store[key] = e
}
e.add(values, checkpoint)
c.size = newSize
// Mark entry as most-recently used.
c.lru.MoveToFront(key)
oldSize := e.size
e.add(checkpoint, values)
c.size += e.size - oldSize
return nil
}
// SetCheckpoint informs the cache that updates received up to and including checkpoint
// can be safely evicted. Setting a checkpoint does not mean that eviction up to that
// point will actually occur.
// can be safely evicted.
func (c *Cache) SetCheckpoint(checkpoint uint64) error {
c.mu.Lock()
defer c.mu.Unlock()
@ -245,10 +147,11 @@ func (c *Cache) SetCheckpoint(checkpoint uint64) error {
return ErrCacheInvalidCheckpoint
}
c.checkpoint = checkpoint
c.evict()
return nil
}
// Size returns the number of bytes the cache currently uses.
// Size returns the number of point-calculated bytes the cache currently uses.
func (c *Cache) Size() uint64 {
c.mu.RLock()
defer c.mu.RUnlock()
@ -277,57 +180,42 @@ func (c *Cache) Checkpoint() uint64 {
return c.checkpoint
}
// Evict instructs the cache to evict.
func (c *Cache) Evict(size uint64) uint64 {
// Evict forces the cache to evict.
func (c *Cache) Evict() {
c.mu.Lock()
defer c.mu.Unlock()
return c.evict(size)
c.evict()
}
// Cursor returns a cursor for the given key.
func (c *Cache) Cursor(key string) tsdb.Cursor {
c.mu.RLock()
defer c.mu.RUnlock()
// e, ok := c.store[key]
// if !ok {
// return nil
// }
// // Mark entry as most-recently used.
// c.lru.MoveToFront(key)
// e.dedupe()
// _ = e.clone()
// // Actually return a cursor
// Mark entry as most-recently used.
c.lru.MoveToFront(key)
return nil
}
// evict instructs the cache to evict data until all data with an associated checkpoint
// before the last checkpoint was set, or memory footprint decreases by the given size,
// whichever happens first. Returns the number of point-calculated bytes that were
// actually evicted.
func (c *Cache) evict(size uint64) uint64 {
var freed uint64
defer func() {
c.size -= freed
// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key string) Values {
values := func() Values {
c.mu.RLock()
defer c.mu.RUnlock()
e, ok := c.store[key]
if !ok {
return nil
}
return e.clone()
}()
// Now have copy, so perform dedupe and sort outside of lock, unblocking
// writes.
c.lru.DoFromLeast(func(key string) {
e := c.store[key]
freed += e.purge(c.checkpoint)
if e.size() == 0 {
// If the entry for the key is empty, remove all reference from the store.
if values == nil {
return nil
}
return values.Deduplicate()
}
// evict instructs the cache to evict data up to and including the current checkpoint.
func (c *Cache) evict() {
for key, entries := range c.store {
oldSize := entries.size
entries.evict(c.checkpoint)
c.size -= oldSize - entries.size
if entries.size == 0 {
// All data for the key evicted.
delete(c.store, key)
}
if freed >= size {
return
}
})
return freed
}
}

View File

@ -6,151 +6,34 @@ import (
"time"
)
// Test_LRU verifies that the LRU keeps keys in recency order across inserts,
// re-use of an existing key, and removals (including removal of an unknown key).
func Test_LRU(t *testing.T) {
	lru := newLRU()
	if lru == nil {
		t.Fatalf("failed to create LRU")
	}

	// Test adding various elements to the LRU.
	lru.MoveToFront("A")
	if f := lru.Front(); f != "A" {
		t.Fatalf("first inserted key not at front, got: %s", f)
	}
	if f := lru.Back(); f != "A" {
		t.Fatalf("first inserted key not at back, got: %s", f)
	}

	lru.MoveToFront("B")
	if f := lru.Front(); f != "B" {
		t.Fatalf("second inserted key not at front, got: %s", f)
	}
	if f := lru.Back(); f != "A" {
		t.Fatalf("first inserted key not at back after second insert, got: %s", f)
	}

	lru.MoveToFront("C")
	if f := lru.Front(); f != "C" {
		t.Fatalf("third inserted key not at front, got: %s", f)
	}
	if f := lru.Back(); f != "A" {
		t.Fatalf("first inserted key not at back after third insert, got: %s", f)
	}

	// Re-using an existing key moves it to the front and exposes the next
	// least-recently used key at the back.
	lru.MoveToFront("A")
	if f := lru.Front(); f != "A" {
		t.Fatalf("re-used key not at front, got: %s", f)
	}
	if f := lru.Back(); f != "B" {
		t.Fatalf("least-recently used key not at back after re-use, got: %s", f)
	}

	// Ensure that LRU ordering is correct.
	expectedOrder, gotOrder := []string{"B", "C", "A"}, []string{}
	lru.DoFromLeast(func(key string) {
		gotOrder = append(gotOrder, key)
	})
	if !reflect.DeepEqual(expectedOrder, gotOrder) {
		t.Fatalf("expected LRU order not correct, got %v, exp %v", gotOrder, expectedOrder)
	}

	// Ensure ordering is still correct after various remove operations.
	// "X" was never inserted, so removing it must be a no-op.
	lru.Remove("A")
	lru.Remove("X")
	expectedOrder, gotOrder = []string{"B", "C"}, []string{}
	lru.DoFromLeast(func(key string) {
		gotOrder = append(gotOrder, key)
	})
	if !reflect.DeepEqual(expectedOrder, gotOrder) {
		t.Fatalf("expected LRU order not correct post remove, got %v, exp %v", gotOrder, expectedOrder)
	}
}
// Test_EntryAdd checks that adding values to an entry updates its
// point-calculated size and flags the entry as unsorted only once an
// out-of-order value arrives.
func Test_EntryAdd(t *testing.T) {
	ent := newEntry()
	early := NewValue(time.Unix(2, 0).UTC(), 1.0)
	late := NewValue(time.Unix(3, 0).UTC(), 2.0)
	outOfOrder := NewValue(time.Unix(1, 0).UTC(), 2.0)

	// Two values in increasing time order.
	ent.add([]Value{early, late})
	wantSize := uint64(early.Size() + late.Size())
	if ent.size != wantSize {
		t.Fatal("adding points to entry, wrong size")
	}
	if ent.unsorted {
		t.Fatal("adding ordered points resulted in unordered entry")
	}

	// A value older than everything already stored.
	ent.add([]Value{outOfOrder})
	wantSize = uint64(early.Size() + late.Size() + outOfOrder.Size())
	if ent.size != wantSize {
		t.Fatal("adding point to entry, wrong size")
	}
	if !ent.unsorted {
		t.Fatal("adding unordered point resulted in ordered entry")
	}
}
// Test_EntryDedupe checks that dedupe leaves an already ordered entry
// untouched, and that after a duplicate timestamp is added it keeps the
// later-written value, orders the result, and recomputes the size.
func Test_EntryDedupe(t *testing.T) {
	ent := newEntry()
	first := NewValue(time.Unix(1, 0).UTC(), 1.0)
	second := NewValue(time.Unix(2, 0).UTC(), 2.0)
	dup := NewValue(time.Unix(1, 0).UTC(), 2.0) // same timestamp as first

	ent.add([]Value{first, second})
	if want := uint64(first.Size() + second.Size()); ent.size != want {
		t.Fatal("adding points to entry, wrong size")
	}
	if got := ent.values; !reflect.DeepEqual(got, Values{first, second}) {
		t.Fatal("entry values not as expected")
	}

	// Deduping ordered, duplicate-free data must not change it.
	ent.dedupe()
	if got := ent.values; !reflect.DeepEqual(got, Values{first, second}) {
		t.Fatal("entry values not as expected after dedupe")
	}

	ent.add([]Value{dup})
	if got := ent.values; !reflect.DeepEqual(got, Values{first, second, dup}) {
		t.Fatal("entry values not as expected after v3")
	}
	if want := uint64(first.Size() + second.Size() + dup.Size()); ent.size != want {
		t.Fatal("adding points to entry, wrong size")
	}

	// The duplicate replaces the original value for its timestamp.
	ent.dedupe()
	if want := uint64(dup.Size() + second.Size()); ent.size != want {
		t.Fatal("adding points to entry, wrong size")
	}
	if got := ent.values; !reflect.DeepEqual(got, Values{dup, second}) {
		t.Fatal("entry values not as expected dedupe of v3")
	}
}
func Test_EntriesAdd(t *testing.T) {
e := newEntries()
v1 := NewValue(time.Unix(2, 0).UTC(), 1.0)
v2 := NewValue(time.Unix(3, 0).UTC(), 2.0)
v3 := NewValue(time.Unix(1, 0).UTC(), 2.0)
e.add([]Value{v1, v2}, uint64(100))
if e.size() != uint64(v1.Size()+v2.Size()) {
e.add(uint64(100), []Value{v1, v2})
if e.size != uint64(v1.Size()+v2.Size()) {
t.Fatal("adding points to entry, wrong size")
}
e.add([]Value{v3}, uint64(100))
if e.size() != uint64(v1.Size()+v2.Size()+v3.Size()) {
e.add(uint64(100), []Value{v3})
if e.size != uint64(v1.Size()+v2.Size()+v3.Size()) {
t.Fatal("adding point to entry, wrong size")
}
}
func Test_EntriesClone(t *testing.T) {
func Test_EntriesDedupe(t *testing.T) {
e := newEntries()
v0 := NewValue(time.Unix(4, 0).UTC(), 1.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
v2 := NewValue(time.Unix(3, 0).UTC(), 3.0)
v3 := NewValue(time.Unix(3, 0).UTC(), 4.0)
e.add([]Value{v0, v1}, uint64(100))
e.add([]Value{v2}, uint64(200))
e.add([]Value{v3}, uint64(400))
e.add(uint64(100), []Value{v0, v1})
e.add(uint64(200), []Value{v2})
e.add(uint64(400), []Value{v3})
values := e.clone()
values := e.dedupe()
if len(values) != 3 {
t.Fatalf("cloned values is wrong length, got %d", len(values))
}
@ -163,22 +46,21 @@ func Test_EntriesClone(t *testing.T) {
if !reflect.DeepEqual(values[2], v0) {
t.Fatal("2nd point does not equal v0:", values[0], v0)
}
if n := e.purge(100); n != uint64(v0.Size()+v1.Size()) {
t.Fatal("wrong size of points purged:", n)
}
}
func Test_EntriesPurge(t *testing.T) {
func Test_EntriesEvict(t *testing.T) {
e := newEntries()
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
v2 := NewValue(time.Unix(3, 0).UTC(), 3.0)
e.add([]Value{v0, v1}, uint64(100))
e.add([]Value{v2}, uint64(200))
e.add(uint64(100), []Value{v0, v1})
e.add(uint64(200), []Value{v2})
if e.size != uint64(v0.Size()+v1.Size()+v2.Size()) {
t.Fatal("wrong size post eviction:", e.size)
}
values := e.clone()
values := e.dedupe()
if len(values) != 3 {
t.Fatalf("cloned values is wrong length, got %d", len(values))
}
@ -192,11 +74,12 @@ func Test_EntriesPurge(t *testing.T) {
t.Fatal("2nd point does not equal v2:", values[0], v2)
}
if n := e.purge(100); n != uint64(v0.Size()+v1.Size()) {
t.Fatal("wrong size of points purged:", n)
e.evict(100)
if e.size != uint64(v2.Size()) {
t.Fatalf("wrong size post eviction, exp: %d, got %d:", v2.Size(), e.size)
}
values = e.clone()
values = e.dedupe()
if len(values) != 1 {
t.Fatalf("purged cloned values is wrong length, got %d", len(values))
}
@ -204,10 +87,12 @@ func Test_EntriesPurge(t *testing.T) {
t.Fatal("0th point does not equal v1:", values[0], v2)
}
if n := e.purge(200); n != uint64(v2.Size()) {
t.Fatal("wrong size of points purged:", n)
e.evict(200)
if e.size != 0 {
t.Fatal("wrong size post eviction of last point:", e.size)
}
values = e.clone()
values = e.dedupe()
if len(values) != 0 {
t.Fatalf("purged cloned values is wrong length, got %d", len(values))
}
@ -257,6 +142,31 @@ func Test_CacheWrite(t *testing.T) {
}
}
// Test_CacheValues checks that Values returns nil for an unknown key and
// otherwise returns the key's values, across checkpoints, deduped and sorted
// by time.
func Test_CacheValues(t *testing.T) {
	v0 := NewValue(time.Unix(1, 0).UTC(), 0.0)
	v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)
	v2 := NewValue(time.Unix(3, 0).UTC(), 3.0)
	v3 := NewValue(time.Unix(1, 0).UTC(), 1.0) // same timestamp as v0
	v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)

	c := MustNewCache(512)
	if deduped := c.Values("no such key"); deduped != nil {
		t.Fatalf("Values returned for no such key")
	}

	if err := c.Write("foo", Values{v0, v1, v2, v3}, 100); err != nil {
		t.Fatalf("failed to write 4 values, key foo to cache: %s", err.Error())
	}
	if err := c.Write("foo", Values{v4}, 200); err != nil {
		t.Fatalf("failed to write 1 value, key foo to cache: %s", err.Error())
	}

	// v3 is expected in place of v0: both share a timestamp and v3 was
	// written later.
	expValues := Values{v3, v1, v2, v4}
	if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
		t.Fatalf("deduped values for foo incorrect, exp: %v, got %v", expValues, deduped)
	}
}
func Test_CacheCheckpoint(t *testing.T) {
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)