package tsm1

import (
	"fmt"
	"log"
	"os"
	"sort"
	"sync"
)

var ErrCacheMemoryExceeded = fmt.Errorf("cache maximum memory size exceeded")
var ErrCacheInvalidCheckpoint = fmt.Errorf("invalid checkpoint")

// entry is a set of values and some metadata.
type entry struct {
	values   Values // All stored values.
	needSort bool   // true if the values are out of order and require deduping.
}

// newEntry returns a new instance of entry.
func newEntry() *entry {
	return &entry{}
}

// add adds the given values to the entry.
func (e *entry) add(values []Value) {
	// If there are existing values, make sure they're all less than the first
	// of the new values being added.
	l := len(e.values)
	if l != 0 {
		lastValTime := e.values[l-1].UnixNano()
		if lastValTime >= values[0].UnixNano() {
			e.needSort = true
		}
	}
	e.values = append(e.values, values...)

	// If only one value was added, there is nothing more to check.
	if len(values) <= 1 {
		return
	}

	// Make sure the new values were themselves in sorted order, comparing each
	// value against the one before it.
	prev := values[0].UnixNano()
	for _, v := range values[1:] {
		cur := v.UnixNano()
		if prev >= cur {
			e.needSort = true
			break
		}
		prev = cur
	}
}

// deduplicate sorts and dedupes the entry's values. If the values are already deduped
// and sorted, the function does no work and simply returns.
func (e *entry) deduplicate() {
	if !e.needSort || len(e.values) == 0 {
		return
	}
	e.values = e.values.Deduplicate()
	e.needSort = false
}

// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
	mu      sync.RWMutex
	store   map[string]*entry
	size    uint64
	maxSize uint64

	// snapshots are the cache objects that are currently being written to TSM files.
	// They're kept in memory while flushing so they can be queried along with the
	// cache. They are read-only and should never be modified.
	snapshots     []*Cache
	snapshotsSize uint64
}

// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
func NewCache(maxSize uint64) *Cache {
	return &Cache{
		maxSize: maxSize,
		store:   make(map[string]*entry),
	}
}

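// exampleCacheWriteRead is a minimal usage sketch of the cache API defined in this
// file: create a cache, write values for a key, and read back a deduped, sorted copy.
// The size limit and the key are arbitrary illustration values chosen for the sketch,
// not values the engine actually uses.
func exampleCacheWriteRead(vals []Value) (Values, error) {
	c := NewCache(64 * 1024 * 1024) // arbitrary 64MB limit for the sketch
	if err := c.Write("cpu", vals); err != nil {
		// ErrCacheMemoryExceeded means the cache is full; a caller would normally
		// snapshot and flush before retrying.
		return nil, err
	}
	// Values returns a copy of everything cached for the key, deduped and sorted.
	return c.Values("cpu"), nil
}
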
// Write writes the set of values for the key to the cache. This function is goroutine-safe.
// It returns an error if the cache has exceeded its max size.
func (c *Cache) Write(key string, values []Value) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Enough room in the cache?
	newSize := c.size + uint64(Values(values).Size())
	if c.maxSize > 0 && newSize+c.snapshotsSize > c.maxSize {
		return ErrCacheMemoryExceeded
	}

	c.write(key, values)
	c.size = newSize

	return nil
}

// WriteMulti writes the map of keys and associated values to the cache. This function
// is goroutine-safe. It returns an error if the cache has exceeded its max size.
func (c *Cache) WriteMulti(values map[string][]Value) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	totalSz := 0
	for _, v := range values {
		totalSz += Values(v).Size()
	}

	// Enough room in the cache?
	newSize := c.size + uint64(totalSz)
	if c.maxSize > 0 && newSize+c.snapshotsSize > c.maxSize {
		return ErrCacheMemoryExceeded
	}

	for k, v := range values {
		c.write(k, v)
	}
	c.size = newSize

	return nil
}

// Snapshot will take a snapshot of the current cache, add it to the slice of caches
// being flushed, and reset the current cache with a new, empty store.
func (c *Cache) Snapshot() *Cache {
	c.mu.Lock()
	defer c.mu.Unlock()

	snapshot := NewCache(c.maxSize)
	snapshot.store = c.store
	snapshot.size = c.size

	c.store = make(map[string]*entry)
	c.size = 0

	c.snapshots = append(c.snapshots, snapshot)
	c.snapshotsSize += snapshot.size

	// Sort the snapshot before returning it. The compactor and any queries
	// coming in while it writes will need the values sorted.
	for _, e := range snapshot.store {
		e.deduplicate()
	}

	return snapshot
}

// ClearSnapshot removes the snapshot cache from the list of flushing caches and
// adjusts the size accordingly.
func (c *Cache) ClearSnapshot(snapshot *Cache) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for i, cache := range c.snapshots {
		if cache == snapshot {
			c.snapshots = append(c.snapshots[:i], c.snapshots[i+1:]...)
			c.snapshotsSize -= snapshot.size
			break
		}
	}
}

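// exampleSnapshotLifecycle sketches how Snapshot and ClearSnapshot are intended to
// pair around a flush: take a snapshot, hand its contents to whatever writes the TSM
// files, then clear the snapshot so its memory is released. The flush step is
// represented here by a caller-supplied function, since the actual compaction code
// lives outside this file.
func exampleSnapshotLifecycle(c *Cache, flush func(map[string]*entry) error) error {
	snapshot := c.Snapshot() // hot data moves into a read-only snapshot; the cache is reset
	if err := flush(snapshot.Store()); err != nil {
		return err // the snapshot stays registered, so its data remains queryable
	}
	c.ClearSnapshot(snapshot) // flush succeeded; drop the snapshot and its accounted size
	return nil
}
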
// Size returns the number of point-calculated bytes the cache currently uses.
func (c *Cache) Size() uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.size
}

// MaxSize returns the maximum number of bytes the cache may consume.
func (c *Cache) MaxSize() uint64 {
	return c.maxSize
}

// Keys returns a sorted slice of all keys under management by the cache.
func (c *Cache) Keys() []string {
	var a []string
	for k := range c.store {
		a = append(a, k)
	}
	sort.Strings(a)
	return a
}

// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key string) Values {
	c.mu.RLock()
	e := c.store[key]
	if e != nil && e.needSort {
		// Sorting is needed, so unlock and run the merge operation with
		// a write-lock. It is actually possible that the data will be
		// sorted by the time the merge runs, which would mean very occasionally
		// a write-lock will be held when only a read-lock is required.
		c.mu.RUnlock()
		return func() Values {
			c.mu.Lock()
			defer c.mu.Unlock()
			return c.merged(key)
		}()
	}

	// No sorting required for key, so just merge while continuing to hold the read-lock.
	return func() Values {
		defer c.mu.RUnlock()
		return c.merged(key)
	}()
}

// merged returns a copy of hot and snapshot values. The copy will be merged, deduped, and
// sorted. It assumes all necessary locks have been taken. If the caller knows that the
// hot source data for the key will not be changed, it is safe to call this function
// with a read-lock taken. Otherwise it must be called with a write-lock taken.
func (c *Cache) merged(key string) Values {
	e := c.store[key]
	if e == nil {
		if len(c.snapshots) == 0 {
			// No values in hot cache or snapshots.
			return nil
		}
	} else {
		e.deduplicate()
	}

	// Build the sequence of entries that will be returned, in the correct order.
	// Calculate the required size of the destination buffer.
	var entries []*entry
	sz := 0
	for _, s := range c.snapshots {
		e := s.store[key]
		if e != nil {
			entries = append(entries, e)
			sz += len(e.values)
		}
	}
	if e != nil {
		entries = append(entries, e)
		sz += len(e.values)
	}

	// Any entries? If not, return.
	if sz == 0 {
		return nil
	}

	// Create the buffer, and copy all hot values and snapshots. Individual
	// entries are sorted at this point, so now the code has to check if the
	// resultant buffer will be sorted from start to finish.
	var needSort bool
	values := make(Values, sz)
	n := 0
	for _, e := range entries {
		if !needSort && n > 0 {
			needSort = values[n-1].UnixNano() > e.values[0].UnixNano()
		}
		n += copy(values[n:], e.values)
	}

	if needSort {
		values = values.Deduplicate()
	}

	return values
}

// Store returns the underlying cache store. This is not goroutine-safe!
// Protect access by using the Lock and Unlock functions on Cache.
func (c *Cache) Store() map[string]*entry {
	return c.store
}

// Lock takes a write lock on the cache. Pair it with Unlock when accessing the
// store returned by Store directly.
func (c *Cache) Lock() {
	c.mu.Lock()
}

// Unlock releases the write lock taken by Lock.
func (c *Cache) Unlock() {
	c.mu.Unlock()
}

// values returns the values for the key. It doesn't lock and assumes the data is
// already sorted. It should only be used by the CacheKeyIterator in compact.go.
func (c *Cache) values(key string) Values {
	e := c.store[key]
	if e == nil {
		return nil
	}
	return e.values
}

// write writes the set of values for the key to the cache. This function assumes
// the lock has been taken and does not enforce the cache size limits.
func (c *Cache) write(key string, values []Value) {
	e, ok := c.store[key]
	if !ok {
		e = newEntry()
		c.store[key] = e
	}
	e.add(values)
}

// CacheLoader processes a set of WAL segment files, and loads a cache with the data
// contained within those files. Processing of the supplied files takes place in the
// order they exist in the files slice.
type CacheLoader struct {
	files []string

	Logger *log.Logger
}

// NewCacheLoader returns a new instance of a CacheLoader.
func NewCacheLoader(files []string) *CacheLoader {
	return &CacheLoader{
		files:  files,
		Logger: log.New(os.Stderr, "[cacheloader] ", log.LstdFlags),
	}
}

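// exampleLoadFromWAL sketches cache recovery at startup: a CacheLoader replays the
// given WAL segment files, in order, into a fresh cache. The file list and size
// limit are placeholders supplied by the caller; how the caller discovers the
// segment files is outside the scope of this file.
func exampleLoadFromWAL(segmentFiles []string, maxSize uint64) (*Cache, error) {
	cache := NewCache(maxSize)
	loader := NewCacheLoader(segmentFiles)
	if err := loader.Load(cache); err != nil {
		return nil, err
	}
	return cache, nil
}
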
// Load returns a cache loaded with the data contained within the segment files.
// If, during reading of a segment file, corruption is encountered, that segment
// file is truncated up to and including the last valid byte, and processing
// continues with the next segment file.
func (cl *CacheLoader) Load(cache *Cache) error {
	for _, fn := range cl.files {
		if err := func() error {
			f, err := os.OpenFile(fn, os.O_CREATE|os.O_RDWR, 0666)
			if err != nil {
				return err
			}

			// Log some information about the segments.
			stat, err := os.Stat(f.Name())
			if err != nil {
				return err
			}
			cl.Logger.Printf("reading file %s, size %d", f.Name(), stat.Size())

			r := NewWALSegmentReader(f)
			defer r.Close()

			for r.Next() {
				entry, err := r.Read()
				if err != nil {
					n := r.Count()
					cl.Logger.Printf("file %s corrupt at position %d, truncating", f.Name(), n)
					if err := f.Truncate(n); err != nil {
						return err
					}
					break
				}

				switch t := entry.(type) {
				case *WriteWALEntry:
					if err := cache.WriteMulti(t.Values); err != nil {
						return err
					}
				case *DeleteWALEntry:
					// FIXME: Implement this
					// if err := e.Cache.Delete(t.Keys); err != nil {
					// 	return err
					// }
				}
			}

			return nil
		}(); err != nil {
			return err
		}
	}
	return nil
}