optimize top queries

Instead of rounding up the points, sorting and then slicing, keep a
heap that allows us to quickly see if the point needs to be in the
set. This cuts a top query on a dataset of 8 million points from 35
seconds to 11 seconds.
pull/4130/head
Daniel Morsing 2015-09-16 16:07:50 +00:00
parent afe5cc67f4
commit 59307b8b78
2 changed files with 170 additions and 112 deletions

View File

@ -7,6 +7,7 @@ package tsdb
// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function
import (
"container/heap"
"encoding/json"
"fmt"
"math"
@ -1120,7 +1121,7 @@ func interfaceCompare(a, b interface{}) int {
case string:
return stringWeight, 0
}
panic("interfaceValues.Less - unreachable code")
panic(fmt.Sprintf("interfaceValues.Less - unreachable code; type was %t", val))
}
w1, n1 := infer(a)
@ -1157,24 +1158,41 @@ type PositionPoint struct {
}
type topMapOut struct {
positionOut
*positionOut
}
func (t topMapOut) Len() int { return len(t.points) }
func (t topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t topMapOut) Less(i, j int) bool {
func (t *topMapOut) Len() int { return len(t.points) }
func (t *topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t *topMapOut) Less(i, j int) bool {
// old C trick makes this code easier to read. Imagine
// that the OP in "cmp(i, j) OP 0" is the comparison you want
// between i and j
cmp := interfaceCompare(t.points[i].Value, t.points[j].Value)
if cmp != 0 {
return cmp > 0
return cmp < 0
}
k1, k2 := t.points[i].Time, t.points[j].Time
if k1 != k2 {
return k1 < k2
return k1 > k2
}
return t.lessKey(i, j)
return !t.lessKey(i, j)
}
// We never use this function, so make it a no-op.
func (t *topMapOut) Push(i interface{}) {
panic("someone used the function")
}
// this function doesn't return anything meaningful, since we don't look at the
// return value and we don't want to allocate for generating an interface.
func (t *topMapOut) Pop() interface{} {
t.points = t.points[:len(t.points)-1]
return nil
}
func (t *topMapOut) insert(p PositionPoint) {
t.points[0] = p
heap.Fix(t, 0)
}
type topReduceOut struct {
@ -1210,38 +1228,7 @@ func topCallArgs(c *influxql.Call) []string {
return names
}
// MapTop emits the top data points for each group by interval
func MapTop(itr iterator, c *influxql.Call) interface{} {
// Capture the limit if it was specified in the call
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int64(lit.Val)
// Simple case where only value and limit are specified.
if len(c.Args) == 2 {
out := positionOut{callArgs: topCallArgs(c)}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
t := k
if bt := itr.TMin(); bt > -1 {
t = bt
}
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
}
// If we have more than we asked for, only send back the top values
if int64(len(out.points)) > limit {
sort.Sort(topMapOut{out})
out.points = out.points[:limit]
}
if len(out.points) > 0 {
return out.points
}
return nil
}
// They specified tags in the call to get unique sets, so we need to map them as we accumulate them
outMap := make(map[string]positionOut)
mapKey := func(args []string, fields map[string]interface{}, keys map[string]string) string {
func tagkeytop(args []string, fields map[string]interface{}, keys map[string]string) string {
key := ""
for _, a := range args {
if v, ok := fields[a]; ok {
@ -1254,6 +1241,70 @@ func MapTop(itr iterator, c *influxql.Call) interface{} {
}
}
return key
}
// map iterator. We need this for the top
// query, but luckily that doesn't require ordered
// iteration, so we can fake it
type mapIter struct {
m map[string]PositionPoint
currTags map[string]string
tmin int64
}
func (m *mapIter) TMin() int64 {
return m.tmin
}
func (m *mapIter) Tags() map[string]string {
return m.currTags
}
func (m *mapIter) Next() (time int64, value interface{}) {
// this is a bit ugly, but can't think of any other way that doesn't involve dumping
// the entire map to an array
for key, p := range m.m {
m.currTags = p.Tags
time = p.Time
value = p.Value
delete(m.m, key)
return
}
return -1, nil
}
// MapTop emits the top data points for each group by interval
func MapTop(itr iterator, c *influxql.Call) interface{} {
// Capture the limit if it was specified in the call
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
out.points = make([]PositionPoint, 0, limit)
minheap := topMapOut{&out}
tagmap := make(map[string]PositionPoint)
if len(c.Args) > 2 {
// this is a tag aggregating query.
// For each unique permutation of the tags given,
// select the max and then fall through to select top of those
// points
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
callArgs := c.Fields()
tags := itr.Tags()
// TODO in the future we need to send in fields as well
// this will allow a user to query on both fields and tags
// fields will take the priority over tags if there is a name collision
key := tagkeytop(callArgs, nil, tags)
p, ok := tagmap[key]
if !ok || interfaceCompare(p.Value, v) < 0 {
tagmap[key] = PositionPoint{k, v, itr.Tags()}
}
}
itr = &mapIter{
m: tagmap,
tmin: itr.TMin(),
}
}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -1261,92 +1312,99 @@ func MapTop(itr iterator, c *influxql.Call) interface{} {
if bt := itr.TMin(); bt > -1 {
t = bt
}
callArgs := c.Fields()
tags := itr.Tags()
// TODO in the future we need to send in fields as well
// this will allow a user to query on both fields and tags
// fields will take the priority over tags if there is a name collision
key := mapKey(callArgs, nil, tags)
if out, ok := outMap[key]; ok {
if len(out.points) < limit {
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
outMap[key] = out
if len(out.points) == limit {
heap.Init(&minheap)
}
} else {
out = positionOut{callArgs: topCallArgs(c)}
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
outMap[key] = out
// we're over the limit, so find out if we're bigger than the
// smallest point in the set and eject it if we are
p := &out.points[0]
cmp := interfaceCompare(p.Value, v)
if cmp == 0 {
// equal values, insert if the highest timestamp
if k > p.Time {
minheap.insert(PositionPoint{t, v, itr.Tags()})
}
} else if cmp < 0 {
minheap.insert(PositionPoint{t, v, itr.Tags()})
}
}
// Sort all the maps
for k, v := range outMap {
sort.Sort(topMapOut{v})
outMap[k] = v
}
slice := func(needed int64, m map[string]positionOut) PositionPoints {
points := PositionPoints{}
var collected int64
for k, v := range m {
if len(v.points) > 0 {
points = append(points, v.points[0])
v.points = v.points[1:]
m[k] = v
collected++
}
}
o := positionOut{callArgs: topCallArgs(c), points: points}
sort.Sort(topMapOut{o})
points = o.points
// If we got more than we needed, sort them and return the top
if collected > needed {
points = o.points[:needed]
}
return points
}
points := PositionPoints{}
var collected int64
for collected < limit {
p := slice(limit-collected, outMap)
if len(p) == 0 {
break
}
points = append(points, p...)
collected += int64(len(p))
}
if len(points) > 0 {
return points
}
// should only happen on empty iterator.
if len(out.points) == 0 {
return nil
} else if len(out.points) < limit {
// it would be as fast to just sort regularly here,
// but falling down to the heapsort will mean we can get
// rid of another sort order.
heap.Init(&minheap)
}
// minheap should now contain the largest values that were encountered
// during iteration.
//
// we want these values in ascending sorted order. We can achieve this by iteratively
// removing the lowest element and putting it at the end of the array. This is analogous
// to a heap sort.
//
// computer science is fun!
result := out.points
for len(out.points) > 0 {
p := out.points[0]
heap.Pop(&minheap)
// reslice so that we can get to the element just after the heap
endslice := out.points[:len(out.points)+1]
endslice[len(endslice)-1] = p
}
// the ascending order is now in the result slice
return result
}
// ReduceTop computes the top values for each key.
// This function assumes that its inputs are in sorted ascending order.
func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int64(lit.Val)
limit := int(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
results := make([]PositionPoints, 0, len(values))
out.points = make([]PositionPoint, 0, limit)
for _, v := range values {
if v == nil {
continue
}
o, _ := v.(PositionPoints)
out.points = append(out.points, o...)
o, ok := v.(PositionPoints)
if ok {
results = append(results, o)
}
// Get the top of the top values
sort.Sort(topMapOut{out})
// If we have more than we asked for, only send back the top values
if int64(len(out.points)) > limit {
out.points = out.points[:limit]
}
// These ranges are all in sorted ascending order
// so we can grab the top value out of all of them
// to figure out the top X ones.
for i := 0; i < limit; i++ {
max := interface{}(nil)
whichselected := -1
for iter, v := range results {
if len(v) > 0 && (max == nil || interfaceCompare(max, v[0].Value) < 0) {
max = v[0].Value
whichselected = iter
}
}
if whichselected == -1 {
// none of the points have any values
// so we can return what we have now
sort.Sort(topReduceOut{out})
return out.points
}
v := results[whichselected]
out.points = append(out.points, v[0])
results[whichselected] = v[1:]
}
// now we need to resort the tops by time
sort.Sort(topReduceOut{out})
if len(out.points) > 0 {
return out.points
}
return nil
}
// MapEcho emits the data points for each group by interval

View File

@ -539,7 +539,7 @@ func TestMapTop(t *testing.T) {
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, time, resolve based on tags",
@ -657,8 +657,8 @@ func TestReduceTop(t *testing.T) {
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
},
},
exp: PositionPoints{
@ -674,8 +674,8 @@ func TestReduceTop(t *testing.T) {
{10, int64(99), map[string]string{"host": "a"}},
},
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
},
},
exp: PositionPoints{
@ -689,8 +689,8 @@ func TestReduceTop(t *testing.T) {
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
},
nil,
},
@ -705,8 +705,8 @@ func TestReduceTop(t *testing.T) {
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{}},
{10, int64(53), map[string]string{"host": "b"}},
},
nil,
},
@ -722,8 +722,8 @@ func TestReduceTop(t *testing.T) {
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{}},
{10, int64(53), map[string]string{"host": "b"}},
},
nil,
},