refactor MapTopBottom signature

pull/4202/head
Cory LaNou 2015-09-23 10:53:21 -05:00
parent 7c3a542e13
commit 81ad1f87a4
2 changed files with 18 additions and 12 deletions

View File

@ -88,8 +88,14 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
}, nil
case "top", "bottom":
// Capture information from the call that the Map function will require
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int(lit.Val)
args := topCallArgs(c)
fields := c.Fields()
return func(itr Iterator) interface{} {
return MapTopBottom(itr, c)
return MapTopBottom(itr, limit, args, fields, len(c.Args), c.Name)
}, nil
case "percentile":
return MapEcho, nil
@ -1445,16 +1451,12 @@ func (m *mapIter) Next() (time int64, value interface{}) {
}
// MapTopBottom emits the top/bottom data points for each group by interval
func MapTopBottom(itr Iterator, c *influxql.Call) interface{} {
// Capture the limit if it was specified in the call
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
func MapTopBottom(itr Iterator, limit int, callArgs, fields []string, argCount int, callName string) interface{} {
out := positionOut{callArgs: callArgs}
out.points = make([]PositionPoint, 0, limit)
minheap := topBottomMapOut{
&out,
c.Name == "bottom",
callName == "bottom",
}
tagmap := make(map[string]PositionPoint)
@ -1464,19 +1466,18 @@ func MapTopBottom(itr Iterator, c *influxql.Call) interface{} {
// buffer so we don't allocate every time through
var pp PositionPoint
if len(c.Args) > 2 {
if argCount > 2 {
// this is a tag aggregating query.
// For each unique permutation of the tags given,
// select the max and then fall through to select top of those
// points
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
pp = PositionPoint{k, v, itr.Fields(), itr.Tags()}
callArgs := c.Fields()
tags := itr.Tags()
// TODO in the future we need to send in fields as well
// this will allow a user to query on both fields and tags
// fields will take the priority over tags if there is a name collision
key := tagkeytop(callArgs, nil, tags)
key := tagkeytop(fields, nil, tags)
p, ok := tagmap[key]
if !ok || minheap.positionPointLess(&p, &pp) {
tagmap[key] = pp

View File

@ -783,7 +783,12 @@ func TestMapTopBottom(t *testing.T) {
if test.skip {
continue
}
values := MapTopBottom(test.iter, test.call).(PositionPoints)
lit, _ := test.call.Args[len(test.call.Args)-1].(*influxql.NumberLiteral)
limit := int(lit.Val)
args := topCallArgs(test.call)
fields := test.call.Fields()
values := MapTopBottom(test.iter, limit, args, fields, len(test.call.Args), test.call.Name).(PositionPoints)
t.Logf("Test: %s", test.name)
if exp, got := len(test.exp.points), len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)