Merge pull request #4161 from influxdb/bottom

Implement bottom
pull/4184/head
Daniel Morsing 2015-09-21 12:42:15 +00:00
commit 49327f69f1
3 changed files with 278 additions and 42 deletions

View File

@ -4,6 +4,7 @@
- [#4141](https://github.com/influxdb/influxdb/pull/4141): Control whether each query should be logged
- [#4065](https://github.com/influxdb/influxdb/pull/4065): Added precision support in cmd client. Thanks @sbouchex
- [#4140](https://github.com/influxdb/influxdb/pull/4140): Make storage engine configurable
- [#4161](https://github.com/influxdb/influxdb/pull/4161): Implement bottom selector function
### Bugfixes
- [#3457](https://github.com/influxdb/influxdb/issues/3457): [0.9.3] cannot select field names with prefix + "." that match the measurement name

View File

@ -77,9 +77,9 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
return MapFirst, nil
case "last":
return MapLast, nil
case "top":
case "top", "bottom":
return func(itr iterator) interface{} {
return MapTop(itr, c)
return MapTopBottom(itr, c)
}, nil
case "percentile":
return MapEcho, nil
@ -129,9 +129,9 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
return ReduceFirst, nil
case "last":
return ReduceLast, nil
case "top":
case "top", "bottom":
return func(values []interface{}) interface{} {
return ReduceTop(values, c)
return ReduceTopBottom(values, c)
}, nil
case "percentile":
return func(values []interface{}) interface{} {
@ -1164,23 +1164,27 @@ type PositionPoint struct {
Tags map[string]string
}
type topMapOut struct {
type topBottomMapOut struct {
*positionOut
bottom bool
}
func (t *topMapOut) Len() int { return len(t.points) }
func (t *topMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t *topMapOut) Less(i, j int) bool {
func (t *topBottomMapOut) Len() int { return len(t.points) }
func (t *topBottomMapOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t *topBottomMapOut) Less(i, j int) bool {
return t.positionPointLess(&t.points[i], &t.points[j])
}
func (t *topMapOut) positionPointLess(pa, pb *PositionPoint) bool {
func (t *topBottomMapOut) positionPointLess(pa, pb *PositionPoint) bool {
// old C trick makes this code easier to read. Imagine
// that the OP in "cmp(i, j) OP 0" is the comparison you want
// between i and j
cmpt, a, b := typeCompare(pa.Value, pb.Value)
cmpv := valueCompare(a, b)
if cmpv != 0 {
if t.bottom {
return cmpv > 0
}
return cmpv < 0
}
if cmpt != 0 {
@ -1194,29 +1198,30 @@ func (t *topMapOut) positionPointLess(pa, pb *PositionPoint) bool {
}
// We never use this function, so make it a no-op.
func (t *topMapOut) Push(i interface{}) {
func (t *topBottomMapOut) Push(i interface{}) {
panic("someone used the function")
}
// this function doesn't return anything meaningful, since we don't look at the
// return value and we don't want to allocate for generating an interface.
func (t *topMapOut) Pop() interface{} {
func (t *topBottomMapOut) Pop() interface{} {
t.points = t.points[:len(t.points)-1]
return nil
}
func (t *topMapOut) insert(p PositionPoint) {
func (t *topBottomMapOut) insert(p PositionPoint) {
t.points[0] = p
heap.Fix(t, 0)
}
type topReduceOut struct {
type topBottomReduceOut struct {
positionOut
bottom bool
}
func (t topReduceOut) Len() int { return len(t.points) }
func (t topReduceOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t topReduceOut) Less(i, j int) bool {
func (t topBottomReduceOut) Len() int { return len(t.points) }
func (t topBottomReduceOut) Swap(i, j int) { t.points[i], t.points[j] = t.points[j], t.points[i] }
func (t topBottomReduceOut) Less(i, j int) bool {
// Now sort by time first, not value
k1, k2 := t.points[i].Time, t.points[j].Time
@ -1226,6 +1231,9 @@ func (t topReduceOut) Less(i, j int) bool {
cmpt, a, b := typeCompare(t.points[i].Value, t.points[j].Value)
cmpv := valueCompare(a, b)
if cmpv != 0 {
if t.bottom {
return cmpv < 0
}
return cmpv > 0
}
if cmpt != 0 {
@ -1292,17 +1300,24 @@ func (m *mapIter) Next() (time int64, value interface{}) {
return -1, nil
}
// MapTop emits the top data points for each group by interval
func MapTop(itr iterator, c *influxql.Call) interface{} {
// MapTopBottom emits the top/bottom data points for each group by interval
func MapTopBottom(itr iterator, c *influxql.Call) interface{} {
// Capture the limit if it was specified in the call
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
out.points = make([]PositionPoint, 0, limit)
minheap := topMapOut{&out}
minheap := topBottomMapOut{
&out,
c.Name == "bottom",
}
tagmap := make(map[string]PositionPoint)
// throughout this function, we refer to max and top. This is by the ordering specified by
// minheap, not the ordering based on value. Since this function handles both top and bottom
// max can be the lowest valued entry.
// buffer so we don't allocate every time through
var pp PositionPoint
if len(c.Args) > 2 {
@ -1357,7 +1372,7 @@ func MapTop(itr iterator, c *influxql.Call) interface{} {
// rid of another sort order.
heap.Init(&minheap)
}
// minheap should now contain the largest values that were encountered
// minheap should now contain the largest/smallest values that were encountered
// during iteration.
//
// we want these values in ascending sorted order. We can achieve this by iteratively
@ -1379,12 +1394,12 @@ func MapTop(itr iterator, c *influxql.Call) interface{} {
// ReduceTop computes the top values for each key.
// This function assumes that its inputs are in sorted ascending order.
func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
func ReduceTopBottom(values []interface{}, c *influxql.Call) interface{} {
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
minheap := topMapOut{&out}
minheap := topBottomMapOut{&out, c.Name == "bottom"}
results := make([]PositionPoints, 0, len(values))
out.points = make([]PositionPoint, 0, limit)
for _, v := range values {
@ -1411,7 +1426,7 @@ func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
if whichselected == -1 {
// none of the points have any values
// so we can return what we have now
sort.Sort(topReduceOut{out})
sort.Sort(topBottomReduceOut{out, c.Name == "bottom"})
return out.points
}
v := results[whichselected]
@ -1420,7 +1435,7 @@ func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
}
// now we need to resort the tops by time
sort.Sort(topReduceOut{out})
sort.Sort(topBottomReduceOut{out, c.Name == "bottom"})
return out.points
}

View File

@ -480,7 +480,7 @@ func BenchmarkGetSortedRangeBySort(b *testing.B) {
benchGetSortedRangeResults = results
}
func TestMapTop(t *testing.T) {
func TestMapTopBottom(t *testing.T) {
tests := []struct {
name string
skip bool
@ -489,7 +489,7 @@ func TestMapTop(t *testing.T) {
call *influxql.Call
}{
{
name: "int64 - basic",
name: "top int64 - basic",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
@ -506,7 +506,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - basic with tag",
name: "top int64 - basic with tag",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
@ -524,7 +524,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, resolve based on time",
name: "top int64 - tie on value, resolve based on time",
iter: &testIterator{
values: []testPoint{
{"", 20, int64(99), map[string]string{"host": "a"}},
@ -542,7 +542,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, time, resolve based on tags",
name: "top int64 - tie on value, time, resolve based on tags",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "b"}},
@ -560,7 +560,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints",
name: "top mixed numerics - ints",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
@ -577,7 +577,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints & floats",
name: "top mixed numerics - ints & floats",
iter: &testIterator{
values: []testPoint{
{"", 10, float64(99), map[string]string{"host": "a"}},
@ -594,7 +594,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints, floats, & strings",
name: "top mixed numerics - ints, floats, & strings",
iter: &testIterator{
values: []testPoint{
{"", 10, float64(99), map[string]string{"host": "a"}},
@ -611,7 +611,7 @@ func TestMapTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bools",
name: "top bools",
iter: &testIterator{
values: []testPoint{
{"", 10, true, map[string]string{"host": "a"}},
@ -627,13 +627,152 @@ func TestMapTop(t *testing.T) {
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - basic",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, int64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - basic with tag",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(20), map[string]string{"host": "a"}},
{"", 20, int64(53), map[string]string{"host": "b"}},
{"", 30, int64(30), map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(20), map[string]string{"host": "a"}},
PositionPoint{20, int64(53), map[string]string{"host": "b"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - tie on value, resolve based on time",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(53), map[string]string{"host": "a"}},
{"", 20, int64(53), map[string]string{"host": "a"}},
{"", 20, int64(53), map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "a"}},
PositionPoint{20, int64(53), map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - tie on value, time, resolve based on tags",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "b"}},
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 20, int64(100), map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{10, int64(99), map[string]string{"host": "b"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom mixed numerics - ints",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, uint64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom mixed numerics - ints & floats",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, float64(53), map[string]string{"host": "b"}},
{"", 20, uint64(88), map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, float64(53), map[string]string{"host": "b"}},
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom mixed numerics - ints, floats, & strings",
iter: &testIterator{
values: []testPoint{
{"", 10, float64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, "88", map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{10, float64(99), map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom bools",
iter: &testIterator{
values: []testPoint{
{"", 10, true, map[string]string{"host": "a"}},
{"", 10, true, map[string]string{"host": "b"}},
{"", 20, false, map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{20, false, map[string]string{"host": "a"}},
PositionPoint{10, true, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
}
for _, test := range tests {
if test.skip {
continue
}
values := MapTop(test.iter, test.call).(PositionPoints)
values := MapTopBottom(test.iter, test.call).(PositionPoints)
t.Logf("Test: %s", test.name)
if exp, got := len(test.exp.points), len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)
@ -644,7 +783,7 @@ func TestMapTop(t *testing.T) {
}
}
func TestReduceTop(t *testing.T) {
func TestReduceTopBottom(t *testing.T) {
tests := []struct {
name string
skip bool
@ -653,7 +792,7 @@ func TestReduceTop(t *testing.T) {
call *influxql.Call
}{
{
name: "int64 - single map",
name: "top int64 - single map",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
@ -668,7 +807,7 @@ func TestReduceTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map",
name: "top int64 - double map",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
@ -685,7 +824,7 @@ func TestReduceTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map with nil",
name: "top int64 - double map with nil",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
@ -701,7 +840,7 @@ func TestReduceTop(t *testing.T) {
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map with non-matching tags and tag selected",
name: "top int64 - double map with non-matching tags and tag selected",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
@ -718,7 +857,7 @@ func TestReduceTop(t *testing.T) {
},
{
skip: true,
name: "int64 - double map with non-matching tags",
name: "top int64 - double map with non-matching tags",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
@ -731,7 +870,88 @@ func TestReduceTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(55), map[string]string{"host": "b"}},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - single map",
values: []interface{}{
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
{10, int64(99), map[string]string{"host": "a"}},
},
},
exp: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - double map",
values: []interface{}{
PositionPoints{
{10, int64(99), map[string]string{"host": "a"}},
},
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
},
},
exp: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - double map with nil",
values: []interface{}{
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{"host": "a"}},
{10, int64(99), map[string]string{"host": "a"}},
},
nil,
},
exp: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - double map with non-matching tags and tag selected",
values: []interface{}{
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{}},
{10, int64(99), map[string]string{"host": "a"}},
},
nil,
},
exp: PositionPoints{
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
PositionPoint{20, int64(88), map[string]string{}},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
skip: true,
name: "bottom int64 - double map with non-matching tags",
values: []interface{}{
PositionPoints{
{10, int64(53), map[string]string{"host": "b"}},
{20, int64(88), map[string]string{}},
{10, int64(99), map[string]string{"host": "a"}},
},
nil,
},
exp: PositionPoints{
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(55), map[string]string{"host": "b"}},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
}
@ -739,7 +959,7 @@ func TestReduceTop(t *testing.T) {
if test.skip {
continue
}
values := ReduceTop(test.values, test.call)
values := ReduceTopBottom(test.values, test.call)
t.Logf("Test: %s", test.name)
if values != nil {
v, _ := values.(PositionPoints)