use heap for tagSetCursor lookahead

pull/3520/head
David Norton 2015-07-22 11:39:47 -04:00
parent a09c00653c
commit b003522a18
1 changed files with 128 additions and 65 deletions

View File

@ -1,10 +1,10 @@
package tsdb package tsdb
import ( import (
"container/heap"
"encoding/binary" "encoding/binary"
"errors" "errors"
"fmt" "fmt"
"math"
"sort" "sort"
"strings" "strings"
@ -212,11 +212,19 @@ func (lm *LocalMapper) Open() error {
} }
tsc := newTagSetCursor(m.Name, t.Tags, cursors, lm.shard.FieldCodec(m.Name)) tsc := newTagSetCursor(m.Name, t.Tags, cursors, lm.shard.FieldCodec(m.Name))
// Prime the buffers. tsc.pointHeap = newPointHeap()
//Prime the buffers.
for i := 0; i < len(tsc.cursors); i++ { for i := 0; i < len(tsc.cursors); i++ {
k, v := tsc.cursors[i].SeekTo(lm.queryTMin) k, v := tsc.cursors[i].SeekTo(lm.queryTMin)
tsc.keyBuffer[i] = k if k == -1 {
tsc.valueBuffer[i] = v continue
}
p := &pointHeapItem{
timestamp: k,
value: v,
cursor: tsc.cursors[i],
}
heap.Push(tsc.pointHeap, p)
} }
lm.cursors = append(lm.cursors, tsc) lm.cursors = append(lm.cursors, tsc)
} }
@ -325,6 +333,7 @@ func (lm *LocalMapper) nextChunkAgg() (interface{}, error) {
qmin = lm.queryTMin qmin = lm.queryTMin
} }
tsc.pointHeap = newPointHeap()
for i := range lm.mapFuncs { for i := range lm.mapFuncs {
// Prime the tagset cursor for the start of the interval. This is not ideal, as // Prime the tagset cursor for the start of the interval. This is not ideal, as
// it should really calculate the values all in 1 pass, but that would require // it should really calculate the values all in 1 pass, but that would require
@ -332,10 +341,16 @@ func (lm *LocalMapper) nextChunkAgg() (interface{}, error) {
// Prime the buffers. // Prime the buffers.
for i := 0; i < len(tsc.cursors); i++ { for i := 0; i < len(tsc.cursors); i++ {
k, v := tsc.cursors[i].SeekTo(tmin) k, v := tsc.cursors[i].SeekTo(tmin)
tsc.keyBuffer[i] = k if k == -1 {
tsc.valueBuffer[i] = v continue
}
p := &pointHeapItem{
timestamp: k,
value: v,
cursor: tsc.cursors[i],
}
heap.Push(tsc.pointHeap, p)
} }
// Wrap the tagset cursor so it implements the mapping functions interface. // Wrap the tagset cursor so it implements the mapping functions interface.
f := func() (time int64, value interface{}) { f := func() (time int64, value interface{}) {
return tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields) return tsc.Next(qmin, tmax, []string{lm.fieldNames[i]}, lm.whereFields)
@ -428,6 +443,42 @@ func (a *aggTagSetCursor) Next() (time int64, value interface{}) {
return a.nextFunc() return a.nextFunc()
} }
type pointHeapItem struct {
timestamp int64
value []byte
cursor *seriesCursor // cursor whence pointHeapItem came
}
type pointHeap []*pointHeapItem
func newPointHeap() *pointHeap {
q := make(pointHeap, 0)
heap.Init(&q)
return &q
}
func (pq pointHeap) Len() int { return len(pq) }
func (pq pointHeap) Less(i, j int) bool {
// We want a min-heap (points in chronological order), so use less than.
return pq[i].timestamp < pq[j].timestamp
}
func (pq pointHeap) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }
func (pq *pointHeap) Push(x interface{}) {
item := x.(*pointHeapItem)
*pq = append(*pq, item)
}
func (pq *pointHeap) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
*pq = old[0 : n-1]
return item
}
// tagSetCursor is virtual cursor that iterates over mutiple series cursors, as though it were // tagSetCursor is virtual cursor that iterates over mutiple series cursors, as though it were
// a single series. // a single series.
type tagSetCursor struct { type tagSetCursor struct {
@ -436,10 +487,14 @@ type tagSetCursor struct {
cursors []*seriesCursor // Underlying series cursors. cursors []*seriesCursor // Underlying series cursors.
decoder *FieldCodec // decoder for the raw data bytes decoder *FieldCodec // decoder for the raw data bytes
// Lookahead buffers for the cursors. Performance analysis shows that it is critical // pointHeap is a min-heap, ordered by timestamp, that contains the next
// that these buffers are part of the tagSetCursor type and not part of the cursors type. // point from each seriesCursor. Queries sometimes pull points from
keyBuffer []int64 // The current timestamp key for each cursor // thousands of series. This makes it reasonably efficient to find the
valueBuffer [][]byte // The current value for each cursor // point with the next lowest timestamp among the thousands of series that
// the query is pulling points from.
// Performance profiling shows that this lookahead needs to be part
// of the tagSetCursor type and not part of the the cursors type.
pointHeap *pointHeap
} }
// tagSetCursors represents a sortable slice of tagSetCursors. // tagSetCursors represents a sortable slice of tagSetCursors.
@ -460,14 +515,15 @@ func (a tagSetCursors) Keys() []string {
// newTagSetCursor returns a tagSetCursor // newTagSetCursor returns a tagSetCursor
func newTagSetCursor(m string, t map[string]string, c []*seriesCursor, d *FieldCodec) *tagSetCursor { func newTagSetCursor(m string, t map[string]string, c []*seriesCursor, d *FieldCodec) *tagSetCursor {
return &tagSetCursor{ tsc := &tagSetCursor{
measurement: m, measurement: m,
tags: t, tags: t,
cursors: c, cursors: c,
decoder: d, decoder: d,
keyBuffer: make([]int64, len(c)), pointHeap: newPointHeap(),
valueBuffer: make([][]byte, len(c)),
} }
return tsc
} }
func (tsc *tagSetCursor) key() string { func (tsc *tagSetCursor) key() string {
@ -478,73 +534,80 @@ func (tsc *tagSetCursor) key() string {
// is enforced on the values. If there is no matching value, then a nil result is returned. // is enforced on the values. If there is no matching value, then a nil result is returned.
func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}) { func (tsc *tagSetCursor) Next(tmin, tmax int64, selectFields, whereFields []string) (int64, interface{}) {
for { for {
// Find the next lowest timestamp // If we're out of points, we're done.
min := -1 if tsc.pointHeap.Len() == 0 {
minKey := int64(math.MaxInt64)
for i, k := range tsc.keyBuffer {
if k != -1 && (k == tmin) || k < minKey && k >= tmin && k < tmax {
min = i
minKey = k
}
}
// Return if there is no more data for this tagset.
if min == -1 {
return -1, nil return -1, nil
} }
// set the current timestamp and seriesID // Grab the next point with the lowest timestamp.
timestamp := tsc.keyBuffer[min] p := heap.Pop(tsc.pointHeap).(*pointHeapItem)
var value interface{} // We're done if the point is outside the query's time range [tmin:tmax).
if len(selectFields) > 1 { if p.timestamp != tmin && (tmin > p.timestamp || p.timestamp >= tmax) {
if fieldsWithNames, err := tsc.decoder.DecodeFieldsWithNames(tsc.valueBuffer[min]); err == nil { return -1, nil
value = fieldsWithNames
// if there's a where clause, make sure we don't need to filter this value
if tsc.cursors[min].filter != nil && !matchesWhere(tsc.cursors[min].filter, fieldsWithNames) {
value = nil
}
}
} else {
// With only 1 field SELECTed, decoding all fields may be avoidable, which is faster.
var err error
value, err = tsc.decoder.DecodeByName(selectFields[0], tsc.valueBuffer[min])
if err != nil {
value = nil
} else {
// If there's a WHERE clase, see if we need to filter
if tsc.cursors[min].filter != nil {
// See if the WHERE is only on this field or on one or more other fields.
// If the latter, we'll have to decode everything
if len(whereFields) == 1 && whereFields[0] == selectFields[0] {
if !matchesWhere(tsc.cursors[min].filter, map[string]interface{}{selectFields[0]: value}) {
value = nil
}
} else { // Decode everything
fieldsWithNames, err := tsc.decoder.DecodeFieldsWithNames(tsc.valueBuffer[min])
if err != nil || !matchesWhere(tsc.cursors[min].filter, fieldsWithNames) {
value = nil
}
}
}
}
} }
// Advance the cursor // Advance the cursor
nextKey, nextVal := tsc.cursors[min].Next() nextKey, nextVal := p.cursor.Next()
tsc.keyBuffer[min] = nextKey if nextKey != -1 {
tsc.valueBuffer[min] = nextVal nextPoint := &pointHeapItem{
timestamp: nextKey,
value: nextVal,
cursor: p.cursor,
}
heap.Push(tsc.pointHeap, nextPoint)
}
// Decode the raw point.
value := tsc.decodeRawPoint(p, selectFields, whereFields)
// Value didn't match, look for the next one. // Value didn't match, look for the next one.
if value == nil { if value == nil {
continue continue
} }
return timestamp, value return p.timestamp, value
} }
} }
// decodeRawPoint decodes raw point data into field names & values and does WHERE filtering.
func (tsc *tagSetCursor) decodeRawPoint(p *pointHeapItem, selectFields, whereFields []string) interface{} {
if len(selectFields) > 1 {
if fieldsWithNames, err := tsc.decoder.DecodeFieldsWithNames(p.value); err == nil {
// if there's a where clause, make sure we don't need to filter this value
if p.cursor.filter != nil && !matchesWhere(p.cursor.filter, fieldsWithNames) {
return nil
}
return fieldsWithNames
}
}
// With only 1 field SELECTed, decoding all fields may be avoidable, which is faster.
value, err := tsc.decoder.DecodeByName(selectFields[0], p.value)
if err != nil {
return nil
}
// If there's a WHERE clase, see if we need to filter
if p.cursor.filter != nil {
// See if the WHERE is only on this field or on one or more other fields.
// If the latter, we'll have to decode everything
if len(whereFields) == 1 && whereFields[0] == selectFields[0] {
if !matchesWhere(p.cursor.filter, map[string]interface{}{selectFields[0]: value}) {
value = nil
}
} else { // Decode everything
fieldsWithNames, err := tsc.decoder.DecodeFieldsWithNames(p.value)
if err != nil || !matchesWhere(p.cursor.filter, fieldsWithNames) {
value = nil
}
}
}
return value
}
// seriesCursor is a cursor that walks a single series. It provides lookahead functionality. // seriesCursor is a cursor that walks a single series. It provides lookahead functionality.
type seriesCursor struct { type seriesCursor struct {
cursor Cursor // BoltDB cursor for a series cursor Cursor // BoltDB cursor for a series