Merge pull request #4999 from influxdb/cache_sort

Always copy the Cache values for query and merge with snapshot
pull/5005/merge
Philip O'Toole 2015-12-05 08:15:13 -08:00
commit 7296de1fac
2 changed files with 153 additions and 32 deletions

View File

@ -48,6 +48,16 @@ func (e *entry) add(values []Value) {
}
}
// deduplicate sorts and orders the entry's values. If values are already deduped and
// and sorted, the function does no work and simply returns.
func (e *entry) deduplicate() {
if !e.needSort || len(e.values) == 0 {
return
}
e.values = e.values.Deduplicate()
e.needSort = false
}
// Cache maintains an in-memory store of Values for a set of keys.
type Cache struct {
mu sync.RWMutex
@ -132,10 +142,7 @@ func (c *Cache) Snapshot() *Cache {
// sort the snapshot before returning it. The compactor and any queries
// coming in while it writes will need the values sorted
for _, e := range snapshot.store {
if e.needSort {
e.values = e.values.Deduplicate()
e.needSort = false
}
e.deduplicate()
}
return snapshot
@ -180,39 +187,81 @@ func (c *Cache) Keys() []string {
// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key string) Values {
values, needSort := func() (Values, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
e := c.store[key]
if e == nil {
return nil, false
}
if e.needSort {
return nil, true
}
return e.values[0:len(e.values)], false
}()
// the values in the entry require a sort, do so with a write lock so
// we can sort once and set everything in order
if needSort {
values = func() Values {
c.mu.RLock()
e := c.store[key]
if e != nil && e.needSort {
// Sorting is needed, so unlock and run the merge operation with
// a write-lock. It is actually possible that the data will be
// sorted by the time the merge runs, which would mean very occasionally
// a write-lock will be held when only a read-lock is required.
c.mu.RUnlock()
return func() Values {
c.mu.Lock()
defer c.mu.Unlock()
e := c.store[key]
if e == nil {
return nil
}
e.values = e.values.Deduplicate()
e.needSort = false
return e.values[0:len(e.values)]
return c.merged(key)
}()
}
// No sorting required for key, so just merge while continuing to hold read-lock.
return func() Values {
defer c.mu.RUnlock()
return c.merged(key)
}()
}
// merged returns a copy of hot and snapshot values. The copy will be merged, deduped, and
// sorted. It assumes all necessary locks have been taken. If the caller knows that the
// the hot source data for the key will not be changed, it is safe to call this function
// with a read-lock taken. Otherwise it must be called with a write-lock taken.
func (c *Cache) merged(key string) Values {
e := c.store[key]
if e == nil {
if len(c.snapshots) == 0 {
// No values in hot cache or snapshots.
return nil
}
} else {
e.deduplicate()
}
// Build the sequence of entries that will be returned, in the correct order.
// Calculate the required size of the destination buffer.
var entries []*entry
sz := 0
for _, s := range c.snapshots {
e := s.store[key]
if e != nil {
entries = append(entries, e)
sz += len(e.values)
}
}
if e != nil {
entries = append(entries, e)
sz += len(e.values)
}
// Any entries? If not, return.
if sz == 0 {
return nil
}
// Create the buffer, and copy all hot values and snapshots. Individual
// entries are sorted at this point, so now the code has to check if the
// resultant buffer will be sorted from start to finish.
var needSort bool
values := make(Values, sz)
n := 0
for _, e := range entries {
if !needSort && n > 0 {
needSort = values[n-1].UnixNano() > e.values[0].UnixNano()
}
n += copy(values[n:], e.values)
}
if needSort {
values = values.Deduplicate()
}
return values
}

View File

@ -93,6 +93,78 @@ func TestCache_CacheValues(t *testing.T) {
}
}
func TestCache_CacheSnapshot(t *testing.T) {
v0 := NewValue(time.Unix(2, 0).UTC(), 0.0)
v1 := NewValue(time.Unix(3, 0).UTC(), 2.0)
v2 := NewValue(time.Unix(4, 0).UTC(), 3.0)
v3 := NewValue(time.Unix(5, 0).UTC(), 4.0)
v4 := NewValue(time.Unix(6, 0).UTC(), 5.0)
v5 := NewValue(time.Unix(1, 0).UTC(), 5.0)
c := NewCache(512)
if err := c.Write("foo", Values{v0, v1, v2, v3}); err != nil {
t.Fatalf("failed to write 3 values, key foo to cache: %s", err.Error())
}
// Grab snapshot, and ensure it's as expected.
snapshot := c.Snapshot()
expValues := Values{v0, v1, v2, v3}
if deduped := snapshot.values("foo"); !reflect.DeepEqual(expValues, deduped) {
t.Fatalf("snapshotted values for foo incorrect, exp: %v, got %v", expValues, deduped)
}
// Ensure cache is still as expected.
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
t.Fatalf("post-snapshot values for foo incorrect, exp: %v, got %v", expValues, deduped)
}
// Write a new value to the cache.
if err := c.Write("foo", Values{v4}); err != nil {
t.Fatalf("failed to write post-snap value, key foo to cache: %s", err.Error())
}
expValues = Values{v0, v1, v2, v3, v4}
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
t.Fatalf("post-snapshot write values for foo incorrect, exp: %v, got %v", expValues, deduped)
}
// Write a new, out-of-order, value to the cache.
if err := c.Write("foo", Values{v5}); err != nil {
t.Fatalf("failed to write post-snap value, key foo to cache: %s", err.Error())
}
expValues = Values{v5, v0, v1, v2, v3, v4}
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
t.Fatalf("post-snapshot out-of-order write values for foo incorrect, exp: %v, got %v", expValues, deduped)
}
// Clear snapshot, ensuring non-snapshot data untouched.
c.ClearSnapshot(snapshot)
expValues = Values{v5, v4}
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
t.Fatalf("post-clear values for foo incorrect, exp: %v, got %v", expValues, deduped)
}
}
func TestCache_CacheEmptySnapshot(t *testing.T) {
c := NewCache(512)
// Grab snapshot, and ensure it's as expected.
snapshot := c.Snapshot()
if deduped := snapshot.values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
t.Fatalf("snapshotted values for foo incorrect, exp: %v, got %v", nil, deduped)
}
// Ensure cache is still as expected.
if deduped := c.Values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
t.Fatalf("post-snapshotted values for foo incorrect, exp: %v, got %v", Values(nil), deduped)
}
// Clear snapshot.
c.ClearSnapshot(snapshot)
if deduped := c.Values("foo"); !reflect.DeepEqual(Values(nil), deduped) {
t.Fatalf("post-snapshot-clear values for foo incorrect, exp: %v, got %v", Values(nil), deduped)
}
}
func TestCache_CacheWriteMemoryExceeded(t *testing.T) {
v0 := NewValue(time.Unix(1, 0).UTC(), 1.0)
v1 := NewValue(time.Unix(2, 0).UTC(), 2.0)