Cache values supports sorting order

pull/4932/head
Philip O'Toole 2015-11-30 16:29:38 -08:00
parent 3a72e40e3f
commit fc83968e2e
4 changed files with 25 additions and 10 deletions

View File

@ -198,8 +198,8 @@ func (c *Cache) Evict() {
c.evict()
}
// Values returns a copy of all values, deduped and sorted, for the given key.
func (c *Cache) Values(key string) Values {
// Values returns a copy of all values, deduped and sorted as requested, for the given key.
func (c *Cache) Values(key string, ascending bool) Values {
values := func() Values {
c.mu.RLock()
defer c.mu.RUnlock()
@ -215,7 +215,7 @@ func (c *Cache) Values(key string) Values {
if values == nil {
return nil
}
return values.Deduplicate(true)
return values.Deduplicate(ascending)
}
// evict instructs the cache to evict data up to and including the current checkpoint.

View File

@ -171,7 +171,7 @@ func Test_CacheValues(t *testing.T) {
v4 := NewValue(time.Unix(4, 0).UTC(), 4.0)
c := MustNewCache(512)
if deduped := c.Values("no such key"); deduped != nil {
if deduped := c.Values("no such key", true); deduped != nil {
t.Fatalf("Values returned for no such key")
}
@ -182,9 +182,14 @@ func Test_CacheValues(t *testing.T) {
t.Fatalf("failed to write 1 value, key foo to cache: %s", err.Error())
}
expValues := Values{v3, v1, v2, v4}
if deduped := c.Values("foo"); !reflect.DeepEqual(expValues, deduped) {
t.Fatalf("deduped values for foo incorrect, exp: %v, got %v", expValues, deduped)
expAscValues := Values{v3, v1, v2, v4}
if deduped := c.Values("foo", true); !reflect.DeepEqual(expAscValues, deduped) {
t.Fatalf("deduped ascending values for foo incorrect, exp: %v, got %v", expAscValues, deduped)
}
expDescValues := Values{v4, v2, v1, v3}
if deduped := c.Values("foo", false); !reflect.DeepEqual(expDescValues, deduped) {
t.Fatalf("deduped descending values for foo incorrect, exp: %v, got %v", expDescValues, deduped)
}
}

View File

@ -214,7 +214,7 @@ type devTx struct {
// Cursor returns a cursor for all cached and TSM-based data.
func (t *devTx) Cursor(series string, fields []string, dec *tsdb.FieldCodec, ascending bool) tsdb.Cursor {
return &devCursor{
cache: t.engine.Cache.Values(SeriesFieldKey(series, fields[0])),
cache: t.engine.Cache.Values(SeriesFieldKey(series, fields[0]), ascending),
ascending: ascending,
}
}

View File

@ -96,12 +96,22 @@ func Test_DevEngineCacheQueryDescending(t *testing.T) {
// Start a query transactions and get a cursor.
tx := devTx{engine: e.(*DevEngine)}
ascCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, false)
descCursor := tx.Cursor("cpu,host=A", []string{"value"}, nil, false)
k, v := ascCursor.SeekTo(4000000000)
k, v := descCursor.SeekTo(4000000000)
if k != 3000000000 {
t.Fatalf("failed to seek to before last key: %v %v", k, v)
}
k, v = descCursor.Next()
if k != 2000000000 {
t.Fatalf("failed to get next key: %v %v", k, v)
}
k, v = descCursor.SeekTo(1)
if k != -1 {
t.Fatalf("failed to seek to after first key: %v %v", k, v)
}
}
func parsePoints(buf string) []models.Point {