From 59307b8b789de0cab03badff7fc276c0abad46ec Mon Sep 17 00:00:00 2001 From: Daniel Morsing Date: Wed, 16 Sep 2015 16:07:50 +0000 Subject: [PATCH] optimize top queries Instead of rounding up the points, sorting and then slicing, keep a heap that allows us to quickly see if the point needs to be in the set. This cuts a top query on a dataset of 8 million points from 35 seconds to 11 seconds. --- tsdb/functions.go | 270 +++++++++++++++++++++++++---------------- tsdb/functions_test.go | 12 +- 2 files changed, 170 insertions(+), 112 deletions(-) diff --git a/tsdb/functions.go b/tsdb/functions.go index f7341de63a..fc65ec4795 100644 --- a/tsdb/functions.go +++ b/tsdb/functions.go @@ -7,6 +7,7 @@ package tsdb // When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function import ( + "container/heap" "encoding/json" "fmt" "math" @@ -1120,7 +1121,7 @@ func interfaceCompare(a, b interface{}) int { case string: return stringWeight, 0 } - panic("interfaceValues.Less - unreachable code") + panic(fmt.Sprintf("interfaceValues.Less - unreachable code; type was %t", val)) } w1, n1 := infer(a) @@ -1157,24 +1158,41 @@ type PositionPoint struct { } type topMapOut struct { - positionOut + *positionOut } -func (t topMapOut) Len() int { return len(t.points) } -func (t topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] } -func (t topMapOut) Less(i, j int) bool { +func (t *topMapOut) Len() int { return len(t.points) } +func (t *topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] } +func (t *topMapOut) Less(i, j int) bool { // old C trick makes this code easier to read. Imagine // that the OP in "cmp(i, j) OP 0" is the comparison you want // between i and j cmp := interfaceCompare(t.points[i].Value, t.points[j].Value) if cmp != 0 { - return cmp > 0 + return cmp < 0 } k1, k2 := t.points[i].Time, t.points[j].Time if k1 != k2 { - return k1 < k2 + return k1 > k2 } - return t.lessKey(i, j) + return !t.lessKey(i, j) +} + +// We never use this function, so make it a no-op. +func (t *topMapOut) Push(i interface{}) { + panic("someone used the function") +} + +// this function doesn't return anything meaningful, since we don't look at the +// return value and we don't want to allocate for generating an interface. +func (t *topMapOut) Pop() interface{} { + t.points = t.points[:len(t.points)-1] + return nil +} + +func (t *topMapOut) insert(p PositionPoint) { + t.points[0] = p + heap.Fix(t, 0) } type topReduceOut struct { @@ -1210,50 +1228,83 @@ func topCallArgs(c *influxql.Call) []string { return names } +func tagkeytop(args []string, fields map[string]interface{}, keys map[string]string) string { + key := "" + for _, a := range args { + if v, ok := fields[a]; ok { + key += a + ":" + fmt.Sprintf("%v", v) + "," + continue + } + if v, ok := keys[a]; ok { + key += a + ":" + v + "," + continue + } + } + return key +} + +// map iterator. We need this for the top +// query, but luckily that doesn't require ordered +// iteration, so we can fake it +type mapIter struct { + m map[string]PositionPoint + currTags map[string]string + tmin int64 +} + +func (m *mapIter) TMin() int64 { + return m.tmin +} + +func (m *mapIter) Tags() map[string]string { + return m.currTags +} + +func (m *mapIter) Next() (time int64, value interface{}) { + // this is a bit ugly, but can't think of any other way that doesn't involve dumping + // the entire map to an array + for key, p := range m.m { + m.currTags = p.Tags + time = p.Time + value = p.Value + delete(m.m, key) + return + } + return -1, nil +} + // MapTop emits the top data points for each group by interval func MapTop(itr iterator, c *influxql.Call) interface{} { // Capture the limit if it was specified in the call lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral) - limit := int64(lit.Val) + limit := int(lit.Val) - // Simple case where only value and limit are specified. - if len(c.Args) == 2 { - out := positionOut{callArgs: topCallArgs(c)} + out := positionOut{callArgs: topCallArgs(c)} + out.points = make([]PositionPoint, 0, limit) + minheap := topMapOut{&out} + tagmap := make(map[string]PositionPoint) + if len(c.Args) > 2 { + // this is a tag aggregating query. + // For each unique permutation of the tags given, + // select the max and then fall through to select top of those + // points for k, v := itr.Next(); k != -1; k, v = itr.Next() { - t := k - if bt := itr.TMin(); bt > -1 { - t = bt - } - out.points = append(out.points, PositionPoint{t, v, itr.Tags()}) - } - - // If we have more than we asked for, only send back the top values - if int64(len(out.points)) > limit { - sort.Sort(topMapOut{out}) - out.points = out.points[:limit] - } - if len(out.points) > 0 { - return out.points - } - return nil - } - // They specified tags in the call to get unique sets, so we need to map them as we accumulate them - outMap := make(map[string]positionOut) - - mapKey := func(args []string, fields map[string]interface{}, keys map[string]string) string { - key := "" - for _, a := range args { - if v, ok := fields[a]; ok { - key += a + ":" + fmt.Sprintf("%v", v) + "," - continue - } - if v, ok := keys[a]; ok { - key += a + ":" + v + "," - continue + callArgs := c.Fields() + tags := itr.Tags() + // TODO in the future we need to send in fields as well + // this will allow a user to query on both fields and tags + // fields will take the priority over tags if there is a name collision + key := tagkeytop(callArgs, nil, tags) + p, ok := tagmap[key] + if !ok || interfaceCompare(p.Value, v) < 0 { + tagmap[key] = PositionPoint{k, v, itr.Tags()} } } - return key + itr = &mapIter{ + m: tagmap, + tmin: itr.TMin(), + } } for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -1261,92 +1312,99 @@ func MapTop(itr iterator, c *influxql.Call) interface{} { if bt := itr.TMin(); bt > -1 { t = bt } - callArgs := c.Fields() - tags := itr.Tags() - // TODO in the future we need to send in fields as well - // this will allow a user to query on both fields and tags - // fields will take the priority over tags if there is a name collision - key := mapKey(callArgs, nil, tags) - if out, ok := outMap[key]; ok { + if len(out.points) < limit { out.points = append(out.points, PositionPoint{t, v, itr.Tags()}) - outMap[key] = out + if len(out.points) == limit { + heap.Init(&minheap) + } } else { - out = positionOut{callArgs: topCallArgs(c)} - out.points = append(out.points, PositionPoint{t, v, itr.Tags()}) - outMap[key] = out - } - } - // Sort all the maps - for k, v := range outMap { - sort.Sort(topMapOut{v}) - outMap[k] = v - } - - slice := func(needed int64, m map[string]positionOut) PositionPoints { - points := PositionPoints{} - var collected int64 - for k, v := range m { - if len(v.points) > 0 { - points = append(points, v.points[0]) - v.points = v.points[1:] - m[k] = v - collected++ + // we're over the limit, so find out if we're bigger than the + // smallest point in the set and eject it if we are + p := &out.points[0] + cmp := interfaceCompare(p.Value, v) + if cmp == 0 { + // equal values, insert if the highest timestamp + if k > p.Time { + minheap.insert(PositionPoint{t, v, itr.Tags()}) + } + } else if cmp < 0 { + minheap.insert(PositionPoint{t, v, itr.Tags()}) } } - o := positionOut{callArgs: topCallArgs(c), points: points} - sort.Sort(topMapOut{o}) - points = o.points - // If we got more than we needed, sort them and return the top - if collected > needed { - points = o.points[:needed] - } - - return points } - - points := PositionPoints{} - var collected int64 - for collected < limit { - p := slice(limit-collected, outMap) - if len(p) == 0 { - break - } - points = append(points, p...) - collected += int64(len(p)) + // should only happen on empty iterator. + if len(out.points) == 0 { + return nil + } else if len(out.points) < limit { + // it would be as fast to just sort regularly here, + // but falling down to the heapsort will mean we can get + // rid of another sort order. + heap.Init(&minheap) } - if len(points) > 0 { - return points + // minheap should now contain the largest values that were encountered + // during iteration. + // + // we want these values in ascending sorted order. We can achieve this by iteratively + // removing the lowest element and putting it at the end of the array. This is analogous + // to a heap sort. + // + // computer science is fun! + result := out.points + for len(out.points) > 0 { + p := out.points[0] + heap.Pop(&minheap) + // reslice so that we can get to the element just after the heap + endslice := out.points[:len(out.points)+1] + endslice[len(endslice)-1] = p } - return nil + // the ascending order is now in the result slice + return result } // ReduceTop computes the top values for each key. +// This function assumes that its inputs are in sorted ascending order. func ReduceTop(values []interface{}, c *influxql.Call) interface{} { lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral) - limit := int64(lit.Val) + limit := int(lit.Val) out := positionOut{callArgs: topCallArgs(c)} + results := make([]PositionPoints, 0, len(values)) + out.points = make([]PositionPoint, 0, limit) for _, v := range values { if v == nil { continue } - o, _ := v.(PositionPoints) - out.points = append(out.points, o...) + o, ok := v.(PositionPoints) + if ok { + results = append(results, o) + } } - - // Get the top of the top values - sort.Sort(topMapOut{out}) - // If we have more than we asked for, only send back the top values - if int64(len(out.points)) > limit { - out.points = out.points[:limit] + // These ranges are all in sorted ascending order + // so we can grab the top value out of all of them + // to figure out the top X ones. + for i := 0; i < limit; i++ { + max := interface{}(nil) + whichselected := -1 + for iter, v := range results { + if len(v) > 0 && (max == nil || interfaceCompare(max, v[0].Value) < 0) { + max = v[0].Value + whichselected = iter + } + } + if whichselected == -1 { + // none of the points have any values + // so we can return what we have now + sort.Sort(topReduceOut{out}) + return out.points + } + v := results[whichselected] + out.points = append(out.points, v[0]) + results[whichselected] = v[1:] } // now we need to resort the tops by time sort.Sort(topReduceOut{out}) - if len(out.points) > 0 { - return out.points - } - return nil + return out.points } // MapEcho emits the data points for each group by interval diff --git a/tsdb/functions_test.go b/tsdb/functions_test.go index 3fa66123ae..e43751923c 100644 --- a/tsdb/functions_test.go +++ b/tsdb/functions_test.go @@ -539,7 +539,7 @@ func TestMapTop(t *testing.T) { PositionPoint{20, int64(99), map[string]string{"host": "a"}}, }, }, - call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}}, + call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "int64 - tie on value, time, resolve based on tags", @@ -657,8 +657,8 @@ func TestReduceTop(t *testing.T) { values: []interface{}{ PositionPoints{ {10, int64(99), map[string]string{"host": "a"}}, - {10, int64(53), map[string]string{"host": "b"}}, {20, int64(88), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, }, }, exp: PositionPoints{ @@ -674,8 +674,8 @@ func TestReduceTop(t *testing.T) { {10, int64(99), map[string]string{"host": "a"}}, }, PositionPoints{ - {10, int64(53), map[string]string{"host": "b"}}, {20, int64(88), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, }, }, exp: PositionPoints{ @@ -689,8 +689,8 @@ func TestReduceTop(t *testing.T) { values: []interface{}{ PositionPoints{ {10, int64(99), map[string]string{"host": "a"}}, - {10, int64(53), map[string]string{"host": "b"}}, {20, int64(88), map[string]string{"host": "a"}}, + {10, int64(53), map[string]string{"host": "b"}}, }, nil, }, @@ -705,8 +705,8 @@ func TestReduceTop(t *testing.T) { values: []interface{}{ PositionPoints{ {10, int64(99), map[string]string{"host": "a"}}, - {10, int64(53), map[string]string{"host": "b"}}, {20, int64(88), map[string]string{}}, + {10, int64(53), map[string]string{"host": "b"}}, }, nil, }, @@ -722,8 +722,8 @@ func TestReduceTop(t *testing.T) { values: []interface{}{ PositionPoints{ {10, int64(99), map[string]string{"host": "a"}}, - {10, int64(53), map[string]string{"host": "b"}}, {20, int64(88), map[string]string{}}, + {10, int64(53), map[string]string{"host": "b"}}, }, nil, },