Update cache to address PR comments
parent
be4891c40b
commit
4624fb2a78
|
@ -58,8 +58,8 @@ type Cache struct {
|
||||||
// flushingCaches are the cache objects that are currently being written to tsm files
|
// flushingCaches are the cache objects that are currently being written to tsm files
|
||||||
// they're kept in memory while flushing so they can be queried along with the cache.
|
// they're kept in memory while flushing so they can be queried along with the cache.
|
||||||
// they are read only and should never be modified
|
// they are read only and should never be modified
|
||||||
flushingCaches []*Cache
|
snapshotCaches []*Cache
|
||||||
flushingCachesSize uint64
|
snapshotCachesSize uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
|
// NewCache returns an instance of a cache which will use a maximum of maxSize bytes of memory.
|
||||||
|
@ -78,7 +78,7 @@ func (c *Cache) Write(key string, values []Value) error {
|
||||||
|
|
||||||
// Enough room in the cache?
|
// Enough room in the cache?
|
||||||
newSize := c.size + uint64(Values(values).Size())
|
newSize := c.size + uint64(Values(values).Size())
|
||||||
if c.maxSize > 0 && newSize+c.flushingCachesSize > c.maxSize {
|
if c.maxSize > 0 && newSize+c.snapshotCachesSize > c.maxSize {
|
||||||
return ErrCacheMemoryExceeded
|
return ErrCacheMemoryExceeded
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -101,7 +101,7 @@ func (c *Cache) WriteMulti(values map[string][]Value) error {
|
||||||
|
|
||||||
// Enough room in the cache?
|
// Enough room in the cache?
|
||||||
newSize := c.size + uint64(totalSz)
|
newSize := c.size + uint64(totalSz)
|
||||||
if c.maxSize > 0 && newSize+c.flushingCachesSize > c.maxSize {
|
if c.maxSize > 0 && newSize+c.snapshotCachesSize > c.maxSize {
|
||||||
return ErrCacheMemoryExceeded
|
return ErrCacheMemoryExceeded
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,8 +126,8 @@ func (c *Cache) Snapshot() *Cache {
|
||||||
c.store = make(map[string]*entry)
|
c.store = make(map[string]*entry)
|
||||||
c.size = 0
|
c.size = 0
|
||||||
|
|
||||||
c.flushingCaches = append(c.flushingCaches, snapshot)
|
c.snapshotCaches = append(c.snapshotCaches, snapshot)
|
||||||
c.flushingCachesSize += snapshot.size
|
c.snapshotCachesSize += snapshot.size
|
||||||
|
|
||||||
// sort the snapshot before returning it. The compactor and any queries
|
// sort the snapshot before returning it. The compactor and any queries
|
||||||
// coming in while it writes will need the values sorted
|
// coming in while it writes will need the values sorted
|
||||||
|
@ -147,22 +147,13 @@ func (c *Cache) ClearSnapshot(snapshot *Cache) {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
|
|
||||||
caches := make([]*Cache, 0)
|
for i, cache := range c.snapshotCaches {
|
||||||
cleared := false
|
if cache == snapshot {
|
||||||
for _, cache := range c.flushingCaches {
|
c.snapshotCaches = append(c.snapshotCaches[:i], c.snapshotCaches[i+1:]...)
|
||||||
if cache != snapshot {
|
c.snapshotCachesSize -= snapshot.size
|
||||||
caches = append(caches, cache)
|
break
|
||||||
} else {
|
|
||||||
cleared = true
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
c.flushingCaches = caches
|
|
||||||
|
|
||||||
// update the size if the snapshot was cleared from the flushing caches
|
|
||||||
if cleared {
|
|
||||||
c.flushingCachesSize -= snapshot.size
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Size returns the number of point-calcuated bytes the cache currently uses.
|
// Size returns the number of point-calcuated bytes the cache currently uses.
|
||||||
|
|
Loading…
Reference in New Issue