refactor map functions to use list of values

This commit changes `tsdb.mapFunc` to use `tsdb.MapInput` instead
of an iterator. This will make it easier and faster to pass blocks
of values from the new storage engine into the engine.
pull/4264/head
Ben Johnson 2015-09-28 16:34:10 -06:00
parent f79377b488
commit 343dd23ee7
5 changed files with 333 additions and 444 deletions

View File

@ -27,6 +27,7 @@
- [#4222](https://github.com/influxdb/influxdb/pull/4222): Graphite TCP connections should not block shutdown
- [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor
- [#1577](https://github.com/influxdb/influxdb/issues/1577): selectors (e.g. min, max, first, last) should have equivalents to return the actual point
- [#4264](https://github.com/influxdb/influxdb/issues/4264): Refactor map functions to use list of values
## v0.9.4 [2015-09-14]

View File

@ -2573,6 +2573,7 @@ func TestServer_Query_AggregateSelectors(t *testing.T) {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {

View File

@ -19,7 +19,7 @@ import (
"github.com/influxdb/influxdb/influxql"
)
// iterator represents a forward-only iterator over a set of points.
// Iterator represents a forward-only iterator over a set of points.
// These are used by the mapFunctions in this file
type Iterator interface {
Next() (time int64, value interface{})
@ -28,9 +28,26 @@ type Iterator interface {
TMin() int64
}
type MapInput struct {
TMin int64
Items []MapItem
}
type MapItem struct {
Timestamp int64
Value interface{}
// TODO(benbjohnson):
// Move fields and tags up to MapInput. Currently the engine combines
// multiple series together during processing. This needs to be fixed so
// that each map function only operates on a single series at a time instead.
Fields map[string]interface{}
Tags map[string]string
}
// mapFunc represents a function used for mapping over a sequential series of data.
// The iterator represents a single group by interval
type mapFunc func(Iterator) interface{}
type mapFunc func(*MapInput) interface{}
// reduceFunc represents a function used for reducing mapper output.
type reduceFunc func([]interface{}) interface{}
@ -67,24 +84,24 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
case "median":
return MapStddev, nil
case "min":
return func(itr Iterator) interface{} {
return MapMin(itr, c.Fields()[0])
return func(input *MapInput) interface{} {
return MapMin(input, c.Fields()[0])
}, nil
case "max":
return func(itr Iterator) interface{} {
return MapMax(itr, c.Fields()[0])
return func(input *MapInput) interface{} {
return MapMax(input, c.Fields()[0])
}, nil
case "spread":
return MapSpread, nil
case "stddev":
return MapStddev, nil
case "first":
return func(itr Iterator) interface{} {
return MapFirst(itr, c.Fields()[0])
return func(input *MapInput) interface{} {
return MapFirst(input, c.Fields()[0])
}, nil
case "last":
return func(itr Iterator) interface{} {
return MapLast(itr, c.Fields()[0])
return func(input *MapInput) interface{} {
return MapLast(input, c.Fields()[0])
}, nil
case "top", "bottom":
@ -93,8 +110,8 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
limit := int(lit.Val)
fields := topCallArgs(c)
return func(itr Iterator) interface{} {
return MapTopBottom(itr, limit, fields, len(c.Args), c.Name)
return func(input *MapInput) interface{} {
return MapTopBottom(input, limit, fields, len(c.Args), c.Name)
}, nil
case "percentile":
return MapEcho, nil
@ -228,9 +245,9 @@ func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) {
}
// MapCount computes the number of values in an iterator.
func MapCount(itr Iterator) interface{} {
func MapCount(input *MapInput) interface{} {
n := float64(0)
for k, _ := itr.Next(); k != -1; k, _ = itr.Next() {
for range input.Items {
n++
}
if n > 0 {
@ -253,20 +270,19 @@ func (d interfaceValues) Less(i, j int) bool {
}
// MapDistinct computes the unique values in an iterator.
func MapDistinct(itr Iterator) interface{} {
var index = make(map[interface{}]struct{})
for time, value := itr.Next(); time != -1; time, value = itr.Next() {
index[value] = struct{}{}
func MapDistinct(input *MapInput) interface{} {
m := make(map[interface{}]struct{})
for _, item := range input.Items {
m[item.Value] = struct{}{}
}
if len(index) == 0 {
if len(m) == 0 {
return nil
}
results := make(interfaceValues, len(index))
results := make(interfaceValues, len(m))
var i int
for value, _ := range index {
for value, _ := range m {
results[i] = value
i++
}
@ -307,11 +323,11 @@ func ReduceDistinct(values []interface{}) interface{} {
}
// MapCountDistinct computes the unique count of values in an iterator.
func MapCountDistinct(itr Iterator) interface{} {
func MapCountDistinct(input *MapInput) interface{} {
var index = make(map[interface{}]struct{})
for time, value := itr.Next(); time != -1; time, value = itr.Next() {
index[value] = struct{}{}
for _, item := range input.Items {
index[item.Value] = struct{}{}
}
if len(index) == 0 {
@ -351,29 +367,31 @@ const (
)
// MapSum computes the summation of values in an iterator.
func MapSum(itr Iterator) interface{} {
func MapSum(input *MapInput) interface{} {
if len(input.Items) == 0 {
return nil
}
n := float64(0)
count := 0
var resultType NumberType
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
count++
switch n1 := v.(type) {
for _, item := range input.Items {
switch v := item.Value.(type) {
case float64:
n += n1
n += v
case int64:
n += float64(n1)
n += float64(v)
resultType = Int64Type
}
}
if count > 0 {
switch resultType {
case Float64Type:
return n
case Int64Type:
return int64(n)
}
switch resultType {
case Float64Type:
return n
case Int64Type:
return int64(n)
default:
return nil
}
return nil
}
// ReduceSum computes the sum of values for each key.
@ -406,25 +424,23 @@ func ReduceSum(values []interface{}) interface{} {
}
// MapMean computes the count and sum of values in an iterator to be combined by the reducer.
func MapMean(itr Iterator) interface{} {
out := &meanMapOutput{}
func MapMean(input *MapInput) interface{} {
if len(input.Items) == 0 {
return nil
}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
out := &meanMapOutput{}
for _, item := range input.Items {
out.Count++
switch n1 := v.(type) {
switch v := item.Value.(type) {
case float64:
out.Mean += (n1 - out.Mean) / float64(out.Count)
out.Mean += (v - out.Mean) / float64(out.Count)
case int64:
out.Mean += (float64(n1) - out.Mean) / float64(out.Count)
out.Mean += (float64(v) - out.Mean) / float64(out.Count)
out.ResultType = Int64Type
}
}
if out.Count > 0 {
return out
}
return nil
return out
}
type meanMapOutput struct {
@ -615,21 +631,21 @@ type minMaxMapOut struct {
}
// MapMin collects the values to pass to the reducer
func MapMin(itr Iterator, fieldName string) interface{} {
func MapMin(input *MapInput, fieldName string) interface{} {
min := &minMaxMapOut{}
pointsYielded := false
var val float64
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
switch n := v.(type) {
for _, item := range input.Items {
switch v := item.Value.(type) {
case float64:
val = n
val = v
case int64:
val = float64(n)
val = float64(v)
min.Type = Int64Type
case map[string]interface{}:
if d, t, ok := decodeValueAndNumberType(n[fieldName]); ok {
if d, t, ok := decodeValueAndNumberType(v[fieldName]); ok {
val, min.Type = d, t
} else {
continue
@ -638,19 +654,20 @@ func MapMin(itr Iterator, fieldName string) interface{} {
// Initialize min
if !pointsYielded {
min.Time = k
min.Time = item.Timestamp
min.Val = val
min.Fields = itr.Fields()
min.Tags = itr.Tags()
min.Fields = item.Fields
min.Tags = item.Tags
pointsYielded = true
}
current := min.Val
min.Val = math.Min(min.Val, val)
// Check to see if the value changed, if so, update the fields/tags
if current != min.Val {
min.Time = k
min.Fields = itr.Fields()
min.Tags = itr.Tags()
min.Time = item.Timestamp
min.Fields = item.Fields
min.Tags = item.Tags
}
}
if pointsYielded {
@ -724,21 +741,21 @@ func decodeValueAndNumberType(v interface{}) (float64, NumberType, bool) {
}
// MapMax collects the values to pass to the reducer
func MapMax(itr Iterator, fieldName string) interface{} {
func MapMax(input *MapInput, fieldName string) interface{} {
max := &minMaxMapOut{}
pointsYielded := false
var val float64
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
switch n := v.(type) {
for _, item := range input.Items {
switch v := item.Value.(type) {
case float64:
val = n
val = v
case int64:
val = float64(n)
val = float64(v)
max.Type = Int64Type
case map[string]interface{}:
if d, t, ok := decodeValueAndNumberType(n[fieldName]); ok {
if d, t, ok := decodeValueAndNumberType(v[fieldName]); ok {
val, max.Type = d, t
} else {
continue
@ -747,19 +764,20 @@ func MapMax(itr Iterator, fieldName string) interface{} {
// Initialize max
if !pointsYielded {
max.Time = k
max.Time = item.Timestamp
max.Val = val
max.Fields = itr.Fields()
max.Tags = itr.Tags()
max.Fields = item.Fields
max.Tags = item.Tags
pointsYielded = true
}
current := max.Val
max.Val = math.Max(max.Val, val)
// Check to see if the value changed, if so, update the fields/tags
if current != max.Val {
max.Time = k
max.Fields = itr.Fields()
max.Tags = itr.Tags()
max.Time = item.Timestamp
max.Fields = item.Fields
max.Tags = item.Tags
}
}
if pointsYielded {
@ -827,17 +845,17 @@ type spreadMapOutput struct {
}
// MapSpread collects the values to pass to the reducer
func MapSpread(itr Iterator) interface{} {
func MapSpread(input *MapInput) interface{} {
out := &spreadMapOutput{}
pointsYielded := false
var val float64
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
switch n := v.(type) {
for _, item := range input.Items {
switch v := item.Value.(type) {
case float64:
val = n
val = v
case int64:
val = float64(n)
val = float64(v)
out.Type = Int64Type
}
@ -888,19 +906,17 @@ func ReduceSpread(values []interface{}) interface{} {
}
// MapStddev collects the values to pass to the reducer
func MapStddev(itr Iterator) interface{} {
var values []float64
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
switch n := v.(type) {
func MapStddev(input *MapInput) interface{} {
var a []float64
for _, item := range input.Items {
switch v := item.Value.(type) {
case float64:
values = append(values, n)
a = append(a, v)
case int64:
values = append(values, float64(n))
a = append(a, float64(v))
}
}
return values
return a
}
// ReduceStddev computes the stddev of values.
@ -948,29 +964,35 @@ type firstLastMapOutput struct {
// MapFirst collects the values to pass to the reducer
// This function assumes time ordered input
func MapFirst(itr Iterator, fieldName string) interface{} {
var fields map[string]interface{}
k, v := itr.Next()
fields = itr.Fields()
if k == -1 {
func MapFirst(input *MapInput, fieldName string) interface{} {
if len(input.Items) == 0 {
return nil
}
k, v := input.Items[0].Timestamp, input.Items[0].Value
tags := input.Items[0].Tags
fields := input.Items[0].Fields
if n, ok := v.(map[string]interface{}); ok {
v = n[fieldName]
}
nextk, nextv := itr.Next()
if n, ok := nextv.(map[string]interface{}); ok {
nextv = n[fieldName]
}
for nextk == k {
// Find greatest value at same timestamp.
for _, item := range input.Items[1:] {
nextk, nextv := item.Timestamp, item.Value
if nextk != k {
break
}
if n, ok := nextv.(map[string]interface{}); ok {
nextv = n[fieldName]
}
if greaterThan(nextv, v) {
fields = itr.Fields()
fields = item.Fields
tags = item.Tags
v = nextv
}
nextk, nextv = itr.Next()
}
return &firstLastMapOutput{Time: k, Value: v, Fields: fields, Tags: itr.Tags()}
return &firstLastMapOutput{Time: k, Value: v, Fields: fields, Tags: tags}
}
// ReduceFirst computes the first of value.
@ -1014,31 +1036,33 @@ func ReduceFirst(values []interface{}) interface{} {
}
// MapLast collects the values to pass to the reducer
func MapLast(itr Iterator, fieldName string) interface{} {
func MapLast(input *MapInput, fieldName string) interface{} {
out := &firstLastMapOutput{}
pointsYielded := false
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
if n, ok := v.(map[string]interface{}); ok {
v = n[fieldName]
for _, item := range input.Items {
k, v := item.Timestamp, item.Value
if m, ok := v.(map[string]interface{}); ok {
v = m[fieldName]
}
// Initialize last
if !pointsYielded {
out.Time = k
out.Value = v
out.Fields = itr.Fields()
out.Tags = itr.Tags()
out.Fields = item.Fields
out.Tags = item.Tags
pointsYielded = true
}
if k > out.Time {
out.Time = k
out.Value = v
out.Fields = itr.Fields()
out.Tags = itr.Tags()
out.Fields = item.Fields
out.Tags = item.Tags
} else if k == out.Time && greaterThan(v, out.Value) {
out.Value = v
out.Fields = itr.Fields()
out.Tags = itr.Tags()
out.Fields = item.Fields
out.Tags = item.Tags
}
}
if pointsYielded {
@ -1455,7 +1479,7 @@ func (m *mapIter) Next() (time int64, value interface{}) {
}
// MapTopBottom emits the top/bottom data points for each group by interval
func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callName string) interface{} {
func MapTopBottom(input *MapInput, limit int, fields []string, argCount int, callName string) interface{} {
out := positionOut{callArgs: fields}
out.points = make([]PositionPoint, 0, limit)
minheap := topBottomMapOut{
@ -1475,9 +1499,15 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa
// For each unique permutation of the tags given,
// select the max and then fall through to select top of those
// points
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
pp = PositionPoint{k, v, itr.Fields(), itr.Tags()}
tags := itr.Tags()
for _, item := range input.Items {
pp = PositionPoint{
Time: item.Timestamp,
Value: item.Value,
Fields: item.Fields,
Tags: item.Tags,
}
tags := item.Tags
// TODO in the future we need to send in fields as well
// this will allow a user to query on both fields and tags
// fields will take the priority over tags if there is a name collision
@ -1487,18 +1517,24 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa
tagmap[key] = pp
}
}
itr = &mapIter{
m: tagmap,
tmin: itr.TMin(),
items := make([]MapItem, 0, len(tagmap))
for _, p := range tagmap {
items = append(items, MapItem{Timestamp: p.Time, Value: p.Value, Fields: p.Fields, Tags: p.Tags})
}
input = &MapInput{
TMin: input.TMin,
Items: items,
}
}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
t := k
if bt := itr.TMin(); bt > -1 {
t = bt
for _, item := range input.Items {
t := item.Timestamp
if input.TMin > -1 {
t = input.TMin
}
if len(out.points) < limit {
out.points = append(out.points, PositionPoint{t, v, itr.Fields(), itr.Tags()})
out.points = append(out.points, PositionPoint{t, item.Value, item.Fields, item.Tags})
if len(out.points) == limit {
heap.Init(&minheap)
}
@ -1506,12 +1542,13 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa
// we're over the limit, so find out if we're bigger than the
// smallest point in the set and eject it if we are
minval := &out.points[0]
pp = PositionPoint{t, v, itr.Fields(), itr.Tags()}
pp = PositionPoint{t, item.Value, item.Fields, item.Tags}
if minheap.positionPointLess(minval, &pp) {
minheap.insert(pp)
}
}
}
// should only happen on empty iterator.
if len(out.points) == 0 {
return nil
@ -1521,6 +1558,7 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa
// rid of another sort order.
heap.Init(&minheap)
}
// minheap should now contain the largest/smallest values that were encountered
// during iteration.
//
@ -1533,10 +1571,12 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa
for len(out.points) > 0 {
p := out.points[0]
heap.Pop(&minheap)
// reslice so that we can get to the element just after the heap
endslice := out.points[:len(out.points)+1]
endslice[len(endslice)-1] = p
}
// the ascending order is now in the result slice
return result
}
@ -1589,11 +1629,10 @@ func ReduceTopBottom(values []interface{}, c *influxql.Call) interface{} {
}
// MapEcho emits the data points for each group by interval
func MapEcho(itr Iterator) interface{} {
func MapEcho(input *MapInput) interface{} {
var values []interface{}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
values = append(values, v)
for _, item := range input.Items {
values = append(values, item.Value)
}
return values
}
@ -1645,11 +1684,10 @@ func IsNumeric(c *influxql.Call) bool {
}
// MapRawQuery is for queries without aggregates
func MapRawQuery(itr Iterator) interface{} {
func MapRawQuery(input *MapInput) interface{} {
var values []*rawQueryMapOutput
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
val := &rawQueryMapOutput{k, v}
values = append(values, val)
for _, item := range input.Items {
values = append(values, &rawQueryMapOutput{item.Timestamp, item.Value})
}
return values
}

View File

@ -11,63 +11,15 @@ import (
import "sort"
type testPoint struct {
seriesKey string
time int64
value interface{}
fields map[string]interface{}
tags map[string]string
}
type testIterator struct {
values []testPoint
lastFields map[string]interface{}
lastTags map[string]string
nextFunc func() (timestamp int64, value interface{})
fieldsFunc func() map[string]interface{}
tagsFunc func() map[string]string
tMinFunc func() int64
}
func (t *testIterator) Next() (timestamp int64, value interface{}) {
if t.nextFunc != nil {
return t.nextFunc()
}
if len(t.values) > 0 {
v := t.values[0]
t.lastFields = t.values[0].fields
t.lastTags = t.values[0].tags
t.values = t.values[1:]
return v.time, v.value
}
return -1, nil
}
func (t *testIterator) Fields() map[string]interface{} {
if t.fieldsFunc != nil {
return t.fieldsFunc()
}
return t.lastFields
}
func (t *testIterator) Tags() map[string]string {
if t.tagsFunc != nil {
return t.tagsFunc()
}
return t.lastTags
}
func (t *testIterator) TMin() int64 {
if t.tMinFunc != nil {
return t.tMinFunc()
}
return -1
}
// type testPoint struct {
// time int64
// value interface{}
// fields map[string]interface{}
// tags map[string]string
// }
func TestMapMeanNoValues(t *testing.T) {
iter := &testIterator{}
if got := MapMean(iter); got != nil {
if got := MapMean(&MapInput{}); got != nil {
t.Errorf("output mismatch: exp nil got %v", got)
}
}
@ -75,28 +27,30 @@ func TestMapMeanNoValues(t *testing.T) {
func TestMapMean(t *testing.T) {
tests := []struct {
input []testPoint
input *MapInput
output *meanMapOutput
}{
{ // Single point
input: []testPoint{testPoint{"0", 1, 1.0, nil, nil}},
input: &MapInput{
Items: []MapItem{
{Timestamp: 1, Value: 1.0},
},
},
output: &meanMapOutput{1, 1, Float64Type},
},
{ // Two points
input: []testPoint{
testPoint{"0", 1, 2.0, nil, nil},
testPoint{"0", 2, 8.0, nil, nil},
input: &MapInput{
Items: []MapItem{
{Timestamp: 1, Value: float64(2.0)},
{Timestamp: 2, Value: float64(8.0)},
},
},
output: &meanMapOutput{2, 5.0, Float64Type},
},
}
for _, test := range tests {
iter := &testIterator{
values: test.input,
}
got := MapMean(iter)
got := MapMean(test.input)
if got == nil {
t.Fatalf("MapMean(%v): output mismatch: exp %v got %v", test.input, test.output, got)
}
@ -154,11 +108,6 @@ func TestReducePercentileNil(t *testing.T) {
}
func TestMapDistinct(t *testing.T) {
const ( // prove that we're ignoring seriesKey
seriesKey1 = "1"
seriesKey2 = "2"
)
const ( // prove that we're ignoring time
timeId1 = iota + 1
timeId2
@ -168,18 +117,18 @@ func TestMapDistinct(t *testing.T) {
timeId6
)
iter := &testIterator{
values: []testPoint{
{seriesKey1, timeId1, uint64(1), nil, nil},
{seriesKey1, timeId2, uint64(1), nil, nil},
{seriesKey1, timeId3, "1", nil, nil},
{seriesKey2, timeId4, uint64(1), nil, nil},
{seriesKey2, timeId5, float64(1.0), nil, nil},
{seriesKey2, timeId6, "1", nil, nil},
input := &MapInput{
Items: []MapItem{
{Timestamp: timeId1, Value: uint64(1)},
{Timestamp: timeId2, Value: uint64(1)},
{Timestamp: timeId3, Value: "1"},
{Timestamp: timeId4, Value: uint64(1)},
{Timestamp: timeId5, Value: float64(1.0)},
{Timestamp: timeId6, Value: "1"},
},
}
values := MapDistinct(iter).(interfaceValues)
values := MapDistinct(input).(interfaceValues)
if exp, got := 3, len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)
@ -199,11 +148,7 @@ func TestMapDistinct(t *testing.T) {
}
func TestMapDistinctNil(t *testing.T) {
iter := &testIterator{
values: []testPoint{},
}
values := MapDistinct(iter)
values := MapDistinct(&MapInput{})
if values != nil {
t.Errorf("Wrong values. exp nil got %v", spew.Sdump(values))
@ -307,11 +252,6 @@ func Test_distinctValues_Sort(t *testing.T) {
}
func TestMapCountDistinct(t *testing.T) {
const ( // prove that we're ignoring seriesKey
seriesKey1 = "1"
seriesKey2 = "2"
)
const ( // prove that we're ignoring time
timeId1 = iota + 1
timeId2
@ -322,19 +262,19 @@ func TestMapCountDistinct(t *testing.T) {
timeId7
)
iter := &testIterator{
values: []testPoint{
{seriesKey1, timeId1, uint64(1), nil, nil},
{seriesKey1, timeId2, uint64(1), nil, nil},
{seriesKey1, timeId3, "1", nil, nil},
{seriesKey2, timeId4, uint64(1), nil, nil},
{seriesKey2, timeId5, float64(1.0), nil, nil},
{seriesKey2, timeId6, "1", nil, nil},
{seriesKey2, timeId7, true, nil, nil},
input := &MapInput{
Items: []MapItem{
{Timestamp: timeId1, Value: uint64(1)},
{Timestamp: timeId2, Value: uint64(1)},
{Timestamp: timeId3, Value: "1"},
{Timestamp: timeId4, Value: uint64(1)},
{Timestamp: timeId5, Value: float64(1.0)},
{Timestamp: timeId6, Value: "1"},
{Timestamp: timeId7, Value: true},
},
}
values := MapCountDistinct(iter).(map[interface{}]struct{})
values := MapCountDistinct(input).(map[interface{}]struct{})
if exp, got := 4, len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)
@ -353,13 +293,7 @@ func TestMapCountDistinct(t *testing.T) {
}
func TestMapCountDistinctNil(t *testing.T) {
iter := &testIterator{
values: []testPoint{},
}
values := MapCountDistinct(iter)
if values != nil {
if values := MapCountDistinct(&MapInput{}); values != nil {
t.Errorf("Wrong values. exp nil got %v", spew.Sdump(values))
}
}
@ -493,286 +427,225 @@ func BenchmarkGetSortedRangeBySort(b *testing.B) {
func TestMapTopBottom(t *testing.T) {
tests := []struct {
name string
skip bool
iter *testIterator
exp positionOut
call *influxql.Call
name string
skip bool
input *MapInput
exp positionOut
call *influxql.Call
}{
{
name: "top int64 - basic",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, int64(88), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: int64(88), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}},
PositionPoint{20, int64(88), nil, map[string]string{"host": "a"}},
{20, int64(88), nil, map[string]string{"host": "a"}},
{10, int64(53), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top int64 - basic with tag",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 20, int64(53), nil, map[string]string{"host": "b"}},
{"", 30, int64(88), nil, map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}},
PositionPoint{20, int64(53), nil, map[string]string{"host": "b"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top int64 - tie on value, resolve based on time",
iter: &testIterator{
values: []testPoint{
{"", 20, int64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "a"}},
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 20, Value: int64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}},
PositionPoint{20, int64(99), nil, map[string]string{"host": "a"}},
{10, int64(99), nil, map[string]string{"host": "a"}},
{20, int64(99), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top int64 - tie on value, time, resolve based on tags",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "b"}},
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 20, int64(88), nil, map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}},
PositionPoint{10, int64(99), nil, map[string]string{"host": "b"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top mixed numerics - ints",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, uint64(88), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}},
PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}},
{10, int64(99), nil, map[string]string{"host": "a"}},
{20, uint64(88), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top mixed numerics - ints & floats",
iter: &testIterator{
values: []testPoint{
{"", 10, float64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, uint64(88), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: float64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, float64(99), nil, map[string]string{"host": "a"}},
PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}},
{10, float64(99), nil, map[string]string{"host": "a"}},
{20, uint64(88), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top mixed numerics - ints, floats, & strings",
iter: &testIterator{
values: []testPoint{
{"", 10, float64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, "88", nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: float64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: "88", Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, float64(99), nil, map[string]string{"host": "a"}},
PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}},
{10, float64(99), nil, map[string]string{"host": "a"}},
{10, int64(53), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "top bools",
iter: &testIterator{
values: []testPoint{
{"", 10, true, nil, map[string]string{"host": "a"}},
{"", 10, true, nil, map[string]string{"host": "b"}},
{"", 20, false, nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: false, Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, true, nil, map[string]string{"host": "a"}},
PositionPoint{10, true, nil, map[string]string{"host": "b"}},
{10, true, nil, map[string]string{"host": "a"}},
{10, true, nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - basic",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, int64(88), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: int64(88), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}},
PositionPoint{20, int64(88), nil, map[string]string{"host": "a"}},
{10, int64(53), nil, map[string]string{"host": "a"}},
{20, int64(88), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - basic with tag",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(20), nil, map[string]string{"host": "a"}},
{"", 20, int64(53), nil, map[string]string{"host": "b"}},
{"", 30, int64(30), nil, map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(20), nil, map[string]string{"host": "a"}},
PositionPoint{20, int64(53), nil, map[string]string{"host": "b"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - tie on value, resolve based on time",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(53), nil, map[string]string{"host": "a"}},
{"", 20, int64(53), nil, map[string]string{"host": "a"}},
{"", 20, int64(53), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: int64(53), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(53), nil, map[string]string{"host": "a"}},
PositionPoint{20, int64(53), nil, map[string]string{"host": "a"}},
{10, int64(53), nil, map[string]string{"host": "a"}},
{20, int64(53), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom int64 - tie on value, time, resolve based on tags",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "b"}},
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 20, int64(100), nil, map[string]string{"host": "a"}},
},
},
exp: positionOut{
callArgs: []string{"host"},
points: PositionPoints{
PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}},
PositionPoint{10, int64(99), nil, map[string]string{"host": "b"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom mixed numerics - ints",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, uint64(88), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}},
PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}},
{10, int64(53), nil, map[string]string{"host": "a"}},
{20, uint64(88), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom mixed numerics - ints & floats",
iter: &testIterator{
values: []testPoint{
{"", 10, int64(99), nil, map[string]string{"host": "a"}},
{"", 10, float64(53), nil, map[string]string{"host": "b"}},
{"", 20, uint64(88), nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: float64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, float64(53), nil, map[string]string{"host": "b"}},
PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}},
{10, float64(53), nil, map[string]string{"host": "a"}},
{20, uint64(88), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom mixed numerics - ints, floats, & strings",
iter: &testIterator{
values: []testPoint{
{"", 10, float64(99), nil, map[string]string{"host": "a"}},
{"", 10, int64(53), nil, map[string]string{"host": "b"}},
{"", 20, "88", nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: float64(99), Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: "88", Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}},
PositionPoint{10, float64(99), nil, map[string]string{"host": "a"}},
{10, int64(53), nil, map[string]string{"host": "a"}},
{10, float64(99), nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bottom bools",
iter: &testIterator{
values: []testPoint{
{"", 10, true, nil, map[string]string{"host": "a"}},
{"", 10, true, nil, map[string]string{"host": "b"}},
{"", 20, false, nil, map[string]string{"host": "a"}},
input: &MapInput{
TMin: -1,
Items: []MapItem{
{Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}},
{Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}},
{Timestamp: 20, Value: false, Tags: map[string]string{"host": "a"}},
},
},
exp: positionOut{
points: PositionPoints{
PositionPoint{20, false, nil, map[string]string{"host": "a"}},
PositionPoint{10, true, nil, map[string]string{"host": "a"}},
{20, false, nil, map[string]string{"host": "a"}},
{10, true, nil, map[string]string{"host": "a"}},
},
},
call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
@ -787,7 +660,7 @@ func TestMapTopBottom(t *testing.T) {
limit := int(lit.Val)
fields := topCallArgs(test.call)
values := MapTopBottom(test.iter, limit, fields, len(test.call.Args), test.call.Name).(PositionPoints)
values := MapTopBottom(test.input, limit, fields, len(test.call.Args), test.call.Name).(PositionPoints)
t.Logf("Test: %s", test.name)
if exp, got := len(test.exp.points), len(values); exp != got {
t.Errorf("Wrong number of values. exp %v got %v", exp, got)

View File

@ -605,15 +605,25 @@ func (m *AggregateMapper) NextChunk() (interface{}, error) {
tsc.SelectFields = []string{m.fieldNames[i]}
tsc.SelectWhereFields = uniqueStrings([]string{m.fieldNames[i]}, m.whereFields)
// Execute the map function which walks the entire interval, and aggregates the result.
mapValue := m.mapFuncs[i](&AggregateTagSetCursor{
cursor: tsc,
tmin: tmin,
stmt: m.stmt,
// Build a map input from the cursor.
input := &MapInput{
TMin: -1,
}
if len(m.stmt.Dimensions) > 0 && !m.stmt.HasTimeFieldSpecified() {
input.TMin = tmin
}
qmin: qmin,
qmax: qmax,
})
for k, v := tsc.Next(qmin, qmax); k != -1; k, v = tsc.Next(qmin, qmax) {
input.Items = append(input.Items, MapItem{
Timestamp: k,
Value: v,
Fields: tsc.Fields(),
Tags: tsc.Tags(),
})
}
// Execute the map function which walks the entire interval, and aggregates the result.
mapValue := m.mapFuncs[i](input)
output.Values[0].Value = append(output.Values[0].Value.([]interface{}), mapValue)
}
@ -635,40 +645,6 @@ func (m *AggregateMapper) nextInterval() (start, end int64) {
return
}
// AggregateTagSetCursor wraps a standard tagSetCursor, such that the values it emits are aggregated by intervals.
type AggregateTagSetCursor struct {
cursor *TagSetCursor
qmin int64
qmax int64
tmin int64
stmt *influxql.SelectStatement
}
// Next returns the next aggregate value for the cursor.
func (a *AggregateTagSetCursor) Next() (time int64, value interface{}) {
return a.cursor.Next(a.qmin, a.qmax)
}
// Fields returns the current fields for the cursor
func (a *AggregateTagSetCursor) Fields() map[string]interface{} {
return a.cursor.Fields()
}
// Tags returns the current tags for the cursor
func (a *AggregateTagSetCursor) Tags() map[string]string { return a.cursor.Tags() }
// TMin returns the current floor time for the bucket being worked on
func (a *AggregateTagSetCursor) TMin() int64 {
if len(a.stmt.Dimensions) == 0 {
return -1
}
if !a.stmt.HasTimeFieldSpecified() {
return a.tmin
}
return -1
}
// uniqueStrings returns a slice of unique strings from all lists in a.
func uniqueStrings(a ...[]string) []string {
// Calculate unique set of strings.