Merge pull request #3862 from influxdb/seekonce
WIP don't bother seeking a cursor if it wont yield a useful valuepull/3872/head
commit
bb36faff4c
|
@ -228,19 +228,21 @@ func (lm *SelectMapper) Open() error {
|
|||
}
|
||||
|
||||
tsc := newTagSetCursor(m.Name, t.Tags, cursors, lm.shard.FieldCodec(m.Name))
|
||||
tsc.pointHeap = newPointHeap()
|
||||
//Prime the buffers.
|
||||
for i := 0; i < len(tsc.cursors); i++ {
|
||||
k, v := tsc.cursors[i].SeekTo(lm.queryTMin)
|
||||
if k == -1 {
|
||||
continue
|
||||
if lm.rawMode {
|
||||
tsc.pointHeap = newPointHeap()
|
||||
//Prime the buffers.
|
||||
for i := 0; i < len(tsc.cursors); i++ {
|
||||
k, v := tsc.cursors[i].SeekTo(lm.queryTMin)
|
||||
if k == -1 {
|
||||
continue
|
||||
}
|
||||
p := &pointHeapItem{
|
||||
timestamp: k,
|
||||
value: v,
|
||||
cursor: tsc.cursors[i],
|
||||
}
|
||||
heap.Push(tsc.pointHeap, p)
|
||||
}
|
||||
p := &pointHeapItem{
|
||||
timestamp: k,
|
||||
value: v,
|
||||
cursor: tsc.cursors[i],
|
||||
}
|
||||
heap.Push(tsc.pointHeap, p)
|
||||
}
|
||||
lm.cursors = append(lm.cursors, tsc)
|
||||
}
|
||||
|
@ -387,7 +389,7 @@ func (lm *SelectMapper) nextChunkAgg() (interface{}, error) {
|
|||
// Prime the buffers.
|
||||
for i := 0; i < len(tsc.cursors); i++ {
|
||||
k, v := tsc.cursors[i].SeekTo(tmin)
|
||||
if k == -1 {
|
||||
if k == -1 || k > tmax {
|
||||
continue
|
||||
}
|
||||
p := &pointHeapItem{
|
||||
|
@ -769,6 +771,11 @@ type seriesCursor struct {
|
|||
cursor Cursor // BoltDB cursor for a series
|
||||
filter influxql.Expr
|
||||
tags map[string]string
|
||||
seekto int64
|
||||
seekResult struct {
|
||||
k int64
|
||||
v []byte
|
||||
}
|
||||
}
|
||||
|
||||
// newSeriesCursor returns a new instance of a series cursor.
|
||||
|
@ -777,22 +784,38 @@ func newSeriesCursor(cur Cursor, filter influxql.Expr, tags map[string]string) *
|
|||
cursor: cur,
|
||||
filter: filter,
|
||||
tags: tags,
|
||||
seekto: -1,
|
||||
}
|
||||
}
|
||||
|
||||
// Seek positions returning the timestamp and value at that key.
|
||||
func (sc *seriesCursor) SeekTo(key int64) (timestamp int64, value []byte) {
|
||||
if sc.seekto != -1 && sc.seekto < key && (sc.seekResult.k == -1 || sc.seekResult.k >= key) {
|
||||
// we've seeked on this cursor. This seek is after that previous cached seek
|
||||
// and the result it gave was after the key for this seek.
|
||||
//
|
||||
// In this case, any seek would just return what we got before, so there's
|
||||
// no point in reseeking.
|
||||
return sc.seekResult.k, sc.seekResult.v
|
||||
}
|
||||
k, v := sc.cursor.Seek(u64tob(uint64(key)))
|
||||
if k == nil {
|
||||
timestamp = -1
|
||||
} else {
|
||||
timestamp, value = int64(btou64(k)), v
|
||||
}
|
||||
sc.seekto = key
|
||||
sc.seekResult.k = timestamp
|
||||
sc.seekResult.v = v
|
||||
return
|
||||
}
|
||||
|
||||
// Next returns the next timestamp and value from the cursor.
|
||||
func (sc *seriesCursor) Next() (key int64, value []byte) {
|
||||
// calling next on this cursor means that we need to invalidate the seek
|
||||
sc.seekto = -1
|
||||
sc.seekResult.k = 0
|
||||
sc.seekResult.v = nil
|
||||
k, v := sc.cursor.Next()
|
||||
if k == nil {
|
||||
key = -1
|
||||
|
|
Loading…
Reference in New Issue