diff --git a/CHANGELOG.md b/CHANGELOG.md index aca70a1120..21f830b8b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ - [#4222](https://github.com/influxdb/influxdb/pull/4222): Graphite TCP connections should not block shutdown - [#4180](https://github.com/influxdb/influxdb/pull/4180): Cursor & SelectMapper Refactor - [#1577](https://github.com/influxdb/influxdb/issues/1577): selectors (e.g. min, max, first, last) should have equivalents to return the actual point +- [#4264](https://github.com/influxdb/influxdb/issues/4264): Refactor map functions to use list of values ## v0.9.4 [2015-09-14] diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index 2723d495d9..15649dc5b8 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -2573,6 +2573,7 @@ func TestServer_Query_AggregateSelectors(t *testing.T) { t.Logf("SKIP:: %s", query.name) continue } + if err := query.Execute(s); err != nil { t.Error(query.Error(err)) } else if !query.success() { diff --git a/tsdb/functions.go b/tsdb/functions.go index 7cf66b08af..b266f24eda 100644 --- a/tsdb/functions.go +++ b/tsdb/functions.go @@ -19,7 +19,7 @@ import ( "github.com/influxdb/influxdb/influxql" ) -// iterator represents a forward-only iterator over a set of points. +// Iterator represents a forward-only iterator over a set of points. // These are used by the mapFunctions in this file type Iterator interface { Next() (time int64, value interface{}) @@ -28,9 +28,26 @@ type Iterator interface { TMin() int64 } +type MapInput struct { + TMin int64 + Items []MapItem +} + +type MapItem struct { + Timestamp int64 + Value interface{} + + // TODO(benbjohnson): + // Move fields and tags up to MapInput. Currently the engine combines + // multiple series together during processing. This needs to be fixed so + // that each map function only operates on a single series at a time instead. + Fields map[string]interface{} + Tags map[string]string +} + // mapFunc represents a function used for mapping over a sequential series of data. // The iterator represents a single group by interval -type mapFunc func(Iterator) interface{} +type mapFunc func(*MapInput) interface{} // reduceFunc represents a function used for reducing mapper output. type reduceFunc func([]interface{}) interface{} @@ -67,24 +84,24 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { case "median": return MapStddev, nil case "min": - return func(itr Iterator) interface{} { - return MapMin(itr, c.Fields()[0]) + return func(input *MapInput) interface{} { + return MapMin(input, c.Fields()[0]) }, nil case "max": - return func(itr Iterator) interface{} { - return MapMax(itr, c.Fields()[0]) + return func(input *MapInput) interface{} { + return MapMax(input, c.Fields()[0]) }, nil case "spread": return MapSpread, nil case "stddev": return MapStddev, nil case "first": - return func(itr Iterator) interface{} { - return MapFirst(itr, c.Fields()[0]) + return func(input *MapInput) interface{} { + return MapFirst(input, c.Fields()[0]) }, nil case "last": - return func(itr Iterator) interface{} { - return MapLast(itr, c.Fields()[0]) + return func(input *MapInput) interface{} { + return MapLast(input, c.Fields()[0]) }, nil case "top", "bottom": @@ -93,8 +110,8 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { limit := int(lit.Val) fields := topCallArgs(c) - return func(itr Iterator) interface{} { - return MapTopBottom(itr, limit, fields, len(c.Args), c.Name) + return func(input *MapInput) interface{} { + return MapTopBottom(input, limit, fields, len(c.Args), c.Name) }, nil case "percentile": return MapEcho, nil @@ -228,9 +245,9 @@ func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) { } // MapCount computes the number of values in an iterator. -func MapCount(itr Iterator) interface{} { +func MapCount(input *MapInput) interface{} { n := float64(0) - for k, _ := itr.Next(); k != -1; k, _ = itr.Next() { + for range input.Items { n++ } if n > 0 { @@ -253,20 +270,19 @@ func (d interfaceValues) Less(i, j int) bool { } // MapDistinct computes the unique values in an iterator. -func MapDistinct(itr Iterator) interface{} { - var index = make(map[interface{}]struct{}) - - for time, value := itr.Next(); time != -1; time, value = itr.Next() { - index[value] = struct{}{} +func MapDistinct(input *MapInput) interface{} { + m := make(map[interface{}]struct{}) + for _, item := range input.Items { + m[item.Value] = struct{}{} } - if len(index) == 0 { + if len(m) == 0 { return nil } - results := make(interfaceValues, len(index)) + results := make(interfaceValues, len(m)) var i int - for value, _ := range index { + for value, _ := range m { results[i] = value i++ } @@ -307,11 +323,11 @@ func ReduceDistinct(values []interface{}) interface{} { } // MapCountDistinct computes the unique count of values in an iterator. -func MapCountDistinct(itr Iterator) interface{} { +func MapCountDistinct(input *MapInput) interface{} { var index = make(map[interface{}]struct{}) - for time, value := itr.Next(); time != -1; time, value = itr.Next() { - index[value] = struct{}{} + for _, item := range input.Items { + index[item.Value] = struct{}{} } if len(index) == 0 { @@ -351,29 +367,31 @@ const ( ) // MapSum computes the summation of values in an iterator. -func MapSum(itr Iterator) interface{} { +func MapSum(input *MapInput) interface{} { + if len(input.Items) == 0 { + return nil + } + n := float64(0) - count := 0 var resultType NumberType - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - count++ - switch n1 := v.(type) { + for _, item := range input.Items { + switch v := item.Value.(type) { case float64: - n += n1 + n += v case int64: - n += float64(n1) + n += float64(v) resultType = Int64Type } } - if count > 0 { - switch resultType { - case Float64Type: - return n - case Int64Type: - return int64(n) - } + + switch resultType { + case Float64Type: + return n + case Int64Type: + return int64(n) + default: + return nil } - return nil } // ReduceSum computes the sum of values for each key. @@ -406,25 +424,23 @@ func ReduceSum(values []interface{}) interface{} { } // MapMean computes the count and sum of values in an iterator to be combined by the reducer. -func MapMean(itr Iterator) interface{} { - out := &meanMapOutput{} +func MapMean(input *MapInput) interface{} { + if len(input.Items) == 0 { + return nil + } - for k, v := itr.Next(); k != -1; k, v = itr.Next() { + out := &meanMapOutput{} + for _, item := range input.Items { out.Count++ - switch n1 := v.(type) { + switch v := item.Value.(type) { case float64: - out.Mean += (n1 - out.Mean) / float64(out.Count) + out.Mean += (v - out.Mean) / float64(out.Count) case int64: - out.Mean += (float64(n1) - out.Mean) / float64(out.Count) + out.Mean += (float64(v) - out.Mean) / float64(out.Count) out.ResultType = Int64Type } } - - if out.Count > 0 { - return out - } - - return nil + return out } type meanMapOutput struct { @@ -615,21 +631,21 @@ type minMaxMapOut struct { } // MapMin collects the values to pass to the reducer -func MapMin(itr Iterator, fieldName string) interface{} { +func MapMin(input *MapInput, fieldName string) interface{} { min := &minMaxMapOut{} pointsYielded := false var val float64 - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - switch n := v.(type) { + for _, item := range input.Items { + switch v := item.Value.(type) { case float64: - val = n + val = v case int64: - val = float64(n) + val = float64(v) min.Type = Int64Type case map[string]interface{}: - if d, t, ok := decodeValueAndNumberType(n[fieldName]); ok { + if d, t, ok := decodeValueAndNumberType(v[fieldName]); ok { val, min.Type = d, t } else { continue @@ -638,19 +654,20 @@ func MapMin(itr Iterator, fieldName string) interface{} { // Initialize min if !pointsYielded { - min.Time = k + min.Time = item.Timestamp min.Val = val - min.Fields = itr.Fields() - min.Tags = itr.Tags() + min.Fields = item.Fields + min.Tags = item.Tags pointsYielded = true } current := min.Val min.Val = math.Min(min.Val, val) + // Check to see if the value changed, if so, update the fields/tags if current != min.Val { - min.Time = k - min.Fields = itr.Fields() - min.Tags = itr.Tags() + min.Time = item.Timestamp + min.Fields = item.Fields + min.Tags = item.Tags } } if pointsYielded { @@ -724,21 +741,21 @@ func decodeValueAndNumberType(v interface{}) (float64, NumberType, bool) { } // MapMax collects the values to pass to the reducer -func MapMax(itr Iterator, fieldName string) interface{} { +func MapMax(input *MapInput, fieldName string) interface{} { max := &minMaxMapOut{} pointsYielded := false var val float64 - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - switch n := v.(type) { + for _, item := range input.Items { + switch v := item.Value.(type) { case float64: - val = n + val = v case int64: - val = float64(n) + val = float64(v) max.Type = Int64Type case map[string]interface{}: - if d, t, ok := decodeValueAndNumberType(n[fieldName]); ok { + if d, t, ok := decodeValueAndNumberType(v[fieldName]); ok { val, max.Type = d, t } else { continue @@ -747,19 +764,20 @@ func MapMax(itr Iterator, fieldName string) interface{} { // Initialize max if !pointsYielded { - max.Time = k + max.Time = item.Timestamp max.Val = val - max.Fields = itr.Fields() - max.Tags = itr.Tags() + max.Fields = item.Fields + max.Tags = item.Tags pointsYielded = true } current := max.Val max.Val = math.Max(max.Val, val) + // Check to see if the value changed, if so, update the fields/tags if current != max.Val { - max.Time = k - max.Fields = itr.Fields() - max.Tags = itr.Tags() + max.Time = item.Timestamp + max.Fields = item.Fields + max.Tags = item.Tags } } if pointsYielded { @@ -827,17 +845,17 @@ type spreadMapOutput struct { } // MapSpread collects the values to pass to the reducer -func MapSpread(itr Iterator) interface{} { +func MapSpread(input *MapInput) interface{} { out := &spreadMapOutput{} pointsYielded := false var val float64 - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - switch n := v.(type) { + for _, item := range input.Items { + switch v := item.Value.(type) { case float64: - val = n + val = v case int64: - val = float64(n) + val = float64(v) out.Type = Int64Type } @@ -888,19 +906,17 @@ func ReduceSpread(values []interface{}) interface{} { } // MapStddev collects the values to pass to the reducer -func MapStddev(itr Iterator) interface{} { - var values []float64 - - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - switch n := v.(type) { +func MapStddev(input *MapInput) interface{} { + var a []float64 + for _, item := range input.Items { + switch v := item.Value.(type) { case float64: - values = append(values, n) + a = append(a, v) case int64: - values = append(values, float64(n)) + a = append(a, float64(v)) } } - - return values + return a } // ReduceStddev computes the stddev of values. @@ -948,29 +964,35 @@ type firstLastMapOutput struct { // MapFirst collects the values to pass to the reducer // This function assumes time ordered input -func MapFirst(itr Iterator, fieldName string) interface{} { - var fields map[string]interface{} - k, v := itr.Next() - fields = itr.Fields() - if k == -1 { +func MapFirst(input *MapInput, fieldName string) interface{} { + if len(input.Items) == 0 { return nil } + + k, v := input.Items[0].Timestamp, input.Items[0].Value + tags := input.Items[0].Tags + fields := input.Items[0].Fields if n, ok := v.(map[string]interface{}); ok { v = n[fieldName] } - nextk, nextv := itr.Next() - if n, ok := nextv.(map[string]interface{}); ok { - nextv = n[fieldName] - } - for nextk == k { + // Find greatest value at same timestamp. + for _, item := range input.Items[1:] { + nextk, nextv := item.Timestamp, item.Value + if nextk != k { + break + } + if n, ok := nextv.(map[string]interface{}); ok { + nextv = n[fieldName] + } + if greaterThan(nextv, v) { - fields = itr.Fields() + fields = item.Fields + tags = item.Tags v = nextv } - nextk, nextv = itr.Next() } - return &firstLastMapOutput{Time: k, Value: v, Fields: fields, Tags: itr.Tags()} + return &firstLastMapOutput{Time: k, Value: v, Fields: fields, Tags: tags} } // ReduceFirst computes the first of value. @@ -1014,31 +1036,33 @@ func ReduceFirst(values []interface{}) interface{} { } // MapLast collects the values to pass to the reducer -func MapLast(itr Iterator, fieldName string) interface{} { +func MapLast(input *MapInput, fieldName string) interface{} { out := &firstLastMapOutput{} pointsYielded := false - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - if n, ok := v.(map[string]interface{}); ok { - v = n[fieldName] + for _, item := range input.Items { + k, v := item.Timestamp, item.Value + if m, ok := v.(map[string]interface{}); ok { + v = m[fieldName] } + // Initialize last if !pointsYielded { out.Time = k out.Value = v - out.Fields = itr.Fields() - out.Tags = itr.Tags() + out.Fields = item.Fields + out.Tags = item.Tags pointsYielded = true } if k > out.Time { out.Time = k out.Value = v - out.Fields = itr.Fields() - out.Tags = itr.Tags() + out.Fields = item.Fields + out.Tags = item.Tags } else if k == out.Time && greaterThan(v, out.Value) { out.Value = v - out.Fields = itr.Fields() - out.Tags = itr.Tags() + out.Fields = item.Fields + out.Tags = item.Tags } } if pointsYielded { @@ -1455,7 +1479,7 @@ func (m *mapIter) Next() (time int64, value interface{}) { } // MapTopBottom emits the top/bottom data points for each group by interval -func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callName string) interface{} { +func MapTopBottom(input *MapInput, limit int, fields []string, argCount int, callName string) interface{} { out := positionOut{callArgs: fields} out.points = make([]PositionPoint, 0, limit) minheap := topBottomMapOut{ @@ -1475,9 +1499,15 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa // For each unique permutation of the tags given, // select the max and then fall through to select top of those // points - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - pp = PositionPoint{k, v, itr.Fields(), itr.Tags()} - tags := itr.Tags() + for _, item := range input.Items { + pp = PositionPoint{ + Time: item.Timestamp, + Value: item.Value, + Fields: item.Fields, + Tags: item.Tags, + } + tags := item.Tags + // TODO in the future we need to send in fields as well // this will allow a user to query on both fields and tags // fields will take the priority over tags if there is a name collision @@ -1487,18 +1517,24 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa tagmap[key] = pp } } - itr = &mapIter{ - m: tagmap, - tmin: itr.TMin(), + + items := make([]MapItem, 0, len(tagmap)) + for _, p := range tagmap { + items = append(items, MapItem{Timestamp: p.Time, Value: p.Value, Fields: p.Fields, Tags: p.Tags}) + } + input = &MapInput{ + TMin: input.TMin, + Items: items, } } - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - t := k - if bt := itr.TMin(); bt > -1 { - t = bt + + for _, item := range input.Items { + t := item.Timestamp + if input.TMin > -1 { + t = input.TMin } if len(out.points) < limit { - out.points = append(out.points, PositionPoint{t, v, itr.Fields(), itr.Tags()}) + out.points = append(out.points, PositionPoint{t, item.Value, item.Fields, item.Tags}) if len(out.points) == limit { heap.Init(&minheap) } @@ -1506,12 +1542,13 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa // we're over the limit, so find out if we're bigger than the // smallest point in the set and eject it if we are minval := &out.points[0] - pp = PositionPoint{t, v, itr.Fields(), itr.Tags()} + pp = PositionPoint{t, item.Value, item.Fields, item.Tags} if minheap.positionPointLess(minval, &pp) { minheap.insert(pp) } } } + // should only happen on empty iterator. if len(out.points) == 0 { return nil @@ -1521,6 +1558,7 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa // rid of another sort order. heap.Init(&minheap) } + // minheap should now contain the largest/smallest values that were encountered // during iteration. // @@ -1533,10 +1571,12 @@ func MapTopBottom(itr Iterator, limit int, fields []string, argCount int, callNa for len(out.points) > 0 { p := out.points[0] heap.Pop(&minheap) + // reslice so that we can get to the element just after the heap endslice := out.points[:len(out.points)+1] endslice[len(endslice)-1] = p } + // the ascending order is now in the result slice return result } @@ -1589,11 +1629,10 @@ func ReduceTopBottom(values []interface{}, c *influxql.Call) interface{} { } // MapEcho emits the data points for each group by interval -func MapEcho(itr Iterator) interface{} { +func MapEcho(input *MapInput) interface{} { var values []interface{} - - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - values = append(values, v) + for _, item := range input.Items { + values = append(values, item.Value) } return values } @@ -1645,11 +1684,10 @@ func IsNumeric(c *influxql.Call) bool { } // MapRawQuery is for queries without aggregates -func MapRawQuery(itr Iterator) interface{} { +func MapRawQuery(input *MapInput) interface{} { var values []*rawQueryMapOutput - for k, v := itr.Next(); k != -1; k, v = itr.Next() { - val := &rawQueryMapOutput{k, v} - values = append(values, val) + for _, item := range input.Items { + values = append(values, &rawQueryMapOutput{item.Timestamp, item.Value}) } return values } diff --git a/tsdb/functions_test.go b/tsdb/functions_test.go index 5ac82acccb..24b3f55182 100644 --- a/tsdb/functions_test.go +++ b/tsdb/functions_test.go @@ -11,63 +11,15 @@ import ( import "sort" -type testPoint struct { - seriesKey string - time int64 - value interface{} - fields map[string]interface{} - tags map[string]string -} - -type testIterator struct { - values []testPoint - lastFields map[string]interface{} - lastTags map[string]string - nextFunc func() (timestamp int64, value interface{}) - fieldsFunc func() map[string]interface{} - tagsFunc func() map[string]string - tMinFunc func() int64 -} - -func (t *testIterator) Next() (timestamp int64, value interface{}) { - if t.nextFunc != nil { - return t.nextFunc() - } - if len(t.values) > 0 { - v := t.values[0] - t.lastFields = t.values[0].fields - t.lastTags = t.values[0].tags - t.values = t.values[1:] - return v.time, v.value - } - - return -1, nil -} - -func (t *testIterator) Fields() map[string]interface{} { - if t.fieldsFunc != nil { - return t.fieldsFunc() - } - return t.lastFields -} - -func (t *testIterator) Tags() map[string]string { - if t.tagsFunc != nil { - return t.tagsFunc() - } - return t.lastTags -} - -func (t *testIterator) TMin() int64 { - if t.tMinFunc != nil { - return t.tMinFunc() - } - return -1 -} +// type testPoint struct { +// time int64 +// value interface{} +// fields map[string]interface{} +// tags map[string]string +// } func TestMapMeanNoValues(t *testing.T) { - iter := &testIterator{} - if got := MapMean(iter); got != nil { + if got := MapMean(&MapInput{}); got != nil { t.Errorf("output mismatch: exp nil got %v", got) } } @@ -75,28 +27,30 @@ func TestMapMeanNoValues(t *testing.T) { func TestMapMean(t *testing.T) { tests := []struct { - input []testPoint + input *MapInput output *meanMapOutput }{ { // Single point - input: []testPoint{testPoint{"0", 1, 1.0, nil, nil}}, + input: &MapInput{ + Items: []MapItem{ + {Timestamp: 1, Value: 1.0}, + }, + }, output: &meanMapOutput{1, 1, Float64Type}, }, { // Two points - input: []testPoint{ - testPoint{"0", 1, 2.0, nil, nil}, - testPoint{"0", 2, 8.0, nil, nil}, + input: &MapInput{ + Items: []MapItem{ + {Timestamp: 1, Value: float64(2.0)}, + {Timestamp: 2, Value: float64(8.0)}, + }, }, output: &meanMapOutput{2, 5.0, Float64Type}, }, } for _, test := range tests { - iter := &testIterator{ - values: test.input, - } - - got := MapMean(iter) + got := MapMean(test.input) if got == nil { t.Fatalf("MapMean(%v): output mismatch: exp %v got %v", test.input, test.output, got) } @@ -154,11 +108,6 @@ func TestReducePercentileNil(t *testing.T) { } func TestMapDistinct(t *testing.T) { - const ( // prove that we're ignoring seriesKey - seriesKey1 = "1" - seriesKey2 = "2" - ) - const ( // prove that we're ignoring time timeId1 = iota + 1 timeId2 @@ -168,18 +117,18 @@ func TestMapDistinct(t *testing.T) { timeId6 ) - iter := &testIterator{ - values: []testPoint{ - {seriesKey1, timeId1, uint64(1), nil, nil}, - {seriesKey1, timeId2, uint64(1), nil, nil}, - {seriesKey1, timeId3, "1", nil, nil}, - {seriesKey2, timeId4, uint64(1), nil, nil}, - {seriesKey2, timeId5, float64(1.0), nil, nil}, - {seriesKey2, timeId6, "1", nil, nil}, + input := &MapInput{ + Items: []MapItem{ + {Timestamp: timeId1, Value: uint64(1)}, + {Timestamp: timeId2, Value: uint64(1)}, + {Timestamp: timeId3, Value: "1"}, + {Timestamp: timeId4, Value: uint64(1)}, + {Timestamp: timeId5, Value: float64(1.0)}, + {Timestamp: timeId6, Value: "1"}, }, } - values := MapDistinct(iter).(interfaceValues) + values := MapDistinct(input).(interfaceValues) if exp, got := 3, len(values); exp != got { t.Errorf("Wrong number of values. exp %v got %v", exp, got) @@ -199,11 +148,7 @@ func TestMapDistinct(t *testing.T) { } func TestMapDistinctNil(t *testing.T) { - iter := &testIterator{ - values: []testPoint{}, - } - - values := MapDistinct(iter) + values := MapDistinct(&MapInput{}) if values != nil { t.Errorf("Wrong values. exp nil got %v", spew.Sdump(values)) @@ -307,11 +252,6 @@ func Test_distinctValues_Sort(t *testing.T) { } func TestMapCountDistinct(t *testing.T) { - const ( // prove that we're ignoring seriesKey - seriesKey1 = "1" - seriesKey2 = "2" - ) - const ( // prove that we're ignoring time timeId1 = iota + 1 timeId2 @@ -322,19 +262,19 @@ func TestMapCountDistinct(t *testing.T) { timeId7 ) - iter := &testIterator{ - values: []testPoint{ - {seriesKey1, timeId1, uint64(1), nil, nil}, - {seriesKey1, timeId2, uint64(1), nil, nil}, - {seriesKey1, timeId3, "1", nil, nil}, - {seriesKey2, timeId4, uint64(1), nil, nil}, - {seriesKey2, timeId5, float64(1.0), nil, nil}, - {seriesKey2, timeId6, "1", nil, nil}, - {seriesKey2, timeId7, true, nil, nil}, + input := &MapInput{ + Items: []MapItem{ + {Timestamp: timeId1, Value: uint64(1)}, + {Timestamp: timeId2, Value: uint64(1)}, + {Timestamp: timeId3, Value: "1"}, + {Timestamp: timeId4, Value: uint64(1)}, + {Timestamp: timeId5, Value: float64(1.0)}, + {Timestamp: timeId6, Value: "1"}, + {Timestamp: timeId7, Value: true}, }, } - values := MapCountDistinct(iter).(map[interface{}]struct{}) + values := MapCountDistinct(input).(map[interface{}]struct{}) if exp, got := 4, len(values); exp != got { t.Errorf("Wrong number of values. exp %v got %v", exp, got) @@ -353,13 +293,7 @@ func TestMapCountDistinct(t *testing.T) { } func TestMapCountDistinctNil(t *testing.T) { - iter := &testIterator{ - values: []testPoint{}, - } - - values := MapCountDistinct(iter) - - if values != nil { + if values := MapCountDistinct(&MapInput{}); values != nil { t.Errorf("Wrong values. exp nil got %v", spew.Sdump(values)) } } @@ -493,286 +427,225 @@ func BenchmarkGetSortedRangeBySort(b *testing.B) { func TestMapTopBottom(t *testing.T) { tests := []struct { - name string - skip bool - iter *testIterator - exp positionOut - call *influxql.Call + name string + skip bool + input *MapInput + exp positionOut + call *influxql.Call }{ { name: "top int64 - basic", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, int64(88), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: int64(88), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{20, int64(88), nil, map[string]string{"host": "a"}}, + {20, int64(88), nil, map[string]string{"host": "a"}}, + {10, int64(53), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, - { - name: "top int64 - basic with tag", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 20, int64(53), nil, map[string]string{"host": "b"}}, - {"", 30, int64(88), nil, map[string]string{"host": "a"}}, - }, - }, - exp: positionOut{ - callArgs: []string{"host"}, - points: PositionPoints{ - PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{20, int64(53), nil, map[string]string{"host": "b"}}, - }, - }, - call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}}, - }, { name: "top int64 - tie on value, resolve based on time", - iter: &testIterator{ - values: []testPoint{ - {"", 20, int64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "a"}}, - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 20, Value: int64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ callArgs: []string{"host"}, points: PositionPoints{ - PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{20, int64(99), nil, map[string]string{"host": "a"}}, + {10, int64(99), nil, map[string]string{"host": "a"}}, + {20, int64(99), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, - { - name: "top int64 - tie on value, time, resolve based on tags", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "b"}}, - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 20, int64(88), nil, map[string]string{"host": "a"}}, - }, - }, - exp: positionOut{ - callArgs: []string{"host"}, - points: PositionPoints{ - PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{10, int64(99), nil, map[string]string{"host": "b"}}, - }, - }, - call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}}, - }, { name: "top mixed numerics - ints", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, uint64(88), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}}, + {10, int64(99), nil, map[string]string{"host": "a"}}, + {20, uint64(88), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "top mixed numerics - ints & floats", - iter: &testIterator{ - values: []testPoint{ - {"", 10, float64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, uint64(88), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: float64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, float64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}}, + {10, float64(99), nil, map[string]string{"host": "a"}}, + {20, uint64(88), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "top mixed numerics - ints, floats, & strings", - iter: &testIterator{ - values: []testPoint{ - {"", 10, float64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, "88", nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: float64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: "88", Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, float64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}}, + {10, float64(99), nil, map[string]string{"host": "a"}}, + {10, int64(53), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "top bools", - iter: &testIterator{ - values: []testPoint{ - {"", 10, true, nil, map[string]string{"host": "a"}}, - {"", 10, true, nil, map[string]string{"host": "b"}}, - {"", 20, false, nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: false, Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, true, nil, map[string]string{"host": "a"}}, - PositionPoint{10, true, nil, map[string]string{"host": "b"}}, + {10, true, nil, map[string]string{"host": "a"}}, + {10, true, nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "bottom int64 - basic", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, int64(88), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: int64(88), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}}, - PositionPoint{20, int64(88), nil, map[string]string{"host": "a"}}, + {10, int64(53), nil, map[string]string{"host": "a"}}, + {20, int64(88), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, - { - name: "bottom int64 - basic with tag", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(20), nil, map[string]string{"host": "a"}}, - {"", 20, int64(53), nil, map[string]string{"host": "b"}}, - {"", 30, int64(30), nil, map[string]string{"host": "a"}}, - }, - }, - exp: positionOut{ - callArgs: []string{"host"}, - points: PositionPoints{ - PositionPoint{10, int64(20), nil, map[string]string{"host": "a"}}, - PositionPoint{20, int64(53), nil, map[string]string{"host": "b"}}, - }, - }, - call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}}, - }, { name: "bottom int64 - tie on value, resolve based on time", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(53), nil, map[string]string{"host": "a"}}, - {"", 20, int64(53), nil, map[string]string{"host": "a"}}, - {"", 20, int64(53), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: int64(53), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ callArgs: []string{"host"}, points: PositionPoints{ - PositionPoint{10, int64(53), nil, map[string]string{"host": "a"}}, - PositionPoint{20, int64(53), nil, map[string]string{"host": "a"}}, + {10, int64(53), nil, map[string]string{"host": "a"}}, + {20, int64(53), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, - { - name: "bottom int64 - tie on value, time, resolve based on tags", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "b"}}, - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 20, int64(100), nil, map[string]string{"host": "a"}}, - }, - }, - exp: positionOut{ - callArgs: []string{"host"}, - points: PositionPoints{ - PositionPoint{10, int64(99), nil, map[string]string{"host": "a"}}, - PositionPoint{10, int64(99), nil, map[string]string{"host": "b"}}, - }, - }, - call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}}, - }, { name: "bottom mixed numerics - ints", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, uint64(88), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}}, - PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}}, + {10, int64(53), nil, map[string]string{"host": "a"}}, + {20, uint64(88), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "bottom mixed numerics - ints & floats", - iter: &testIterator{ - values: []testPoint{ - {"", 10, int64(99), nil, map[string]string{"host": "a"}}, - {"", 10, float64(53), nil, map[string]string{"host": "b"}}, - {"", 20, uint64(88), nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: int64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: float64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: uint64(88), Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, float64(53), nil, map[string]string{"host": "b"}}, - PositionPoint{20, uint64(88), nil, map[string]string{"host": "a"}}, + {10, float64(53), nil, map[string]string{"host": "a"}}, + {20, uint64(88), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "bottom mixed numerics - ints, floats, & strings", - iter: &testIterator{ - values: []testPoint{ - {"", 10, float64(99), nil, map[string]string{"host": "a"}}, - {"", 10, int64(53), nil, map[string]string{"host": "b"}}, - {"", 20, "88", nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: float64(99), Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: int64(53), Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: "88", Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{10, int64(53), nil, map[string]string{"host": "b"}}, - PositionPoint{10, float64(99), nil, map[string]string{"host": "a"}}, + {10, int64(53), nil, map[string]string{"host": "a"}}, + {10, float64(99), nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, }, { name: "bottom bools", - iter: &testIterator{ - values: []testPoint{ - {"", 10, true, nil, map[string]string{"host": "a"}}, - {"", 10, true, nil, map[string]string{"host": "b"}}, - {"", 20, false, nil, map[string]string{"host": "a"}}, + input: &MapInput{ + TMin: -1, + Items: []MapItem{ + {Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}}, + {Timestamp: 10, Value: true, Tags: map[string]string{"host": "a"}}, + {Timestamp: 20, Value: false, Tags: map[string]string{"host": "a"}}, }, }, exp: positionOut{ points: PositionPoints{ - PositionPoint{20, false, nil, map[string]string{"host": "a"}}, - PositionPoint{10, true, nil, map[string]string{"host": "a"}}, + {20, false, nil, map[string]string{"host": "a"}}, + {10, true, nil, map[string]string{"host": "a"}}, }, }, call: &influxql.Call{Name: "bottom", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}}, @@ -787,7 +660,7 @@ func TestMapTopBottom(t *testing.T) { limit := int(lit.Val) fields := topCallArgs(test.call) - values := MapTopBottom(test.iter, limit, fields, len(test.call.Args), test.call.Name).(PositionPoints) + values := MapTopBottom(test.input, limit, fields, len(test.call.Args), test.call.Name).(PositionPoints) t.Logf("Test: %s", test.name) if exp, got := len(test.exp.points), len(values); exp != got { t.Errorf("Wrong number of values. exp %v got %v", exp, got) diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 1e0bf30fe8..b49dfefba9 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -605,15 +605,25 @@ func (m *AggregateMapper) NextChunk() (interface{}, error) { tsc.SelectFields = []string{m.fieldNames[i]} tsc.SelectWhereFields = uniqueStrings([]string{m.fieldNames[i]}, m.whereFields) - // Execute the map function which walks the entire interval, and aggregates the result. - mapValue := m.mapFuncs[i](&AggregateTagSetCursor{ - cursor: tsc, - tmin: tmin, - stmt: m.stmt, + // Build a map input from the cursor. + input := &MapInput{ + TMin: -1, + } + if len(m.stmt.Dimensions) > 0 && !m.stmt.HasTimeFieldSpecified() { + input.TMin = tmin + } - qmin: qmin, - qmax: qmax, - }) + for k, v := tsc.Next(qmin, qmax); k != -1; k, v = tsc.Next(qmin, qmax) { + input.Items = append(input.Items, MapItem{ + Timestamp: k, + Value: v, + Fields: tsc.Fields(), + Tags: tsc.Tags(), + }) + } + + // Execute the map function which walks the entire interval, and aggregates the result. + mapValue := m.mapFuncs[i](input) output.Values[0].Value = append(output.Values[0].Value.([]interface{}), mapValue) } @@ -635,40 +645,6 @@ func (m *AggregateMapper) nextInterval() (start, end int64) { return } -// AggregateTagSetCursor wraps a standard tagSetCursor, such that the values it emits are aggregated by intervals. -type AggregateTagSetCursor struct { - cursor *TagSetCursor - qmin int64 - qmax int64 - - tmin int64 - stmt *influxql.SelectStatement -} - -// Next returns the next aggregate value for the cursor. -func (a *AggregateTagSetCursor) Next() (time int64, value interface{}) { - return a.cursor.Next(a.qmin, a.qmax) -} - -// Fields returns the current fields for the cursor -func (a *AggregateTagSetCursor) Fields() map[string]interface{} { - return a.cursor.Fields() -} - -// Tags returns the current tags for the cursor -func (a *AggregateTagSetCursor) Tags() map[string]string { return a.cursor.Tags() } - -// TMin returns the current floor time for the bucket being worked on -func (a *AggregateTagSetCursor) TMin() int64 { - if len(a.stmt.Dimensions) == 0 { - return -1 - } - if !a.stmt.HasTimeFieldSpecified() { - return a.tmin - } - return -1 -} - // uniqueStrings returns a slice of unique strings from all lists in a. func uniqueStrings(a ...[]string) []string { // Calculate unique set of strings.