diff --git a/tsdb/cursor.go b/tsdb/cursor.go index e5ac15e823..b1f0e771fa 100644 --- a/tsdb/cursor.go +++ b/tsdb/cursor.go @@ -5,21 +5,48 @@ import ( "container/heap" ) +// Direction represents a cursor navigation direction. +type Direction bool + +const ( + // Forward indicates that a cursor will move forward over its values. + Forward Direction = true + // Reverse indicates that a cursor will move backwards over its values. + Reverse Direction = false +) + +func (d Direction) String() string { + if d.Forward() { + return "forward" + } + return "reverse" +} + +// Forward returns true if direction is forward +func (d Direction) Forward() bool { + return d == Forward +} + +// Forward returns true if direction is reverse +func (d Direction) Reverse() bool { + return d == Reverse +} + // MultiCursor returns a single cursor that combines the results of all cursors in order. // // If the same key is returned from multiple cursors then the first cursor // specified will take precendence. A key will only be returned once from the // returned cursor. -func MultiCursor(forward bool, cursors ...Cursor) Cursor { - return &multiCursor{cursors: cursors, forward: forward} +func MultiCursor(d Direction, cursors ...Cursor) Cursor { + return &multiCursor{cursors: cursors, direction: d} } // multiCursor represents a cursor that combines multiple cursors into one. type multiCursor struct { - cursors []Cursor - heap cursorHeap - prev []byte - forward bool + cursors []Cursor + heap cursorHeap + prev []byte + direction Direction } // Seek moves the cursor to a given key. @@ -49,7 +76,7 @@ func (mc *multiCursor) Seek(seek []byte) (key, value []byte) { return mc.pop() } -func (mc *multiCursor) Direction() bool { return mc.forward } +func (mc *multiCursor) Direction() Direction { return mc.direction } // Next returns the next key/value from the cursor. func (mc *multiCursor) Next() (key, value []byte) { return mc.pop() } diff --git a/tsdb/cursor_test.go b/tsdb/cursor_test.go index 8ba3f57c90..5f5d053845 100644 --- a/tsdb/cursor_test.go +++ b/tsdb/cursor_test.go @@ -212,27 +212,27 @@ func TestMultiCursor_Quick(t *testing.T) { // Cursor represents an in-memory test cursor. type Cursor struct { - forward bool - items []CursorItem - index int + direction tsdb.Direction + items []CursorItem + index int } // NewCursor returns a new instance of Cursor. -func NewCursor(forward bool, items []CursorItem) *Cursor { +func NewCursor(direction tsdb.Direction, items []CursorItem) *Cursor { index := 0 sort.Sort(CursorItems(items)) - if !forward { + if direction.Reverse() { index = len(items) } - return &Cursor{forward: forward, items: items, index: index} + return &Cursor{direction: direction, items: items, index: index} } -func (c *Cursor) Direction() bool { return c.forward } +func (c *Cursor) Direction() tsdb.Direction { return c.direction } // Seek seeks to an item by key. func (c *Cursor) Seek(seek []byte) (key, value []byte) { - if c.forward { + if c.direction.Forward() { return c.seekForward(seek) } return c.seekReverse(seek) @@ -260,17 +260,17 @@ func (c *Cursor) seekReverse(seek []byte) (key, value []byte) { // Next returns the next key/value pair. func (c *Cursor) Next() (key, value []byte) { - if !c.forward && c.index < 0 { + if c.direction.Reverse() && c.index < 0 { return nil, nil } - if c.forward && c.index >= len(c.items) { + if c.direction.Forward() && c.index >= len(c.items) { return nil, nil } k, v := c.items[c.index].Key, c.items[c.index].Value - if c.forward { + if c.direction.Forward() { c.index++ } else { c.index-- @@ -281,7 +281,7 @@ func (c *Cursor) Next() (key, value []byte) { // Generate returns a randomly generated cursor. Implements quick.Generator. func (c Cursor) Generate(rand *rand.Rand, size int) reflect.Value { c.index = 0 - c.forward = true + c.direction = tsdb.Forward c.items = make([]CursorItem, rand.Intn(size)) for i := range c.items { diff --git a/tsdb/engine.go b/tsdb/engine.go index d902a6d21a..2eee3c556a 100644 --- a/tsdb/engine.go +++ b/tsdb/engine.go @@ -123,7 +123,7 @@ func NewEngineOptions() EngineOptions { type Tx interface { io.WriterTo - Cursor(series string, forward bool) Cursor + Cursor(series string, direction Direction) Cursor Size() int64 Commit() error Rollback() error @@ -133,7 +133,7 @@ type Tx interface { type Cursor interface { Seek(seek []byte) (key, value []byte) Next() (key, value []byte) - Direction() bool + Direction() Direction } // DedupeEntries returns slices with unique keys (the first 8 bytes). diff --git a/tsdb/engine/b1/b1.go b/tsdb/engine/b1/b1.go index 46727b183a..f5a7b17a6f 100644 --- a/tsdb/engine/b1/b1.go +++ b/tsdb/engine/b1/b1.go @@ -550,7 +550,7 @@ type Tx struct { } // Cursor returns an iterator for a key. -func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { +func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { // Retrieve key bucket. b := tx.Bucket([]byte(key)) @@ -591,7 +591,7 @@ type Cursor struct { prev []byte } -func (c *Cursor) Direction() bool { return true } +func (c *Cursor) Direction() tsdb.Direction { return tsdb.Forward } // Seek moves the cursor to a position and returns the closest key/value pair. func (c *Cursor) Seek(seek []byte) (key, value []byte) { diff --git a/tsdb/engine/bz1/bz1.go b/tsdb/engine/bz1/bz1.go index d828ca5b26..49de4525b6 100644 --- a/tsdb/engine/bz1/bz1.go +++ b/tsdb/engine/bz1/bz1.go @@ -59,7 +59,7 @@ type WAL interface { WritePoints(points []tsdb.Point, measurementFieldsToSave map[string]*tsdb.MeasurementFields, seriesToCreate []*tsdb.SeriesCreate) error LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields map[string]*tsdb.MeasurementFields) error DeleteSeries(keys []string) error - Cursor(key string, forward bool) tsdb.Cursor + Cursor(key string, direction tsdb.Direction) tsdb.Cursor Open() error Close() error Flush() error @@ -606,8 +606,8 @@ type Tx struct { } // Cursor returns an iterator for a key. -func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { - walCursor := tx.wal.Cursor(key, forward) +func (tx *Tx) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { + walCursor := tx.wal.Cursor(key, direction) // Retrieve points bucket. Ignore if there is no bucket. b := tx.Bucket([]byte("points")).Bucket([]byte(key)) @@ -616,15 +616,15 @@ func (tx *Tx) Cursor(key string, forward bool) tsdb.Cursor { } c := &Cursor{ - cursor: b.Cursor(), - forward: forward, + cursor: b.Cursor(), + direction: direction, } - if !forward { + if direction.Reverse() { c.last() } - return tsdb.MultiCursor(forward, walCursor, c) + return tsdb.MultiCursor(direction, walCursor, c) } // Cursor provides ordered iteration across a series. @@ -632,7 +632,7 @@ type Cursor struct { cursor *bolt.Cursor buf []byte // uncompressed buffer off int // buffer offset - forward bool + direction tsdb.Direction fieldIndices []int index int } @@ -642,7 +642,7 @@ func (c *Cursor) last() { c.setBuf(v) } -func (c *Cursor) Direction() bool { return c.forward } +func (c *Cursor) Direction() tsdb.Direction { return c.direction } // Seek moves the cursor to a position and returns the closest key/value pair. func (c *Cursor) Seek(seek []byte) (key, value []byte) { @@ -678,13 +678,13 @@ func (c *Cursor) seekBuf(seek []byte) (key, value []byte) { return } - if c.forward && bytes.Compare(buf[0:8], seek) != -1 { + if c.direction.Forward() && bytes.Compare(buf[0:8], seek) != -1 { return - } else if !c.forward && bytes.Compare(buf[0:8], seek) != 1 { + } else if c.direction.Reverse() && bytes.Compare(buf[0:8], seek) != 1 { return } - if c.forward { + if c.direction.Forward() { // Otherwise skip ahead to the next entry. c.off += entryHeaderSize + entryDataSize(buf) } else { @@ -704,7 +704,7 @@ func (c *Cursor) Next() (key, value []byte) { return nil, nil } - if c.forward { + if c.direction.Forward() { // Move forward to next entry. c.off += entryHeaderSize + entryDataSize(c.buf[c.off:]) } else { @@ -745,7 +745,7 @@ func (c *Cursor) setBuf(block []byte) { log.Printf("block decode error: %s", err) } - if c.forward { + if c.direction.Forward() { c.buf, c.off = buf, 0 } else { c.buf, c.off = buf, 0 diff --git a/tsdb/engine/bz1/bz1_test.go b/tsdb/engine/bz1/bz1_test.go index 6f7ecb1dc3..fbcabb8ccb 100644 --- a/tsdb/engine/bz1/bz1_test.go +++ b/tsdb/engine/bz1/bz1_test.go @@ -582,18 +582,18 @@ func (w *EnginePointsWriter) Open() error { return nil } func (w *EnginePointsWriter) Close() error { return nil } -func (w *EnginePointsWriter) Cursor(key string, forward bool) tsdb.Cursor { - return &Cursor{forward: forward} +func (w *EnginePointsWriter) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { + return &Cursor{direction: direction} } func (w *EnginePointsWriter) Flush() error { return nil } // Cursor represents a mock that implements tsdb.Curosr. type Cursor struct { - forward bool + direction tsdb.Direction } -func (c *Cursor) Direction() bool { return c.forward } +func (c *Cursor) Direction() tsdb.Direction { return c.direction } func (c *Cursor) Seek(key []byte) ([]byte, []byte) { return nil, nil } diff --git a/tsdb/engine/wal/wal.go b/tsdb/engine/wal/wal.go index 8a22d4f40a..bdf8bf82c0 100644 --- a/tsdb/engine/wal/wal.go +++ b/tsdb/engine/wal/wal.go @@ -223,11 +223,11 @@ func (l *Log) Open() error { } // Cursor will return a cursor object to Seek and iterate with Next for the WAL cache for the given -func (l *Log) Cursor(key string, forward bool) tsdb.Cursor { +func (l *Log) Cursor(key string, direction tsdb.Direction) tsdb.Cursor { l.mu.RLock() defer l.mu.RUnlock() - return l.partition([]byte(key)).cursor(key, forward) + return l.partition([]byte(key)).cursor(key, direction) } func (l *Log) WritePoints(points []tsdb.Point, fields map[string]*tsdb.MeasurementFields, series []*tsdb.SeriesCreate) error { @@ -1380,7 +1380,7 @@ func (p *Partition) addToCache(key, data []byte, timestamp int64) { } // cursor will combine the in memory cache and flush cache (if a flush is currently happening) to give a single ordered cursor for the key -func (p *Partition) cursor(key string, forward bool) *cursor { +func (p *Partition) cursor(key string, direction tsdb.Direction) *cursor { p.mu.Lock() defer p.mu.Unlock() @@ -1398,7 +1398,7 @@ func (p *Partition) cursor(key string, forward bool) *cursor { c = append(c, entry.points...) dedupe := tsdb.DedupeEntries(c) - return newCursor(dedupe, forward) + return newCursor(dedupe, direction) } } @@ -1410,7 +1410,7 @@ func (p *Partition) cursor(key string, forward bool) *cursor { // build a copy so modifications to the partition don't change the result set a := make([][]byte, len(entry.points)) copy(a, entry.points) - return newCursor(a, forward) + return newCursor(a, direction) } // idFromFileName parses the segment file ID from its name @@ -1589,22 +1589,22 @@ type entry struct { timestamp int64 } -// cursor is a forward cursor for a given entry in the cache +// cursor is a unidirectional iterator for a given entry in the cache type cursor struct { - cache [][]byte - position int - forward bool + cache [][]byte + position int + direction tsdb.Direction } -func newCursor(cache [][]byte, forward bool) *cursor { - c := &cursor{cache: cache, forward: forward} - if !forward { +func newCursor(cache [][]byte, direction tsdb.Direction) *cursor { + c := &cursor{cache: cache, direction: direction} + if direction.Reverse() { c.position = len(c.cache) } return c } -func (c *cursor) Direction() bool { return c.forward } +func (c *cursor) Direction() tsdb.Direction { return c.direction } // Seek will point the cursor to the given time (or key) func (c *cursor) Seek(seek []byte) (key, value []byte) { @@ -1619,28 +1619,27 @@ func (c *cursor) Seek(seek []byte) (key, value []byte) { // Next moves the cursor to the next key/value. will return nil if at the end func (c *cursor) Next() (key, value []byte) { - if !c.forward && c.position >= len(c.cache) { + if c.direction.Reverse() && c.position >= len(c.cache) { c.position-- } - if c.forward && c.position >= len(c.cache) { + if c.direction.Forward() && c.position >= len(c.cache) { return nil, nil } - if !c.forward && c.position < 0 { + if c.direction.Reverse() && c.position < 0 { return nil, nil } v := c.cache[c.position] - if c.forward { + if c.direction.Forward() { c.position++ } else { c.position-- } return v[0:8], v[8:] - } // seriesAndFields is a data struct to serialize new series and fields diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 68ac80cb25..004943f062 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -218,9 +218,13 @@ func (lm *SelectMapper) Open() error { } } - forward := true + direction := Forward if len(lm.selectStmt.SortFields) > 0 { - forward = lm.selectStmt.SortFields[0].Ascending + if lm.selectStmt.SortFields[0].Ascending { + direction = Forward + } else { + direction = Reverse + } } // Create all cursors for reading the data from this shard. @@ -228,7 +232,7 @@ func (lm *SelectMapper) Open() error { cursors := []*seriesCursor{} for i, key := range t.SeriesKeys { - c := lm.tx.Cursor(key, forward) + c := lm.tx.Cursor(key, direction) if c == nil { // No data exists for this key. continue @@ -245,7 +249,7 @@ func (lm *SelectMapper) Open() error { for i := 0; i < len(tsc.cursors); i++ { var k int64 var v []byte - if forward { + if direction == Forward { k, v = tsc.cursors[i].SeekTo(lm.queryTMin) if k == -1 { k, v = tsc.cursors[i].Next()