diff --git a/tsdb/engine/tsm1/DESIGN.md b/tsdb/engine/tsm1/DESIGN.md index dfdff76cae..f1fc115b29 100644 --- a/tsdb/engine/tsm1/DESIGN.md +++ b/tsdb/engine/tsm1/DESIGN.md @@ -32,7 +32,7 @@ Blocks are sequences of block CRC32 and data. The block data is opaque to the f └─────────┴─────────┴─────────┴─────────┴─────────┴─────────┘ ``` -Following the blocks is the index for the blocks in the file. The index is composed of a sequence of index entries ordered lexicographically by key and then by time. Each index entry starts with a key length and key followed by a count of the number of blocks in the file. Each block entry is composed of the min and max time for the block, the offset into the file where the block is located and the the size of the block. +Following the blocks is the index for the blocks in the file. The index is composed of a sequence of index entries ordered lexicographically by key and then by time. Each index entry starts with a key length and key followed by a count of the number of blocks in the file. Each block entry is composed of the min and max time for the block, the offset into the file where the block is located and the size of the block. The index structure can provide efficient access to all blocks as well as the ability to determine the cost associated with acessing given key. Given a key and timestamp, we know exactly which file contains the block for that timestamp as well as where that block resides and how much data to read to retrieve the block. If we know we need to read all or multiple blocks in a file, we can use the size to determine how much to read in a given IO. @@ -97,7 +97,7 @@ The compaction is used to generate a set of SeriesIterators that return a sequen Deletions can occur while a new file is being written. Since the new TSM file is not complete a tombstone would not be written for it. This could result in deleted values getting written into a new file. To prevent this, if a compaction is running and a delete occurs, the current compaction is aborted and new compaction is started. -When all files are processed and succesfully written, completion checkpoint markers are created and files are renamed. The engine then notifies the Cache of the last written timestamp which is used for by the Cache to know what entries can be evicted in the future. +When all files are processed and succesfully written, completion checkpoint markers are created and files are renamed. The engine then notifies the Cache of the checkpoint of the compacted which is used for by the Cache to know what entries can be evicted in the future. This process then runs again until there are no more WAL files and the minimum number of TSM files exists that are also under the maximum file size. @@ -121,6 +121,14 @@ Deletions would not be able to reclaim WAL segments immediately as in the case w Currently, we are moving towards a Single WAL implemention. +# Cache + +The primary purpose of the cache is so that data in the WAL is queryable. The client code writes values to the cache, associating a key and checkpoint with each write. The checkpoint must be a monotonically increasing value, but does not have to increase with every write operation. The cache in turn organises all writes first by key (the cache places no constraints on the key as long as it is non-empty) and then by checkpoint. At a time of its choosing, the client also notifies the cache when previously added data has been drained from the WAL. This allows the cache to evict entries associated with all checkpoints up to and including that checkpoint. Specifically when the cache needs to evict data it first chooses the least-recently-used key ("used" is defined as a write or query of that key) and then all data up-to the checkpoint associated with that key is deleted from memory. + +The purpose of checkpointing is to allow the cache to keep recently written data in memory, even if the client code has indicated that it has been drained from the WAL. If a query can be satified entirely by accessing the cache, the engine can return data for the query much quicker than if it accessed the disk. This is the secondary purpose of the cache. + +The cache tracks it size on a "point-calculated" basis. "Point-calculated" means that the RAM storage footprint for a point in the determined by calling its `Size()` method. While this does not correspond directly to the actual RAM footprint in the cache, the two values are sufficiently correlated for the purpose of controlling RAM. + # TSM File Index Each TSM file contains a full index of the blocks contained within the file. The existing index structure is designed to allow for a binary search across the index to find the starting block for a key. We would then seek to that start key and sequentially scan each block to find the location of a timestamp. @@ -129,7 +137,7 @@ Some issues with the existing structure is that seeking to a given timestamp for We've chosen to update the block index structure to ensure a TSM file is fully self-contained, supports consistent IO characteristics for sequential and random accesses as well as provides an efficient load time regardless of file size. The implications of these changes are that the index is slightly larger and we need to be able to search the index despite each entry being variably sized. -The following are some alternative design options to handle the cases where the index is too large to fit in memory. We are currently planning to use an indirect MMAP indexing approach for loaded TSM files. +The following are some alternative design options to handle the cases where the index is too large to fit in memory. We are currently planning to use an indirect MMAP indexing approach for loaded TSM files. ### Indirect MMAP Indexing @@ -169,7 +177,7 @@ The size of the offsets slice would be proportional to the number of unique seri ### LRU/Lazy Load -A second option could be to have the index work as a memory bounded, lazy-load style cache. When a cache miss occurs, the index structure is scanned to find the the key and the entries are load and added to the cache which causes the least-recently used entries to be evicted. +A second option could be to have the index work as a memory bounded, lazy-load style cache. When a cache miss occurs, the index structure is scanned to find the key and the entries are load and added to the cache which causes the least-recently used entries to be evicted. ### Key Compression @@ -227,7 +235,7 @@ These are some of the high-level components and their responsibilities. These a * Append-only log composed of a fixed size segment files. * Writes are appended to the current segment -* Roll-over to new segment after filling the the current segment +* Roll-over to new segment after filling the current segment * Closed segments are never modified and used for startup and recovery as well as compactions. * There is a single WAL for the store as opposed to a WAL per shard. @@ -263,7 +271,7 @@ These are some of the high-level components and their responsibilities. These a * A TSM file that is opened entails reading in and adding the index section to the `FileIndex`. The block data is then MMAPed up to the index offset to avoid having the index in memory twice. ## FileIndex -* Provides location information to a file and block for a given key and timestamp. +* Provides location information to a file and block for a given key and timestamp. ## Interfaces @@ -288,7 +296,7 @@ func (t *TSMWriter) Close() error ``` -// WALIterator returns the key and []Values for a set of WAL segment files. +// WALIterator returns the key and []Values for a set of WAL segment files. type WALIterator struct{ Files *os.File } @@ -343,7 +351,9 @@ func (f *FileIndex) Location(key, timestamp) (*os.File, uint64, error) ``` type Cache struct {} -func (c *Cache) Write(key string, values []Value) error +func (c *Cache) Write(key string, values []Value, checkpoint uint64) error +func (c *Cache) SetCheckpoint(checkpoint uint64) error +func (c *Cache) Cursor(key string) tsdb.Cursor ``` ``` @@ -375,7 +385,7 @@ Write latency is minimal for the WAL write since there are no seeks. The latenc Query throughput is directly related to how many blocks can be read in a period of time. The index structure contains enough information to determine if one or multiple blocks can be read in a single IO. -Query latency is determine by how long it takes to find and read the relevant blocks. The in-memory index structure contains the offsets and sizes of all blocks for a key. This allows every block to be read in 2 IOPS (seek + read) regardless of position, structure or size of file. +Query latency is determine by how long it takes to find and read the relevant blocks. The in-memory index structure contains the offsets and sizes of all blocks for a key. This allows every block to be read in 2 IOPS (seek + read) regardless of position, structure or size of file. ### Startup @@ -383,11 +393,11 @@ Startup time is proportional to the number of WAL files, TSM files and tombstone ### Compactions -Compactions are IO intensive in that they may need to read multiple, large TSM files to rewrite them. The throughput of a compactions (MB/s) as well as the latency for each compaction is important to keep consistent even as data sizes grow. +Compactions are IO intensive in that they may need to read multiple, large TSM files to rewrite them. The throughput of a compactions (MB/s) as well as the latency for each compaction is important to keep consistent even as data sizes grow. The performance of compactions also has an effect on what data is visible during queries. If the Cache fills up and evicts old entries faster than the compactions can process old WAL files, queries could return return gaps until compactions catch up. -To address these concerns, compactions prioritize old WAL files over optimizing storage/compression to avoid data being hidden overload situations. This also accounts for the fact that shards will eventually become cold for writes so that existing data will be able to be optimized. To maintain consistent performance, the number of each type of file processed as well as the size of each file processed is bounded. +To address these concerns, compactions prioritize old WAL files over optimizing storage/compression to avoid data being hidden overload situations. This also accounts for the fact that shards will eventually become cold for writes so that existing data will be able to be optimized. To maintain consistent performance, the number of each type of file processed as well as the size of each file processed is bounded. ### Memory Footprint @@ -399,7 +409,7 @@ The main concern with concurrency is that reads and writes should not block each 1. Cache series data can be returned to cursors as a copy. Since cache entries are evicted on writes, cursors iteration and writes to the same series could block each other. Iterating over copies of the values can relieve some of this contention. 2. TSM data values returned by the engine are new references to Values and not access to the actual TSM files. This means that the `Engine`, through the `FileStore` can limit contention. -3. Compactions are the only place where new TSM files are added and removed. Since this is a serial, continously running process, file contention is minimized. +3. Compactions are the only place where new TSM files are added and removed. Since this is a serial, continously running process, file contention is minimized. ## Robustness @@ -407,5 +417,5 @@ The two robustness concerns considered by this design are writes filling the cac Writes filling up cache faster than the WAL segments can be processed result in the oldest entries being evicted from the cache. This is the normal operation for the cache. Old entries are always evicited to make room for new entries. In the case where WAL segements are slow to be processed, writes are not blocked or errored so timeouts should not occur due to IO issues. A side effect of this is that queries for recent data will always be served from memory. The size of the in-memory cache can also be tuned so that if IO does because a bottleneck the window of time for queries with recent data can be tuned. -Crash recovery is handled by using copy-on-write style updates along with checkpoint marker files. Existing data is never updated. Updates and deletes to existing data are recored as new changes and processed at compaction and query time. +Crash recovery is handled by using copy-on-write style updates along with checkpoint marker files. Existing data is never updated. Updates and deletes to existing data are recored as new changes and processed at compaction and query time. diff --git a/tsdb/engine/tsm1/cache.go b/tsdb/engine/tsm1/cache.go new file mode 100644 index 0000000000..597016928f --- /dev/null +++ b/tsdb/engine/tsm1/cache.go @@ -0,0 +1,320 @@ +package tsm1 + +import ( + "container/list" + "fmt" + "sort" + "sync" + + "github.com/influxdb/influxdb/tsdb" +) + +var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded") +var ErrCacheInvalidCheckpoint = fmt.Errorf("invalid checkpoint") + +// lru orders string keys from least-recently used to most-recently used. It is not +// goroutine safe. +type lru struct { + list *list.List + elements map[string]*list.Element +} + +// newLRU returns an initialized LRU. +func newLRU() *lru { + return &lru{ + list: list.New(), + elements: make(map[string]*list.Element), + } +} + +// MoveToFront marks key as the most recently used key. +func (l *lru) MoveToFront(key string) { + e, ok := l.elements[key] + if !ok { + l.elements[key] = l.list.PushFront(key) + return + } + l.list.MoveToFront(e) +} + +// Remove removes key from the LRU. If the key does not exist nothing happens. +func (l *lru) Remove(key string) { + if _, ok := l.elements[key]; ok { + l.list.Remove(l.elements[key]) + delete(l.elements, key) + } +} + +// Front returns the most-recently used key. If there is no such key, then "" is returned. +func (l *lru) Front() string { + e := l.list.Front() + if e == nil { + return "" + } + return e.Value.(string) +} + +// Back returns the least-recently used key. If there is no such key, then "" is returned. +func (l *lru) Back() string { + e := l.list.Back() + if e == nil { + return "" + } + return e.Value.(string) +} + +// DoFromLeast iterates through the LRU, from least-recently used to most-recently used, +// calling the given function with each key. +func (l *lru) DoFromLeast(f func(key string)) { + for e := l.list.Back(); e != nil; e = e.Prev() { + f(e.Value.(string)) + } +} + +// entry is the set of all values received for a given key. +type entry struct { + values Values // All stored values. + unsorted bool // Whether the data requires sorting and deduping before query. + size uint64 // Total Number of point-calculated bytes stored by this entry. +} + +// newEntry returns a new instance of entry. +func newEntry() *entry { + return &entry{} +} + +// add adds the given values to the entry. +func (e *entry) add(values []Value) { + for _, v := range values { + // Only mark unsorted if not already marked. + if !e.unsorted && len(e.values) > 1 { + e.unsorted = e.values[len(e.values)-1].Time().UnixNano() >= v.Time().UnixNano() + } + e.values = append(e.values, v) + e.size += uint64(v.Size()) + } +} + +// dedupe remove duplicate entries for the same timestamp and sorts the entries. +func (e *entry) dedupe() { + if !e.unsorted { + return + } + e.values = e.values.Deduplicate() + e.unsorted = false + + // Update size. + e.size = 0 + for _, v := range e.values { + e.size += uint64(v.Size()) + } +} + +type entries struct { + ee map[uint64]*entry +} + +func newEntries() entries { + return entries{ + ee: make(map[uint64]*entry), + } +} + +func (a entries) add(values []Value, checkpoint uint64) { + e, ok := a.ee[checkpoint] + if !ok { + e = newEntry() + a.ee[checkpoint] = e + } + e.add(values) +} + +// purge deletes all data that is as old as the checkpoint. Returns point-calculated +// space freed-up. +func (a entries) purge(checkpoint uint64) uint64 { + var size uint64 + for k, v := range a.ee { + if k > checkpoint { + continue + } + size += v.size + delete(a.ee, k) + } + return size +} + +// size returns point-calcuated storage size. +func (a entries) size() uint64 { + var size uint64 + for _, v := range a.ee { + size += v.size + } + return size +} + +// clone returns the values for all entries under management, deduped and ordered by time. +func (a entries) clone() Values { + var values Values + for _, v := range a.ee { + v.dedupe() + values = append(values, v.values...) + } + // XXX TO CONSIDER: it might be worth memoizing this. + return values.Deduplicate() +} + +// Cache maintains an in-memory store of Values for a set of keys. As data is added to the cache +// it will evict older data as necessary to make room for the new entries. +type Cache struct { + mu sync.RWMutex + store map[string]entries + checkpoint uint64 + size uint64 + maxSize uint64 + + lru *lru // List of entry keys from most recently accessed to least. +} + +// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory. +func NewCache(maxSize uint64) *Cache { + return &Cache{ + maxSize: maxSize, + store: make(map[string]entries), + lru: newLRU(), + } +} + +// WriteKey writes the set of values for the key to the cache. It associates the data with +// the given checkpoint. This function is goroutine-safe. +// +// TODO: This function is a significant potential bottleneck. It is possible that keys could +// be modified while an eviction process was taking place, so a big cache-level lock is in place. +// Need to revisit this. It's correct but may not be performant (it is the same as the existing +// design however) +func (c *Cache) Write(key string, values []Value, checkpoint uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + if checkpoint < c.checkpoint { + return ErrCacheInvalidCheckpoint + } + + newSize := c.size + uint64(Values(values).Size()) + if newSize >= c.maxSize { + c.evict(newSize - c.maxSize) + } + + // Size OK now? + if c.size >= c.maxSize { + return ErrCacheMemoryExceeded + } + + e, ok := c.store[key] + if !ok { + e = newEntries() + c.store[key] = e + } + e.add(values, checkpoint) + c.size = newSize + + // Mark entry as most-recently used. + c.lru.MoveToFront(key) + + return nil +} + +// SetCheckpoint informs the cache that updates received up to and including checkpoint +// can be safely evicted. Setting a checkpoint does not mean that eviction up to that +// point will actually occur. +func (c *Cache) SetCheckpoint(checkpoint uint64) error { + c.mu.Lock() + defer c.mu.Unlock() + if checkpoint < c.checkpoint { + return ErrCacheInvalidCheckpoint + } + c.checkpoint = checkpoint + return nil +} + +// Size returns the number of bytes the cache currently uses. +func (c *Cache) Size() uint64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.size +} + +// MaxSize returns the maximum number of bytes the cache may consume. +func (c *Cache) MaxSize() uint64 { + return c.maxSize +} + +// Keys returns a sorted slice of all keys under management by the cache. +func (c *Cache) Keys() []string { + var a []string + for k, _ := range c.store { + a = append(a, k) + } + sort.Strings(a) + return a +} + +// Checkpoint returns the current checkpoint for the cache. +func (c *Cache) Checkpoint() uint64 { + c.mu.RLock() + defer c.mu.RUnlock() + return c.checkpoint +} + +// Evict instructs the cache to evict. +func (c *Cache) Evict(size uint64) uint64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.evict(size) +} + +// Cursor returns a cursor for the given key. +func (c *Cache) Cursor(key string) tsdb.Cursor { + c.mu.RLock() + defer c.mu.RUnlock() + + // e, ok := c.store[key] + // if !ok { + // return nil + // } + + // // Mark entry as most-recently used. + // c.lru.MoveToFront(key) + + // e.dedupe() + // _ = e.clone() + // // Actually return a cursor + + // Mark entry as most-recently used. + c.lru.MoveToFront(key) + return nil +} + +// evict instructs the cache to evict data until all data with an associated checkpoint +// before the last checkpoint was set, or memory footprint decreases by the given size, +// whichever happens first. Returns the number of point-calculated bytes that were +// actually evicted. +func (c *Cache) evict(size uint64) uint64 { + var freed uint64 + defer func() { + c.size -= freed + }() + + c.lru.DoFromLeast(func(key string) { + e := c.store[key] + freed += e.purge(c.checkpoint) + if e.size() == 0 { + // If the entry for the key is empty, remove all reference from the store. + delete(c.store, key) + } + + if freed >= size { + return + } + }) + + return freed +} diff --git a/tsdb/engine/tsm1/cache_test.go b/tsdb/engine/tsm1/cache_test.go new file mode 100644 index 0000000000..32f56787bc --- /dev/null +++ b/tsdb/engine/tsm1/cache_test.go @@ -0,0 +1,321 @@ +package tsm1 + +import ( + "reflect" + "testing" + "time" +) + +func Test_LRU(t *testing.T) { + lru := newLRU() + if lru == nil { + t.Fatalf("failed to create LRU") + } + + // Test adding various elements to the LRU. + + lru.MoveToFront("A") + if f := lru.Front(); f != "A" { + t.Fatalf("first inserted key not at front, got: %s", f) + } + if f := lru.Back(); f != "A" { + t.Fatalf("first inserted key not at back, got: %s", f) + } + + lru.MoveToFront("B") + if f := lru.Front(); f != "B" { + t.Fatalf("second inserted key not at front, got: %s", f) + } + if f := lru.Back(); f != "A" { + t.Fatalf("second inserted key not at back, got: %s", f) + } + + lru.MoveToFront("C") + if f := lru.Front(); f != "C" { + t.Fatalf("second inserted key not at front, got: %s", f) + } + if f := lru.Back(); f != "A" { + t.Fatalf("second inserted key not at back, got: %s", f) + } + + lru.MoveToFront("A") + if f := lru.Front(); f != "A" { + t.Fatalf("second inserted key not at front, got: %s", f) + } + if f := lru.Back(); f != "B" { + t.Fatalf("second inserted key not at back, got: %s", f) + } + + // Ensure that LRU ordering is correct. + expectedOrder, gotOrder := []string{"B", "C", "A"}, []string{} + lru.DoFromLeast(func(key string) { + gotOrder = append(gotOrder, key) + }) + if !reflect.DeepEqual(expectedOrder, gotOrder) { + t.Fatalf("expected LRU order not correct, got %v, exp %v", gotOrder, expectedOrder) + } + + // Ensure ordering is still correct after various remove operations. + lru.Remove("A") + lru.Remove("X") + expectedOrder, gotOrder = []string{"B", "C"}, []string{} + lru.DoFromLeast(func(key string) { + gotOrder = append(gotOrder, key) + }) + if !reflect.DeepEqual(expectedOrder, gotOrder) { + t.Fatalf("expected LRU order not correct post remove, got %v, exp %v", gotOrder, expectedOrder) + } +} + +func Test_EntryAdd(t *testing.T) { + e := newEntry() + v1 := NewValue(time.Unix(2, 0).UTC(), 1.0) + v2 := NewValue(time.Unix(3, 0).UTC(), 2.0) + v3 := NewValue(time.Unix(1, 0).UTC(), 2.0) + + e.add([]Value{v1, v2}) + if e.size != uint64(v1.Size()+v2.Size()) { + t.Fatal("adding points to entry, wrong size") + } + if e.unsorted { + t.Fatal("adding ordered points resulted in unordered entry") + } + e.add([]Value{v3}) + if e.size != uint64(v1.Size()+v2.Size()+v3.Size()) { + t.Fatal("adding point to entry, wrong size") + } + if !e.unsorted { + t.Fatal("adding unordered point resulted in ordered entry") + } +} + +func Test_EntryDedupe(t *testing.T) { + e := newEntry() + v1 := NewValue(time.Unix(1, 0).UTC(), 1.0) + v2 := NewValue(time.Unix(2, 0).UTC(), 2.0) + v3 := NewValue(time.Unix(1, 0).UTC(), 2.0) + + e.add([]Value{v1, v2}) + if e.size != uint64(v1.Size()+v2.Size()) { + t.Fatal("adding points to entry, wrong size") + } + if !reflect.DeepEqual(e.values, Values{v1, v2}) { + t.Fatal("entry values not as expected") + } + e.dedupe() + if !reflect.DeepEqual(e.values, Values{v1, v2}) { + t.Fatal("entry values not as expected after dedupe") + } + + e.add([]Value{v3}) + if !reflect.DeepEqual(e.values, Values{v1, v2, v3}) { + t.Fatal("entry values not as expected after v3") + } + if e.size != uint64(v1.Size()+v2.Size()+v3.Size()) { + t.Fatal("adding points to entry, wrong size") + } + e.dedupe() + if e.size != uint64(v3.Size()+v2.Size()) { + t.Fatal("adding points to entry, wrong size") + } + if !reflect.DeepEqual(e.values, Values{v3, v2}) { + t.Fatal("entry values not as expected dedupe of v3") + } +} + +func Test_EntriesAdd(t *testing.T) { + e := newEntries() + v1 := NewValue(time.Unix(2, 0).UTC(), 1.0) + v2 := NewValue(time.Unix(3, 0).UTC(), 2.0) + v3 := NewValue(time.Unix(1, 0).UTC(), 2.0) + + e.add([]Value{v1, v2}, uint64(100)) + if e.size() != uint64(v1.Size()+v2.Size()) { + t.Fatal("adding points to entry, wrong size") + } + e.add([]Value{v3}, uint64(100)) + if e.size() != uint64(v1.Size()+v2.Size()+v3.Size()) { + t.Fatal("adding point to entry, wrong size") + } +} + +func Test_EntriesClone(t *testing.T) { + e := newEntries() + v0 := NewValue(time.Unix(4, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(2, 0).UTC(), 2.0) + v2 := NewValue(time.Unix(3, 0).UTC(), 3.0) + v3 := NewValue(time.Unix(3, 0).UTC(), 4.0) + + e.add([]Value{v0, v1}, uint64(100)) + e.add([]Value{v2}, uint64(200)) + e.add([]Value{v3}, uint64(400)) + + values := e.clone() + if len(values) != 3 { + t.Fatalf("cloned values is wrong length, got %d", len(values)) + } + if !reflect.DeepEqual(values[0], v1) { + t.Fatal("0th point does not equal v1:", values[0], v1) + } + if !reflect.DeepEqual(values[1], v3) { + t.Fatal("1st point does not equal v3:", values[0], v3) + } + if !reflect.DeepEqual(values[2], v0) { + t.Fatal("2nd point does not equal v0:", values[0], v0) + } + + if n := e.purge(100); n != uint64(v0.Size()+v1.Size()) { + t.Fatal("wrong size of points purged:", n) + } +} + +func Test_EntriesPurge(t *testing.T) { + e := newEntries() + v0 := NewValue(time.Unix(1, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(2, 0).UTC(), 2.0) + v2 := NewValue(time.Unix(3, 0).UTC(), 3.0) + + e.add([]Value{v0, v1}, uint64(100)) + e.add([]Value{v2}, uint64(200)) + + values := e.clone() + if len(values) != 3 { + t.Fatalf("cloned values is wrong length, got %d", len(values)) + } + if !reflect.DeepEqual(values[0], v0) { + t.Fatal("0th point does not equal v0:", values[0], v0) + } + if !reflect.DeepEqual(values[1], v1) { + t.Fatal("1st point does not equal v1:", values[0], v1) + } + if !reflect.DeepEqual(values[2], v2) { + t.Fatal("2nd point does not equal v2:", values[0], v2) + } + + if n := e.purge(100); n != uint64(v0.Size()+v1.Size()) { + t.Fatal("wrong size of points purged:", n) + } + + values = e.clone() + if len(values) != 1 { + t.Fatalf("purged cloned values is wrong length, got %d", len(values)) + } + if !reflect.DeepEqual(values[0], v2) { + t.Fatal("0th point does not equal v1:", values[0], v2) + } + + if n := e.purge(200); n != uint64(v2.Size()) { + t.Fatal("wrong size of points purged:", n) + } + values = e.clone() + if len(values) != 0 { + t.Fatalf("purged cloned values is wrong length, got %d", len(values)) + } +} + +func Test_NewCache(t *testing.T) { + c := NewCache(100) + if c == nil { + t.Fatalf("failed to create new cache") + } + + if c.MaxSize() != 100 { + t.Fatalf("new cache max size not correct") + } + if c.Size() != 0 { + t.Fatalf("new cache size not correct") + } + if c.Checkpoint() != 0 { + t.Fatalf("new checkpoint not correct") + } + if len(c.Keys()) != 0 { + t.Fatalf("new cache keys not correct: %v", c.Keys()) + } +} + +func Test_CacheWrite(t *testing.T) { + v0 := NewValue(time.Unix(1, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(2, 0).UTC(), 2.0) + v2 := NewValue(time.Unix(3, 0).UTC(), 3.0) + values := Values{v0, v1, v2} + valuesSize := uint64(v0.Size() + v1.Size() + v2.Size()) + + c := MustNewCache(3 * valuesSize) + + if err := c.Write("foo", values, 100); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + if err := c.Write("bar", values, 100); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + if n := c.Size(); n != 2*valuesSize { + t.Fatalf("cache size incorrect after 2 writes, exp %d, got %d", 2*valuesSize, n) + } + + if exp, keys := []string{"bar", "foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after 2 writes, exp %v, got %v", exp, keys) + } +} + +func Test_CacheCheckpoint(t *testing.T) { + v0 := NewValue(time.Unix(1, 0).UTC(), 1.0) + + c := MustNewCache(1024) + + if err := c.SetCheckpoint(50); err != nil { + t.Fatalf("failed to set checkpoint: %s", err.Error()) + } + if err := c.Write("foo", Values{v0}, 100); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + if err := c.SetCheckpoint(25); err != ErrCacheInvalidCheckpoint { + t.Fatalf("unexpectedly set checkpoint") + } + if err := c.Write("foo", Values{v0}, 30); err != ErrCacheInvalidCheckpoint { + t.Fatalf("unexpectedly wrote key foo to cache") + } +} + +func Test_CacheWriteMemoryExceeded(t *testing.T) { + v0 := NewValue(time.Unix(1, 0).UTC(), 1.0) + v1 := NewValue(time.Unix(2, 0).UTC(), 2.0) + + c := MustNewCache(uint64(v1.Size())) + + if err := c.Write("foo", Values{v0}, 100); err != nil { + t.Fatalf("failed to write key foo to cache: %s", err.Error()) + } + if exp, keys := []string{"foo"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys) + } + if err := c.Write("bar", Values{v1}, 100); err != ErrCacheMemoryExceeded { + t.Fatalf("wrong error writing key bar to cache") + } + + // Set too-early checkpoint, write should still fail. + if err := c.SetCheckpoint(50); err != nil { + t.Fatalf("failed to set checkpoint: %s", err.Error()) + } + if err := c.Write("bar", Values{v1}, 100); err != ErrCacheMemoryExceeded { + t.Fatalf("wrong error writing key bar to cache") + } + + // Set later checkpoint, write should then succeed. + if err := c.SetCheckpoint(100); err != nil { + t.Fatalf("failed to set checkpoint: %s", err.Error()) + } + if err := c.Write("bar", Values{v1}, 100); err != nil { + t.Fatalf("failed to write key bar to checkpointed cache: %s", err.Error()) + } + if exp, keys := []string{"bar"}, c.Keys(); !reflect.DeepEqual(keys, exp) { + t.Fatalf("cache keys incorrect after writes, exp %v, got %v", exp, keys) + } +} + +func MustNewCache(size uint64) *Cache { + c := NewCache(size) + if c == nil { + panic("failed to create cache") + } + return c +} diff --git a/tsdb/engine/tsm1/encoding.go b/tsdb/engine/tsm1/encoding.go index eca5359c07..1b7a742032 100644 --- a/tsdb/engine/tsm1/encoding.go +++ b/tsdb/engine/tsm1/encoding.go @@ -69,6 +69,14 @@ func (a Values) MaxTime() int64 { return a[len(a)-1].Time().UnixNano() } +func (a Values) Size() int { + sz := 0 + for _, v := range a { + sz += v.Size() + } + return sz +} + // Encode converts the values to a byte slice. If there are no values, // this function panics. func (a Values) Encode(buf []byte) ([]byte, error) {