diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 6bcff36251..818759aacb 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -228,19 +228,21 @@ func (lm *SelectMapper) Open() error { } tsc := newTagSetCursor(m.Name, t.Tags, cursors, lm.shard.FieldCodec(m.Name)) - tsc.pointHeap = newPointHeap() - //Prime the buffers. - for i := 0; i < len(tsc.cursors); i++ { - k, v := tsc.cursors[i].SeekTo(lm.queryTMin) - if k == -1 { - continue + if lm.rawMode { + tsc.pointHeap = newPointHeap() + //Prime the buffers. + for i := 0; i < len(tsc.cursors); i++ { + k, v := tsc.cursors[i].SeekTo(lm.queryTMin) + if k == -1 { + continue + } + p := &pointHeapItem{ + timestamp: k, + value: v, + cursor: tsc.cursors[i], + } + heap.Push(tsc.pointHeap, p) } - p := &pointHeapItem{ - timestamp: k, - value: v, - cursor: tsc.cursors[i], - } - heap.Push(tsc.pointHeap, p) } lm.cursors = append(lm.cursors, tsc) } @@ -387,7 +389,7 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) { // Prime the buffers. for i := 0; i < len(tsc.cursors); i++ { k, v := tsc.cursors[i].SeekTo(tmin) - if k == -1 { + if k == -1 || k > tmax { continue } p := &pointHeapItem{ @@ -769,6 +771,11 @@ type seriesCursor struct { cursor Cursor // BoltDB cursor for a series filter influxql.Expr tags map[string]string + seekto int64 + seekResult struct { + k int64 + v []byte + } } // newSeriesCursor returns a new instance of a series cursor. @@ -777,22 +784,38 @@ func newSeriesCursor(cur Cursor, filter influxql.Expr, tags map[string]string) * cursor: cur, filter: filter, tags: tags, + seekto: -1, } } // Seek positions returning the timestamp and value at that key. func (sc *seriesCursor) SeekTo(key int64) (timestamp int64, value []byte) { + if sc.seekto != -1 && sc.seekto < key && (sc.seekResult.k == -1 || sc.seekResult.k >= key) { + // we've seeked on this cursor. This seek is after that previous cached seek + // and the result it gave was after the key for this seek. + // + // In this case, any seek would just return what we got before, so there's + // no point in reseeking. + return sc.seekResult.k, sc.seekResult.v + } k, v := sc.cursor.Seek(u64tob(uint64(key))) if k == nil { timestamp = -1 } else { timestamp, value = int64(btou64(k)), v } + sc.seekto = key + sc.seekResult.k = timestamp + sc.seekResult.v = v return } // Next returns the next timestamp and value from the cursor. func (sc *seriesCursor) Next() (key int64, value []byte) { + // calling next on this cursor means that we need to invalidate the seek + sc.seekto = -1 + sc.seekResult.k = 0 + sc.seekResult.v = nil k, v := sc.cursor.Next() if k == nil { key = -1