Replace cursor direction with a type

pull/3986/head
Jason Wilder 2015-09-02 15:42:34 -06:00
parent 7c67e60c4f
commit 5a6b0afc4b
8 changed files with 93 additions and 63 deletions

View File

@ -5,13 +5,40 @@ import (
"container/heap" "container/heap"
) )
// Direction represents a cursor navigation direction.
type Direction bool
const (
// Forward indicates that a cursor will move forward over its values.
Forward Direction = true
// Reverse indicates that a cursor will move backwards over its values.
Reverse Direction = false
)
func (d Direction) String() string {
if d.Forward() {
return "forward"
}
return "reverse"
}
// Forward returns true if direction is forward
func (d Direction) Forward() bool {
return d == Forward
}
// Forward returns true if direction is reverse
func (d Direction) Reverse() bool {
return d == Reverse
}
// MultiCursor returns a single cursor that combines the results of all cursors in order. // MultiCursor returns a single cursor that combines the results of all cursors in order.
// //
// If the same key is returned from multiple cursors then the first cursor // If the same key is returned from multiple cursors then the first cursor
// specified will take precendence. A key will only be returned once from the // specified will take precendence. A key will only be returned once from the
// returned cursor. // returned cursor.
func MultiCursor(forward bool, cursors ...Cursor) Cursor { func MultiCursor(d Direction, cursors ...Cursor) Cursor {
return &multiCursor{cursors: cursors, forward: forward} return &multiCursor{cursors: cursors, direction: d}
} }
// multiCursor represents a cursor that combines multiple cursors into one. // multiCursor represents a cursor that combines multiple cursors into one.
@ -19,7 +46,7 @@ type multiCursor struct {
cursors []Cursor cursors []Cursor
heap cursorHeap heap cursorHeap
prev []byte prev []byte
forward bool direction Direction
} }
// Seek moves the cursor to a given key. // Seek moves the cursor to a given key.
@ -49,7 +76,7 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) {
return mc.pop() return mc.pop()
} }
func (mc *multiCursor) Direction() bool { return mc.forward } func (mc *multiCursor) Direction() Direction { return mc.direction }
// Next returns the next key/value from the cursor. // Next returns the next key/value from the cursor.
func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() } func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() }

View File

