diff --git a/tsdb/engine/b1/b1.go b/tsdb/engine/b1/b1.go index f5a7b17a6f..b971696b99 100644 --- a/tsdb/engine/b1/b1.go +++ b/tsdb/engine/b1/b1.go @@ -568,10 +568,18 @@ func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { copy(cache, tx.engine.cache[partitionID][key]) // Build a cursor that merges the bucket and cache together. - cur := &Cursor{cache: cache} + cur := &Cursor{cache: cache, direction: direction} if b != nil { cur.cursor = b.Cursor() } + + // If it's a reverse cursor, set the current location to the end. + if direction.Reverse() { + cur.index = len(cache) - 1 + if cur.cursor != nil { + cur.cursor.Last() + } + } return cur } @@ -589,9 +597,12 @@ type Cursor struct { // Previously read key. prev []byte + + // The direction the cursor pointer moves after each call to Next() + direction tsdb.Direction } -func (c *Cursor) Direction() tsdb.Direction { return tsdb.Forward } +func (c *Cursor) Direction() tsdb.Direction { return c.direction } // Seek moves the cursor to a position and returns the closest key/value pair. func (c *Cursor) Seek(seek []byte) (key, value []byte) { @@ -605,6 +616,12 @@ func (c *Cursor) Seek(seek []byte) (key, value []byte) { return bytes.Compare(c.cache[i][0:8], seek) != -1 }) + // Search will return an index after the length of cache if the seek value is greater + // than all the values. Clamp it to the end of the cache. + if c.direction.Reverse() && c.index >= len(c.cache) { + c.index = len(c.cache) - 1 + } + c.prev = nil return c.read() } @@ -618,20 +635,10 @@ func (c *Cursor) Next() (key, value []byte) { func (c *Cursor) read() (key, value []byte) { // Continue skipping ahead through duplicate keys in the cache list. for { - // Read next value from the cursor. - if c.buf.key == nil && c.cursor != nil { - c.buf.key, c.buf.value = c.cursor.Next() - } - - // Read from the buffer or cache, which ever is lower. - if c.buf.key != nil && (c.index >= len(c.cache) || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == -1) { - key, value = c.buf.key, c.buf.value - c.buf.key, c.buf.value = nil, nil - } else if c.index < len(c.cache) { - key, value = c.cache[c.index][0:8], c.cache[c.index][8:] - c.index++ + if c.direction.Forward() { + key, value = c.readForward() } else { - key, value = nil, nil + key, value = c.readReverse() } // Exit loop if we're at the end of the cache or the next key is different. @@ -644,6 +651,46 @@ func (c *Cursor) read() (key, value []byte) { return } +// readForward returns the next key/value from the cursor and moves the current location forward. +func (c *Cursor) readForward() (key, value []byte) { + // Read next value from the cursor. + if c.buf.key == nil && c.cursor != nil { + c.buf.key, c.buf.value = c.cursor.Next() + } + + // Read from the buffer or cache, which ever is lower. + if c.buf.key != nil && (c.index >= len(c.cache) || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == -1) { + key, value = c.buf.key, c.buf.value + c.buf.key, c.buf.value = nil, nil + } else if c.index < len(c.cache) { + key, value = c.cache[c.index][0:8], c.cache[c.index][8:] + c.index++ + } else { + key, value = nil, nil + } + return +} + +// readReverse returns the next key/value from the cursor and moves the current location backwards. +func (c *Cursor) readReverse() (key, value []byte) { + // Read prev value from the cursor. + if c.buf.key == nil && c.cursor != nil { + c.buf.key, c.buf.value = c.cursor.Prev() + } + + // Read from the buffer or cache, which ever is lower. + if c.buf.key != nil && (c.index < 0 || bytes.Compare(c.buf.key, c.cache[c.index][0:8]) == 1) { + key, value = c.buf.key, c.buf.value + c.buf.key, c.buf.value = nil, nil + } else if c.index >= 0 && c.index < len(c.cache) { + key, value = c.cache[c.index][0:8], c.cache[c.index][8:] + c.index-- + } else { + key, value = nil, nil + } + return +} + // WALPartitionN is the number of partitions in the write ahead log. const WALPartitionN = 8 diff --git a/tsdb/engine/b1/b1_test.go b/tsdb/engine/b1/b1_test.go index f8860cfb8f..d664f4ee28 100644 --- a/tsdb/engine/b1/b1_test.go +++ b/tsdb/engine/b1/b1_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/binary" "io/ioutil" + "math" "os" "testing" "time" @@ -64,7 +65,7 @@ func TestEngine_WritePoints(t *testing.T) { tx := e.MustBegin(false) defer tx.Rollback() - c := tx.Cursor("temperature", true) + c := tx.Cursor("temperature", tsdb.Forward) if k, v := c.Seek([]byte{0}); !bytes.Equal(k, u64tob(uint64(time.Unix(1434059627, 0).UnixNano()))) { t.Fatalf("unexpected key: %#v", k) } else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil { @@ -78,6 +79,73 @@ func TestEngine_WritePoints(t *testing.T) { } } +// Ensure points can be written to the engine and queried in reverse order. +func TestEngine_WritePoints_Reverse(t *testing.T) { + e := OpenDefaultEngine() + defer e.Close() + + // Create metadata. + mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)} + mf.CreateFieldIfNotExists("value", influxql.Float) + seriesToCreate := []*tsdb.SeriesCreate{ + {Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)}, + } + + // Parse point. + points, err := tsdb.ParsePointsWithPrecision([]byte("temperature value=100 0"), time.Now().UTC(), "s") + if err != nil { + t.Fatal(err) + } else if data, err := mf.Codec.EncodeFields(points[0].Fields()); err != nil { + t.Fatal(err) + } else { + points[0].SetData(data) + } + + // Write original value. + if err := e.WritePoints(points, map[string]*tsdb.MeasurementFields{"temperature": mf}, seriesToCreate); err != nil { + t.Fatal(err) + } + + // Flush to disk. + if err := e.Flush(0); err != nil { + t.Fatal(err) + } + + // Parse new point. + points, err = tsdb.ParsePointsWithPrecision([]byte("temperature value=200 1"), time.Now().UTC(), "s") + if err != nil { + t.Fatal(err) + } else if data, err := mf.Codec.EncodeFields(points[0].Fields()); err != nil { + t.Fatal(err) + } else { + points[0].SetData(data) + } + + // Write the new points existing value. + if err := e.WritePoints(points, nil, nil); err != nil { + t.Fatal(err) + } + + // Ensure only the updated value is read. + tx := e.MustBegin(false) + defer tx.Rollback() + + c := tx.Cursor("temperature", tsdb.Reverse) + if k, _ := c.Seek(u64tob(math.MaxInt64)); !bytes.Equal(k, u64tob(uint64(time.Unix(1, 0).UnixNano()))) { + t.Fatalf("unexpected key: %v", btou64(k)) + } else if k, v := c.Next(); !bytes.Equal(k, u64tob(uint64(time.Unix(0, 0).UnixNano()))) { + t.Fatalf("unexpected key: %#v", k) + } else if m, err := mf.Codec.DecodeFieldsWithNames(v); err != nil { + t.Fatal(err) + } else if m["value"] != float64(100) { + t.Errorf("unexpected value: %#v", m) + } + + if k, v := c.Next(); k != nil { + t.Fatalf("unexpected key/value: %#v / %#v", k, v) + } +} + // Engine represents a test wrapper for b1.Engine. type Engine struct { *b1.Engine