commit
b07c36288d
|
@ -7,10 +7,12 @@ package tsdb
|
||||||
// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function
|
// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"container/heap"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
"math"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
@ -227,7 +229,12 @@ type interfaceValues []interface{}
|
||||||
func (d interfaceValues) Len() int { return len(d) }
|
func (d interfaceValues) Len() int { return len(d) }
|
||||||
func (d interfaceValues) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
|
func (d interfaceValues) Swap(i, j int) { d[i], d[j] = d[j], d[i] }
|
||||||
func (d interfaceValues) Less(i, j int) bool {
|
func (d interfaceValues) Less(i, j int) bool {
|
||||||
return interfaceCompare(d[i], d[j]) < 0
|
cmpt, a, b := typeCompare(d[i], d[j])
|
||||||
|
cmpv := valueCompare(a, b)
|
||||||
|
if cmpv == 0 {
|
||||||
|
return cmpt < 0
|
||||||
|
}
|
||||||
|
return cmpv < 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapDistinct computes the unique values in an iterator.
|
// MapDistinct computes the unique values in an iterator.
|
||||||
|
@ -944,8 +951,8 @@ type positionOut struct {
|
||||||
callArgs []string // ordered args in the call
|
callArgs []string // ordered args in the call
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *positionOut) lessKey(i, j int) bool {
|
func (p *positionOut) lessKey(a, b *PositionPoint) bool {
|
||||||
t1, t2 := p.points[i].Tags, p.points[j].Tags
|
t1, t2 := a.Tags, b.Tags
|
||||||
for _, k := range p.callArgs {
|
for _, k := range p.callArgs {
|
||||||
if t1[k] != t2[k] {
|
if t1[k] != t2[k] {
|
||||||
return t1[k] < t2[k]
|
return t1[k] < t2[k]
|
||||||
|
@ -954,6 +961,64 @@ func (p *positionOut) lessKey(i, j int) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// typeCompare compares the types of a and b and returns an arbitrary ordering.
|
||||||
|
// It returns -1 if type(a) < type(b) , 0 if type(a) == type(b), or 1 if type(a) > type(b), following the strcmp convention
|
||||||
|
// from C.
|
||||||
|
//
|
||||||
|
// If the types are not equal, then it will attempt to coerce them to floating point and return them in the last 2 arguments.
|
||||||
|
// If the type cannot be coerced to floating point, it is returned unaltered.
|
||||||
|
func typeCompare(a, b interface{}) (int, interface{}, interface{}) {
|
||||||
|
const (
|
||||||
|
stringWeight = iota
|
||||||
|
boolWeight
|
||||||
|
intWeight
|
||||||
|
floatWeight
|
||||||
|
)
|
||||||
|
|
||||||
|
va := reflect.ValueOf(a)
|
||||||
|
vb := reflect.ValueOf(b)
|
||||||
|
|
||||||
|
vakind := va.Type().Kind()
|
||||||
|
vbkind := vb.Type().Kind()
|
||||||
|
|
||||||
|
// same kind. Ordering is dependent on value
|
||||||
|
if vakind == vbkind {
|
||||||
|
return 0, a, b
|
||||||
|
}
|
||||||
|
wa, a := inferFloat(va)
|
||||||
|
wb, b := inferFloat(vb)
|
||||||
|
if wa < wb {
|
||||||
|
return -1, a, b
|
||||||
|
} else if wa == wb {
|
||||||
|
return 0, a, b
|
||||||
|
}
|
||||||
|
return 1, a, b
|
||||||
|
}
|
||||||
|
|
||||||
|
// returns a weighting and if applicable, the value coerced to a float
|
||||||
|
func inferFloat(v reflect.Value) (weight int, value interface{}) {
|
||||||
|
const (
|
||||||
|
stringWeight = iota
|
||||||
|
boolWeight
|
||||||
|
intWeight
|
||||||
|
floatWeight
|
||||||
|
)
|
||||||
|
kind := v.Kind()
|
||||||
|
switch kind {
|
||||||
|
case reflect.Uint64, reflect.Uint32, reflect.Uint16, reflect.Uint8:
|
||||||
|
return intWeight, float64(v.Uint())
|
||||||
|
case reflect.Int64, reflect.Int32, reflect.Int16, reflect.Int8:
|
||||||
|
return intWeight, float64(v.Int())
|
||||||
|
case reflect.Float64, reflect.Float32:
|
||||||
|
return floatWeight, v.Float()
|
||||||
|
case reflect.Bool:
|
||||||
|
return boolWeight, v.Interface()
|
||||||
|
case reflect.String:
|
||||||
|
return stringWeight, v.Interface()
|
||||||
|
}
|
||||||
|
panic(fmt.Sprintf("interfaceValues.Less - unreachable code; type was %T", v.Interface()))
|
||||||
|
}
|
||||||
|
|
||||||
func cmpFloat(a, b float64) int {
|
func cmpFloat(a, b float64) int {
|
||||||
if a == b {
|
if a == b {
|
||||||
return 0
|
return 0
|
||||||
|
@ -981,7 +1046,12 @@ func cmpUint(a, b uint64) int {
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
func interfaceCompare(a, b interface{}) int {
|
// valueCompare returns -1 if a < b , 0 if a == b, or 1 if a > b
|
||||||
|
// If the interfaces are 2 different types, then 0 is returned
|
||||||
|
func valueCompare(a, b interface{}) int {
|
||||||
|
if reflect.TypeOf(a).Kind() != reflect.TypeOf(b).Kind() {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
// compare by float64/int64 first as that is the most likely match
|
// compare by float64/int64 first as that is the most likely match
|
||||||
{
|
{
|
||||||
d1, ok1 := a.(float64)
|
d1, ok1 := a.(float64)
|
||||||
|
@ -1084,69 +1154,7 @@ func interfaceCompare(a, b interface{}) int {
|
||||||
return strings.Compare(d1, d2)
|
return strings.Compare(d1, d2)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
panic(fmt.Sprintf("unreachable code; types were %T, %T", a, b))
|
||||||
// Types did not match, need to sort based on arbitrary weighting of type
|
|
||||||
const (
|
|
||||||
stringWeight = iota
|
|
||||||
boolWeight
|
|
||||||
intWeight
|
|
||||||
floatWeight
|
|
||||||
)
|
|
||||||
|
|
||||||
infer := func(val interface{}) (int, float64) {
|
|
||||||
switch v := val.(type) {
|
|
||||||
case uint64:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case uint32:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case uint16:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case uint8:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case int64:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case int32:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case int16:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case int8:
|
|
||||||
return intWeight, float64(v)
|
|
||||||
case float64:
|
|
||||||
return floatWeight, float64(v)
|
|
||||||
case float32:
|
|
||||||
return floatWeight, float64(v)
|
|
||||||
case bool:
|
|
||||||
return boolWeight, 0
|
|
||||||
case string:
|
|
||||||
return stringWeight, 0
|
|
||||||
}
|
|
||||||
panic("interfaceValues.Less - unreachable code")
|
|
||||||
}
|
|
||||||
|
|
||||||
w1, n1 := infer(a)
|
|
||||||
w2, n2 := infer(b)
|
|
||||||
|
|
||||||
// If we had "numeric" data, use that for comparison
|
|
||||||
if (w1 == floatWeight || w1 == intWeight) && (w2 == floatWeight || w2 == intWeight) {
|
|
||||||
cmp := cmpFloat(n1, n2)
|
|
||||||
// break ties
|
|
||||||
if cmp == 0 {
|
|
||||||
if w1 < w2 {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
return 1
|
|
||||||
}
|
|
||||||
return cmp
|
|
||||||
}
|
|
||||||
|
|
||||||
if w1 == w2 {
|
|
||||||
// this should never happen, since equal weight means
|
|
||||||
// it should have been handled at the start of this function.
|
|
||||||
panic("unreachable")
|
|
||||||
} else if w1 < w2 {
|
|
||||||
return -1
|
|
||||||
}
|
|
||||||
return 1
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type PositionPoints []PositionPoint
|
type PositionPoints []PositionPoint
|
||||||
|
@ -1157,24 +1165,49 @@ type PositionPoint struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type topMapOut struct {
|
type topMapOut struct {
|
||||||
positionOut
|
*positionOut
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t topMapOut) Len() int { return len(t.points) }
|
func (t *topMapOut) Len() int { return len(t.points) }
|
||||||
func (t topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
|
func (t *topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
|
||||||
func (t topMapOut) Less(i, j int) bool {
|
func (t *topMapOut) Less(i, j int) bool {
|
||||||
|
return t.positionPointLess(&t.points[i], &t.points[j])
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *topMapOut) positionPointLess(pa, pb *PositionPoint) bool {
|
||||||
// old C trick makes this code easier to read. Imagine
|
// old C trick makes this code easier to read. Imagine
|
||||||
// that the OP in "cmp(i, j) OP 0" is the comparison you want
|
// that the OP in "cmp(i, j) OP 0" is the comparison you want
|
||||||
// between i and j
|
// between i and j
|
||||||
cmp := interfaceCompare(t.points[i].Value, t.points[j].Value)
|
cmpt, a, b := typeCompare(pa.Value, pb.Value)
|
||||||
if cmp != 0 {
|
cmpv := valueCompare(a, b)
|
||||||
return cmp > 0
|
if cmpv != 0 {
|
||||||
|
return cmpv < 0
|
||||||
}
|
}
|
||||||
k1, k2 := t.points[i].Time, t.points[j].Time
|
if cmpt != 0 {
|
||||||
|
return cmpt < 0
|
||||||
|
}
|
||||||
|
k1, k2 := pa.Time, pb.Time
|
||||||
if k1 != k2 {
|
if k1 != k2 {
|
||||||
return k1 < k2
|
return k1 > k2
|
||||||
}
|
}
|
||||||
return t.lessKey(i, j)
|
return !t.lessKey(pa, pb)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We never use this function, so make it a no-op.
|
||||||
|
func (t *topMapOut) Push(i interface{}) {
|
||||||
|
panic("someone used the function")
|
||||||
|
}
|
||||||
|
|
||||||
|
// this function doesn't return anything meaningful, since we don't look at the
|
||||||
|
// return value and we don't want to allocate for generating an interface.
|
||||||
|
func (t *topMapOut) Pop() interface{} {
|
||||||
|
t.points = t.points[:len(t.points)-1]
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *topMapOut) insert(p PositionPoint) {
|
||||||
|
t.points[0] = p
|
||||||
|
heap.Fix(t, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
type topReduceOut struct {
|
type topReduceOut struct {
|
||||||
|
@ -1190,11 +1223,15 @@ func (t topReduceOut) Less(i, j int) bool {
|
||||||
if k1 != k2 {
|
if k1 != k2 {
|
||||||
return k1 < k2
|
return k1 < k2
|
||||||
}
|
}
|
||||||
cmp := interfaceCompare(t.points[i].Value, t.points[j].Value)
|
cmpt, a, b := typeCompare(t.points[i].Value, t.points[j].Value)
|
||||||
if cmp != 0 {
|
cmpv := valueCompare(a, b)
|
||||||
return cmp > 0
|
if cmpv != 0 {
|
||||||
|
return cmpv > 0
|
||||||
}
|
}
|
||||||
return t.lessKey(i, j)
|
if cmpt != 0 {
|
||||||
|
return cmpt < 0
|
||||||
|
}
|
||||||
|
return t.lessKey(&t.points[i], &t.points[j])
|
||||||
}
|
}
|
||||||
|
|
||||||
// callArgs will get any additional field/tag names that may be needed to sort with
|
// callArgs will get any additional field/tag names that may be needed to sort with
|
||||||
|
@ -1210,143 +1247,181 @@ func topCallArgs(c *influxql.Call) []string {
|
||||||
return names
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func tagkeytop(args []string, fields map[string]interface{}, keys map[string]string) string {
|
||||||
|
key := ""
|
||||||
|
for _, a := range args {
|
||||||
|
if v, ok := fields[a]; ok {
|
||||||
|
key += a + ":" + fmt.Sprintf("%v", v) + ","
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if v, ok := keys[a]; ok {
|
||||||
|
key += a + ":" + v + ","
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return key
|
||||||
|
}
|
||||||
|
|
||||||
|
// map iterator. We need this for the top
|
||||||
|
// query, but luckily that doesn't require ordered
|
||||||
|
// iteration, so we can fake it
|
||||||
|
type mapIter struct {
|
||||||
|
m map[string]PositionPoint
|
||||||
|
currTags map[string]string
|
||||||
|
tmin int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mapIter) TMin() int64 {
|
||||||
|
return m.tmin
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mapIter) Tags() map[string]string {
|
||||||
|
return m.currTags
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *mapIter) Next() (time int64, value interface{}) {
|
||||||
|
// this is a bit ugly, but can't think of any other way that doesn't involve dumping
|
||||||
|
// the entire map to an array
|
||||||
|
for key, p := range m.m {
|
||||||
|
m.currTags = p.Tags
|
||||||
|
time = p.Time
|
||||||
|
value = p.Value
|
||||||
|
delete(m.m, key)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return -1, nil
|
||||||
|
}
|
||||||
|
|
||||||
// MapTop emits the top data points for each group by interval
|
// MapTop emits the top data points for each group by interval
|
||||||
func MapTop(itr iterator, c *influxql.Call) interface{} {
|
func MapTop(itr iterator, c *influxql.Call) interface{} {
|
||||||
// Capture the limit if it was specified in the call
|
// Capture the limit if it was specified in the call
|
||||||
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
|
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
|
||||||
limit := int64(lit.Val)
|
limit := int(lit.Val)
|
||||||
|
|
||||||
// Simple case where only value and limit are specified.
|
out := positionOut{callArgs: topCallArgs(c)}
|
||||||
if len(c.Args) == 2 {
|
out.points = make([]PositionPoint, 0, limit)
|
||||||
out := positionOut{callArgs: topCallArgs(c)}
|
minheap := topMapOut{&out}
|
||||||
|
tagmap := make(map[string]PositionPoint)
|
||||||
|
|
||||||
|
// buffer so we don't allocate every time through
|
||||||
|
var pp PositionPoint
|
||||||
|
if len(c.Args) > 2 {
|
||||||
|
// this is a tag aggregating query.
|
||||||
|
// For each unique permutation of the tags given,
|
||||||
|
// select the max and then fall through to select top of those
|
||||||
|
// points
|
||||||
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
|
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
|
||||||
t := k
|
pp = PositionPoint{k, v, itr.Tags()}
|
||||||
if bt := itr.TMin(); bt > -1 {
|
callArgs := c.Fields()
|
||||||
t = bt
|
tags := itr.Tags()
|
||||||
|
// TODO in the future we need to send in fields as well
|
||||||
|
// this will allow a user to query on both fields and tags
|
||||||
|
// fields will take the priority over tags if there is a name collision
|
||||||
|
key := tagkeytop(callArgs, nil, tags)
|
||||||
|
p, ok := tagmap[key]
|
||||||
|
if !ok || minheap.positionPointLess(&p, &pp) {
|
||||||
|
tagmap[key] = pp
|
||||||
}
|
}
|
||||||
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
|
|
||||||
}
|
}
|
||||||
|
itr = &mapIter{
|
||||||
// If we have more than we asked for, only send back the top values
|
m: tagmap,
|
||||||
if int64(len(out.points)) > limit {
|
tmin: itr.TMin(),
|
||||||
sort.Sort(topMapOut{out})
|
|
||||||
out.points = out.points[:limit]
|
|
||||||
}
|
}
|
||||||
if len(out.points) > 0 {
|
|
||||||
return out.points
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
// They specified tags in the call to get unique sets, so we need to map them as we accumulate them
|
|
||||||
outMap := make(map[string]positionOut)
|
|
||||||
|
|
||||||
mapKey := func(args []string, fields map[string]interface{}, keys map[string]string) string {
|
|
||||||
key := ""
|
|
||||||
for _, a := range args {
|
|
||||||
if v, ok := fields[a]; ok {
|
|
||||||
key += a + ":" + fmt.Sprintf("%v", v) + ","
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
if v, ok := keys[a]; ok {
|
|
||||||
key += a + ":" + v + ","
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return key
|
|
||||||
}
|
|
||||||
|
|
||||||
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
|
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
|
||||||
t := k
|
t := k
|
||||||
if bt := itr.TMin(); bt > -1 {
|
if bt := itr.TMin(); bt > -1 {
|
||||||
t = bt
|
t = bt
|
||||||
}
|
}
|
||||||
callArgs := c.Fields()
|
if len(out.points) < limit {
|
||||||
tags := itr.Tags()
|
|
||||||
// TODO in the future we need to send in fields as well
|
|
||||||
// this will allow a user to query on both fields and tags
|
|
||||||
// fields will take the priority over tags if there is a name collision
|
|
||||||
key := mapKey(callArgs, nil, tags)
|
|
||||||
if out, ok := outMap[key]; ok {
|
|
||||||
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
|
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
|
||||||
outMap[key] = out
|
if len(out.points) == limit {
|
||||||
|
heap.Init(&minheap)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
out = positionOut{callArgs: topCallArgs(c)}
|
// we're over the limit, so find out if we're bigger than the
|
||||||
out.points = append(out.points, PositionPoint{t, v, itr.Tags()})
|
// smallest point in the set and eject it if we are
|
||||||
outMap[key] = out
|
minval := &out.points[0]
|
||||||
}
|
pp = PositionPoint{t, v, itr.Tags()}
|
||||||
}
|
if minheap.positionPointLess(minval, &pp) {
|
||||||
// Sort all the maps
|
minheap.insert(pp)
|
||||||
for k, v := range outMap {
|
|
||||||
sort.Sort(topMapOut{v})
|
|
||||||
outMap[k] = v
|
|
||||||
}
|
|
||||||
|
|
||||||
slice := func(needed int64, m map[string]positionOut) PositionPoints {
|
|
||||||
points := PositionPoints{}
|
|
||||||
var collected int64
|
|
||||||
for k, v := range m {
|
|
||||||
if len(v.points) > 0 {
|
|
||||||
points = append(points, v.points[0])
|
|
||||||
v.points = v.points[1:]
|
|
||||||
m[k] = v
|
|
||||||
collected++
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
o := positionOut{callArgs: topCallArgs(c), points: points}
|
|
||||||
sort.Sort(topMapOut{o})
|
|
||||||
points = o.points
|
|
||||||
// If we got more than we needed, sort them and return the top
|
|
||||||
if collected > needed {
|
|
||||||
points = o.points[:needed]
|
|
||||||
}
|
|
||||||
|
|
||||||
return points
|
|
||||||
}
|
}
|
||||||
|
// should only happen on empty iterator.
|
||||||
points := PositionPoints{}
|
if len(out.points) == 0 {
|
||||||
var collected int64
|
return nil
|
||||||
for collected < limit {
|
} else if len(out.points) < limit {
|
||||||
p := slice(limit-collected, outMap)
|
// it would be as fast to just sort regularly here,
|
||||||
if len(p) == 0 {
|
// but falling down to the heapsort will mean we can get
|
||||||
break
|
// rid of another sort order.
|
||||||
}
|
heap.Init(&minheap)
|
||||||
points = append(points, p...)
|
|
||||||
collected += int64(len(p))
|
|
||||||
}
|
}
|
||||||
if len(points) > 0 {
|
// minheap should now contain the largest values that were encountered
|
||||||
return points
|
// during iteration.
|
||||||
|
//
|
||||||
|
// we want these values in ascending sorted order. We can achieve this by iteratively
|
||||||
|
// removing the lowest element and putting it at the end of the array. This is analogous
|
||||||
|
// to a heap sort.
|
||||||
|
//
|
||||||
|
// computer science is fun!
|
||||||
|
result := out.points
|
||||||
|
for len(out.points) > 0 {
|
||||||
|
p := out.points[0]
|
||||||
|
heap.Pop(&minheap)
|
||||||
|
// reslice so that we can get to the element just after the heap
|
||||||
|
endslice := out.points[:len(out.points)+1]
|
||||||
|
endslice[len(endslice)-1] = p
|
||||||
}
|
}
|
||||||
return nil
|
// the ascending order is now in the result slice
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReduceTop computes the top values for each key.
|
// ReduceTop computes the top values for each key.
|
||||||
|
// This function assumes that its inputs are in sorted ascending order.
|
||||||
func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
|
func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
|
||||||
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
|
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
|
||||||
limit := int64(lit.Val)
|
limit := int(lit.Val)
|
||||||
|
|
||||||
out := positionOut{callArgs: topCallArgs(c)}
|
out := positionOut{callArgs: topCallArgs(c)}
|
||||||
|
minheap := topMapOut{&out}
|
||||||
|
results := make([]PositionPoints, 0, len(values))
|
||||||
|
out.points = make([]PositionPoint, 0, limit)
|
||||||
for _, v := range values {
|
for _, v := range values {
|
||||||
if v == nil {
|
if v == nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
o, _ := v.(PositionPoints)
|
o, ok := v.(PositionPoints)
|
||||||
out.points = append(out.points, o...)
|
if ok {
|
||||||
|
results = append(results, o)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
// These ranges are all in sorted ascending order
|
||||||
// Get the top of the top values
|
// so we can grab the top value out of all of them
|
||||||
sort.Sort(topMapOut{out})
|
// to figure out the top X ones.
|
||||||
// If we have more than we asked for, only send back the top values
|
for i := 0; i < limit; i++ {
|
||||||
if int64(len(out.points)) > limit {
|
var max *PositionPoint
|
||||||
out.points = out.points[:limit]
|
whichselected := -1
|
||||||
|
for iter, v := range results {
|
||||||
|
if len(v) > 0 && (max == nil || minheap.positionPointLess(max, &v[0])) {
|
||||||
|
max = &v[0]
|
||||||
|
whichselected = iter
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if whichselected == -1 {
|
||||||
|
// none of the points have any values
|
||||||
|
// so we can return what we have now
|
||||||
|
sort.Sort(topReduceOut{out})
|
||||||
|
return out.points
|
||||||
|
}
|
||||||
|
v := results[whichselected]
|
||||||
|
out.points = append(out.points, v[0])
|
||||||
|
results[whichselected] = v[1:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// now we need to resort the tops by time
|
// now we need to resort the tops by time
|
||||||
sort.Sort(topReduceOut{out})
|
sort.Sort(topReduceOut{out})
|
||||||
if len(out.points) > 0 {
|
return out.points
|
||||||
return out.points
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MapEcho emits the data points for each group by interval
|
// MapEcho emits the data points for each group by interval
|
||||||
|
|
|
@ -539,7 +539,7 @@ func TestMapTop(t *testing.T) {
|
||||||
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
|
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
|
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "int64 - tie on value, time, resolve based on tags",
|
name: "int64 - tie on value, time, resolve based on tags",
|
||||||
|
@ -657,8 +657,8 @@ func TestReduceTop(t *testing.T) {
|
||||||
values: []interface{}{
|
values: []interface{}{
|
||||||
PositionPoints{
|
PositionPoints{
|
||||||
{10, int64(99), map[string]string{"host": "a"}},
|
{10, int64(99), map[string]string{"host": "a"}},
|
||||||
{10, int64(53), map[string]string{"host": "b"}},
|
|
||||||
{20, int64(88), map[string]string{"host": "a"}},
|
{20, int64(88), map[string]string{"host": "a"}},
|
||||||
|
{10, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
exp: PositionPoints{
|
exp: PositionPoints{
|
||||||
|
@ -674,8 +674,8 @@ func TestReduceTop(t *testing.T) {
|
||||||
{10, int64(99), map[string]string{"host": "a"}},
|
{10, int64(99), map[string]string{"host": "a"}},
|
||||||
},
|
},
|
||||||
PositionPoints{
|
PositionPoints{
|
||||||
{10, int64(53), map[string]string{"host": "b"}},
|
|
||||||
{20, int64(88), map[string]string{"host": "a"}},
|
{20, int64(88), map[string]string{"host": "a"}},
|
||||||
|
{10, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
exp: PositionPoints{
|
exp: PositionPoints{
|
||||||
|
@ -689,8 +689,8 @@ func TestReduceTop(t *testing.T) {
|
||||||
values: []interface{}{
|
values: []interface{}{
|
||||||
PositionPoints{
|
PositionPoints{
|
||||||
{10, int64(99), map[string]string{"host": "a"}},
|
{10, int64(99), map[string]string{"host": "a"}},
|
||||||
{10, int64(53), map[string]string{"host": "b"}},
|
|
||||||
{20, int64(88), map[string]string{"host": "a"}},
|
{20, int64(88), map[string]string{"host": "a"}},
|
||||||
|
{10, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
nil,
|
nil,
|
||||||
},
|
},
|
||||||
|
@ -705,8 +705,8 @@ func TestReduceTop(t *testing.T) {
|
||||||
values: []interface{}{
|
values: []interface{}{
|
||||||
PositionPoints{
|
PositionPoints{
|
||||||
{10, int64(99), map[string]string{"host": "a"}},
|
{10, int64(99), map[string]string{"host": "a"}},
|
||||||
{10, int64(53), map[string]string{"host": "b"}},
|
|
||||||
{20, int64(88), map[string]string{}},
|
{20, int64(88), map[string]string{}},
|
||||||
|
{10, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
nil,
|
nil,
|
||||||
},
|
},
|
||||||
|
@ -722,8 +722,8 @@ func TestReduceTop(t *testing.T) {
|
||||||
values: []interface{}{
|
values: []interface{}{
|
||||||
PositionPoints{
|
PositionPoints{
|
||||||
{10, int64(99), map[string]string{"host": "a"}},
|
{10, int64(99), map[string]string{"host": "a"}},
|
||||||
{10, int64(53), map[string]string{"host": "b"}},
|
|
||||||
{20, int64(88), map[string]string{}},
|
{20, int64(88), map[string]string{}},
|
||||||
|
{10, int64(53), map[string]string{"host": "b"}},
|
||||||
},
|
},
|
||||||
nil,
|
nil,
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue