Support reverse iteration for b1 engine

pull/3986/head
Jason Wilder 2015-09-03 10:09:41 -06:00
parent 2725757dba
commit 7fa3d445f7
2 changed files with 131 additions and 16 deletions

View File

@ -568,10 +568,18 @@ func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
copy(cache, tx.engine.cache[partitionID][key])
// Build a cursor that merges the bucket and cache together.
cur := &Cursor{cache: cache}
cur := &Cursor{cache: cache, direction: direction}
if b != nil {
cur.cursor = b.Cursor()
}
// If it's a reverse cursor, set the current location to the end.
if direction.Reverse() {
cur.index = len(cache) - 1
if cur.cursor != nil {
cur.cursor.Last()
}
}
return cur
}
@ -589,9 +597,12 @@ type Cursor struct {
// Previously read key.
prev []byte
// The direction the cursor pointer moves after each call to Next()
direction tsdb.Direction
}
func (c *Cursor) Direction() tsdb.Direction { return tsdb.Forward }
func (c *Cursor) Direction() tsdb.Direction { return c.direction }
// Seek moves the cursor to a position and returns the closest key/value pair.
func (c *Cursor) Seek(seek []byte) (key, value []byte) {
@ -605,6 +616,12 @@ func (c *Cursor) Seek(seek []byte) (key, value []byte) {
return bytes.Compare(c.cache[i][0:8], seek) != -1
})
// Search will return an index after the length of cache if the seek value is greater
// than all the values. Clamp it to the end of the cache.
if c.direction.Reverse() && c.index >= len(c.cache) {
c.index = len(c.cache) - 1
}
c.prev = nil
return c.read()
}
@ -618,20 +635,10 @@ func (c *Cursor) Next() (key, value []byte) {
func (c *Cursor) read() (key, value []byte) {
// Continue skipping ahead through duplicate keys in the cache list.
for {
// Read next value from the cursor.
if c.buf.key == nil && c.cursor != nil {
c.buf.key, c.buf.value = c.cursor.Next()
}
// Read from the buffer or cache, which ever is lower.
if c.buf.key != nil && (c.index >= len(c.cache) || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == -1) {
key, value = c.buf.key, c.buf.value
c.buf.key, c.buf.value = nil, nil
} else if c.index < len(c.cache) {
key, value = c.cache[c.index][0:8], c.cache[c.index][8:]
c.index++
if c.direction.Forward() {
key, value = c.readForward()
} else {
key, value = nil, nil
key, value = c.readReverse()
}
// Exit loop if we're at the end of the cache or the next key is different.
@ -644,6 +651,46 @@ func (c *Cursor) read() (key, value []byte) {
return
}
// readForward returns the next key/value from the cursor and moves the current location forward.
func (c *Cursor) readForward() (key, value []byte) {
// Read next value from the cursor.
if c.buf.key == nil && c.cursor != nil {
c.buf.key, c.buf.value = c.cursor.Next()
}
// Read from the buffer or cache, which ever is lower.
if c.buf.key != nil && (c.index >= len(c.cache) || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == -1) {
key, value = c.buf.key, c.buf.value
c.buf.key, c.buf.value = nil, nil
} else if c.index < len(c.cache) {
key, value = c.cache[c.index][0:8], c.cache[c.index][8:]
c.index++
} else {
key, value = nil, nil
}
return
}
// readReverse returns the next key/value from the cursor and moves the current location backwards.
func (c *Cursor) readReverse() (key, value []byte) {
// Read prev value from the cursor.
if c.buf.key == nil && c.cursor != nil {
c.buf.key, c.buf.value = c.cursor.Prev()
}
// Read from the buffer or cache, which ever is lower.
if c.buf.key != nil && (c.index < 0 || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == 1) {
key, value = c.buf.key, c.buf.value
c.buf.key, c.buf.value = nil, nil
} else if c.index >= 0 && c.index < len(c.cache) {
key, value = c.cache[c.index][0:8], c.cache[c.index][8:]
c.index--
} else {
key, value = nil, nil
}
return
}
// WALPartitionN is the number of partitions in the write ahead log.
const WALPartitionN = 8

View File

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"io/ioutil"
"math"
"os"
"testing"
"time"
@ -64,7 +65,7 @@ func TestEngine_WritePoints(t *testing.T) {
tx := e.MustBegin(false)
defer tx.Rollback()
c := tx.Cursor("temperature", true)
c := tx.Cursor("temperature", tsdb.Forward)
if k, v := c.Seek([]byte{0}); !bytes.Equal(k, u64tob(uint64(time.Unix(1434059627, 0).UnixNano()))) {
t.Fatalf("unexpected key: %#v", k)
} else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil {
@ -78,6 +79,73 @@ func TestEngine_WritePoints(t *testing.T) {
}
}
// Ensure points can be written to the engine and queried in reverse order.
func TestEngine_WritePoints_Reverse(t *testing.T) {
e := OpenDefaultEngine()
defer e.Close()
// Create metadata.
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
}
// Parse point.
points, err := tsdb.ParsePointsWithPrecision([]byte("temperature value=100 0"), time.Now().UTC(), "s")
if err != nil {
t.Fatal(err)
} else if data, err := mf.Codec.EncodeFields(points[0].Fields()); err != nil {
t.Fatal(err)
} else {
points[0].SetData(data)
}
// Write original value.
if err := e.WritePoints(points, map[string]*tsdb.MeasurementFields{"temperature": mf}, seriesToCreate); err != nil {
t.Fatal(err)
}
// Flush to disk.
if err := e.Flush(0); err != nil {
t.Fatal(err)
}
// Parse new point.
points, err = tsdb.ParsePointsWithPrecision([]byte("temperature value=200 1"), time.Now().UTC(), "s")
if err != nil {
t.Fatal(err)
} else if data, err := mf.Codec.EncodeFields(points[0].Fields()); err != nil {
t.Fatal(err)
} else {
points[0].SetData(data)
}
// Write the new points existing value.
if err := e.WritePoints(points, nil, nil); err != nil {
t.Fatal(err)
}
// Ensure only the updated value is read.
tx := e.MustBegin(false)
defer tx.Rollback()
c := tx.Cursor("temperature", tsdb.Reverse)
if k, _ := c.Seek(u64tob(math.MaxInt64)); !bytes.Equal(k, u64tob(uint64(time.Unix(1, 0).UnixNano()))) {
t.Fatalf("unexpected key: %v", btou64(k))
} else if k, v := c.Next(); !bytes.Equal(k, u64tob(uint64(time.Unix(0, 0).UnixNano()))) {
t.Fatalf("unexpected key: %#v", k)
} else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil {
t.Fatal(err)
} else if m["value"] != float64(100) {
t.Errorf("unexpected value: %#v", m)
}
if k, v := c.Next(); k != nil {
t.Fatalf("unexpected key/value: %#v / %#v", k, v)
}
}
// Engine represents a test wrapper for b1.Engine.
type Engine struct {
*b1.Engine