@ -212,27 +212,27 @@ func TestMultiCursor_Quick(t *testing.T) {
// Cursor represents an in-memory test cursor. // Cursor represents an in-memory test cursor.
type Cursor struct { type Cursor struct {
forward bool direction tsdb.Direction
items []CursorItem items []CursorItem
index int index int
} }
// NewCursor returns a new instance of Cursor. // NewCursor returns a new instance of Cursor.
func NewCursor(forward bool, items []CursorItem) *Cursor { func NewCursor(direction tsdb.Direction, items []CursorItem) *Cursor {
index := 0 index := 0
sort.Sort(CursorItems(items)) sort.Sort(CursorItems(items))
if !forward { if direction.Reverse() {
index = len(items) index = len(items)
} }
return &Cursor{forward: forward, items: items, index: index} return &Cursor{direction: direction, items: items, index: index}
} }
func (c *Cursor) Direction() bool { return c.forward } func (c *Cursor) Direction() tsdb.Direction { return c.direction }
// Seek seeks to an item by key. // Seek seeks to an item by key.
func (c *Cursor) Seek(seek []byte) (key, value []byte) { func (c *Cursor) Seek(seek []byte) (key, value []byte) {
if c.forward { if c.direction.Forward() {
return c.seekForward(seek) return c.seekForward(seek)
} }
return c.seekReverse(seek) return c.seekReverse(seek)
@ -260,17 +260,17 @@ func (c *Cursor) seekReverse(seek []byte) (key, value []byte) {
// Next returns the next key/value pair. // Next returns the next key/value pair.
func (c *Cursor) Next() (key, value []byte) { func (c *Cursor) Next() (key, value []byte) {
if !c.forward && c.index < 0 { if c.direction.Reverse() && c.index < 0 {
return nil, nil return nil, nil
} }
if c.forward && c.index >= len(c.items) { if c.direction.Forward() && c.index >= len(c.items) {
return nil, nil return nil, nil
} }
k, v := c.items[c.index].Key, c.items[c.index].Value k, v := c.items[c.index].Key, c.items[c.index].Value
if c.forward { if c.direction.Forward() {
c.index++ c.index++
} else { } else {
c.index-- c.index--
@ -281,7 +281,7 @@ func (c *Cursor) Next() (key, value []byte) {
// Generate returns a randomly generated cursor. Implements quick.Generator. // Generate returns a randomly generated cursor. Implements quick.Generator.
func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value { func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value {
c.index = 0 c.index = 0
c.forward = true c.direction = tsdb.Forward
c.items = make([]CursorItem, rand.Intn(size)) c.items = make([]CursorItem, rand.Intn(size))
for i := range c.items { for i := range c.items {

View File

@ -123,7 +123,7 @@ func NewEngineOptions() EngineOptions {
type Tx interface { type Tx interface {
io.WriterTo io.WriterTo
Cursor(series string, forward bool) Cursor Cursor(series string, direction Direction) Cursor
Size() int64 Size() int64
Commit() error Commit() error
Rollback() error Rollback() error
@ -133,7 +133,7 @@ type Tx interface {
type Cursor interface { type Cursor interface {
Seek(seek []byte) (key, value []byte) Seek(seek []byte) (key, value []byte)
Next() (key, value []byte) Next() (key, value []byte)
Direction() bool Direction() Direction
} }
// DedupeEntries returns slices with unique keys (the first 8 bytes). // DedupeEntries returns slices with unique keys (the first 8 bytes).

View File

@ -550,7 +550,7 @@ type Tx struct {
} }
// Cursor returns an iterator for a key. // Cursor returns an iterator for a key.
func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
// Retrieve key bucket. // Retrieve key bucket.
b := tx.Bucket([]byte(key)) b := tx.Bucket([]byte(key))
@ -591,7 +591,7 @@ type Cursor struct {
prev []byte prev []byte
} }
func (c *Cursor) Direction() bool { return true } func (c *Cursor) Direction() tsdb.Direction { return tsdb.Forward }
// Seek moves the cursor to a position and returns the closest key/value pair. // Seek moves the cursor to a position and returns the closest key/value pair.
func (c *Cursor) Seek(seek []byte) (key, value []byte) { func (c *Cursor) Seek(seek []byte) (key, value []byte) {

View File

@ -59,7 +59,7 @@ type WAL interface {
WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error
LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error
DeleteSeries(keys []string) error DeleteSeries(keys []string) error
Cursor(key string, forward bool) tsdb.Cursor Cursor(key string, direction tsdb.Direction) tsdb.Cursor
Open() error Open() error
Close() error Close() error
Flush() error Flush() error
@ -606,8 +606,8 @@ type Tx struct {
} }
// Cursor returns an iterator for a key. // Cursor returns an iterator for a key.
func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
walCursor := tx.wal.Cursor(key, forward) walCursor := tx.wal.Cursor(key, direction)
// Retrieve points bucket. Ignore if there is no bucket. // Retrieve points bucket. Ignore if there is no bucket.
b := tx.Bucket([]byte("points")).Bucket([]byte(key)) b := tx.Bucket([]byte("points")).Bucket([]byte(key))
@ -617,14 +617,14 @@ func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor {
c := &Cursor{ c := &Cursor{
cursor: b.Cursor(), cursor: b.Cursor(),
forward: forward, direction: direction,
} }
if !forward { if direction.Reverse() {
c.last() c.last()
} }
return tsdb.MultiCursor(forward, walCursor, c) return tsdb.MultiCursor(direction, walCursor, c)
} }
// Cursor provides ordered iteration across a series. // Cursor provides ordered iteration across a series.
@ -632,7 +632,7 @@ type Cursor struct {
cursor *bolt.Cursor cursor *bolt.Cursor
buf []byte // uncompressed buffer buf []byte // uncompressed buffer
off int // buffer offset off int // buffer offset
forward bool direction tsdb.Direction
fieldIndices []int fieldIndices []int
index int index int
} }
@ -642,7 +642,7 @@ func (c *Cursor) last() {
c.setBuf(v) c.setBuf(v)
} }
func (c *Cursor) Direction() bool { return c.forward } func (c *Cursor) Direction() tsdb.Direction { return c.direction }
// Seek moves the cursor to a position and returns the closest key/value pair. // Seek moves the cursor to a position and returns the closest key/value pair.
func (c *Cursor) Seek(seek []byte) (key, value []byte) { func (c *Cursor) Seek(seek []byte) (key, value []byte) {
@ -678,13 +678,13 @@ func (c *Cursor) seekBuf(seek []byte) (key, value []byte) {
return return
} }
if c.forward && bytes.Compare(buf[0:8], seek) != -1 { if c.direction.Forward() && bytes.Compare(buf[0:8], seek) != -1 {
return return
} else if !c.forward && bytes.Compare(buf[0:8], seek) != 1 { } else if c.direction.Reverse() && bytes.Compare(buf[0:8], seek) != 1 {
return return
} }
if c.forward { if c.direction.Forward() {
// Otherwise skip ahead to the next entry. // Otherwise skip ahead to the next entry.
c.off += entryHeaderSize + entryDataSize(buf) c.off += entryHeaderSize + entryDataSize(buf)
} else { } else {
@ -704,7 +704,7 @@ func (c *Cursor) Next() (key, value []byte) {
return nil, nil return nil, nil
} }
if c.forward { if c.direction.Forward() {
// Move forward to next entry. // Move forward to next entry.
c.off += entryHeaderSize + entryDataSize(c.buf[c.off:]) c.off += entryHeaderSize + entryDataSize(c.buf[c.off:])
} else { } else {
@ -745,7 +745,7 @@ func (c *Cursor) setBuf(block []byte) {
log.Printf("block decode error: %s", err) log.Printf("block decode error: %s", err)
} }
if c.forward { if c.direction.Forward() {
c.buf, c.off = buf, 0 c.buf, c.off = buf, 0
} else { } else {
c.buf, c.off = buf, 0 c.buf, c.off = buf, 0

View File

@ -582,18 +582,18 @@ func (w *EnginePointsWriter) Open() error { return nil }
func (w *EnginePointsWriter) Close() error { return nil } func (w *EnginePointsWriter) Close() error { return nil }
func (w *EnginePointsWriter) Cursor(key string, forward bool) tsdb.Cursor { func (w *EnginePointsWriter) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
return &Cursor{forward: forward} return &Cursor{direction: direction}
} }
func (w *EnginePointsWriter) Flush() error { return nil } func (w *EnginePointsWriter) Flush() error { return nil }
// Cursor represents a mock that implements tsdb.Curosr. // Cursor represents a mock that implements tsdb.Curosr.
type Cursor struct { type Cursor struct {
forward bool direction tsdb.Direction
} }
func (c *Cursor) Direction() bool { return c.forward } func (c *Cursor) Direction() tsdb.Direction { return c.direction }
func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil } func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil }

View File

@ -223,11 +223,11 @@ func (l *Log) Open() error {
} }
// Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given // Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given
func (l *Log) Cursor(key string, forward bool) tsdb.Cursor { func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor {
l.mu.RLock() l.mu.RLock()
defer l.mu.RUnlock() defer l.mu.RUnlock()
return l.partition([]byte(key)).cursor(key, forward) return l.partition([]byte(key)).cursor(key, direction)
} }
func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error {
@ -1380,7 +1380,7 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) {
} }
// cursor will combine the in memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the key // cursor will combine the in memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the key
func (p *Partition) cursor(key string, forward bool) *cursor { func (p *Partition) cursor(key string, direction tsdb.Direction) *cursor {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
@ -1398,7 +1398,7 @@ func (p *Partition) cursor(key string, forward bool) *cursor {
c = append(c, entry.points...) c = append(c, entry.points...)
dedupe := tsdb.DedupeEntries(c) dedupe := tsdb.DedupeEntries(c)
return newCursor(dedupe, forward) return newCursor(dedupe, direction)
} }
} }
@ -1410,7 +1410,7 @@ func (p *Partition) cursor(key string, forward bool) *cursor {
// build a copy so modifications to the partition don't change the result set // build a copy so modifications to the partition don't change the result set
a := make([][]byte, len(entry.points)) a := make([][]byte, len(entry.points))
copy(a, entry.points) copy(a, entry.points)
return newCursor(a, forward) return newCursor(a, direction)
} }
// idFromFileName parses the segment file ID from its name // idFromFileName parses the segment file ID from its name
@ -1589,22 +1589,22 @@ type entry struct {
timestamp int64 timestamp int64
} }
// cursor is a forward cursor for a given entry in the cache // cursor is a unidirectional iterator for a given entry in the cache
type cursor struct { type cursor struct {
cache [][]byte cache [][]byte
position int position int
forward bool direction tsdb.Direction
} }
func newCursor(cache [][]byte, forward bool) *cursor { func newCursor(cache [][]byte, direction tsdb.Direction) *cursor {
c := &cursor{cache: cache, forward: forward} c := &cursor{cache: cache, direction: direction}
if !forward { if direction.Reverse() {
c.position = len(c.cache) c.position = len(c.cache)
} }
return c return c
} }
func (c *cursor) Direction() bool { return c.forward } func (c *cursor) Direction() tsdb.Direction { return c.direction }
// Seek will point the cursor to the given time (or key) // Seek will point the cursor to the given time (or key)
func (c *cursor) Seek(seek []byte) (key, value []byte) { func (c *cursor) Seek(seek []byte) (key, value []byte) {
@ -1619,28 +1619,27 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) {
// Next moves the cursor to the next key/value. will return nil if at the end // Next moves the cursor to the next key/value. will return nil if at the end
func (c *cursor) Next() (key, value []byte) { func (c *cursor) Next() (key, value []byte) {
if !c.forward && c.position >= len(c.cache) { if c.direction.Reverse() && c.position >= len(c.cache) {
c.position-- c.position--
} }
if c.forward && c.position >= len(c.cache) { if c.direction.Forward() && c.position >= len(c.cache) {
return nil, nil return nil, nil
} }
if !c.forward && c.position < 0 { if c.direction.Reverse() && c.position < 0 {
return nil, nil return nil, nil
} }
v := c.cache[c.position] v := c.cache[c.position]
if c.forward { if c.direction.Forward() {
c.position++ c.position++
} else { } else {
c.position-- c.position--
} }
return v[0:8], v[8:] return v[0:8], v[8:]
} }
// seriesAndFields is a data struct to serialize new series and fields // seriesAndFields is a data struct to serialize new series and fields

View File

@ -218,9 +218,13 @@ func (lm *SelectMapper) Open() error {
} }
} }
forward := true direction := Forward
if len(lm.selectStmt.SortFields) > 0 { if len(lm.selectStmt.SortFields) > 0 {
forward = lm.selectStmt.SortFields[0].Ascending if lm.selectStmt.SortFields[0].Ascending {
direction = Forward
} else {
direction = Reverse
}
} }
// Create all cursors for reading the data from this shard. // Create all cursors for reading the data from this shard.
@ -228,7 +232,7 @@ func (lm *SelectMapper) Open() error {
cursors := []*seriesCursor{} cursors := []*seriesCursor{}
for i, key := range t.SeriesKeys { for i, key := range t.SeriesKeys {
c := lm.tx.Cursor(key, forward) c := lm.tx.Cursor(key, direction)
if c == nil { if c == nil {
// No data exists for this key. // No data exists for this key.
continue continue
@ -245,7 +249,7 @@ func (lm *SelectMapper) Open() error {
for i := 0; i < len(tsc.cursors); i++ { for i := 0; i < len(tsc.cursors); i++ {
var k int64 var k int64
var v []byte var v []byte
if forward { if direction == Forward {
k, v = tsc.cursors[i].SeekTo(lm.queryTMin) k, v = tsc.cursors[i].SeekTo(lm.queryTMin)
if k == -1 { if k == -1 {
k, v = tsc.cursors[i].Next() k, v = tsc.cursors[i].Next()