Merge pull request #4085 from influxdb/movefuncs

Movefuncs
pull/3798/head^2
Daniel Morsing 2015-09-11 16:05:41 +00:00
commit 2c4622c954
5 changed files with 112 additions and 109 deletions

View File

@ -386,9 +386,9 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
// the offsets within the value slices that are returned by the
// mapper.
aggregates := e.stmt.FunctionCalls()
reduceFuncs := make([]influxql.ReduceFunc, len(aggregates))
reduceFuncs := make([]reduceFunc, len(aggregates))
for i, c := range aggregates {
reduceFunc, err := influxql.InitializeReduceFunc(c)
reduceFunc, err := initializeReduceFunc(c)
if err != nil {
out <- &influxql.Row{Err: err}
return
@ -646,7 +646,7 @@ func (e *SelectExecutor) processTopBottom(results [][]interface{}, columnNames [
// start at 1 because the first value is always time
for j := 1; j < len(vals); j++ {
switch v := vals[j].(type) {
case influxql.PositionPoints:
case PositionPoints:
tMin := vals[0].(time.Time)
for _, p := range v {
result := e.topBottomPointToQueryResult(p, tMin, call, columnNames)
@ -662,7 +662,7 @@ func (e *SelectExecutor) processTopBottom(results [][]interface{}, columnNames [
return values, nil
}
func (e *SelectExecutor) topBottomPointToQueryResult(p influxql.PositionPoint, tMin time.Time, call *influxql.Call, columnNames []string) []interface{} {
func (e *SelectExecutor) topBottomPointToQueryResult(p PositionPoint, tMin time.Time, call *influxql.Call, columnNames []string) []interface{} {
tm := time.Unix(0, p.Time).UTC().Format(time.RFC3339Nano)
// If we didn't explicity ask for time, and we have a group by, then use TMIN for the time returned
if len(e.stmt.Dimensions) > 0 && !e.stmt.HasTimeFieldSpecified() {

View File

@ -1,10 +1,10 @@
package influxql
package tsdb
// All aggregate and query functions are defined in this file along with any intermediate data objects they need to process.
// Query functions are represented as two discreet functions: Map and Reduce. These roughly follow the MapReduce
// paradigm popularized by Google and Hadoop.
//
// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapReduceFuncs function
// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function
import (
"encoding/json"
@ -12,29 +12,31 @@ import (
"math"
"math/rand"
"sort"
"github.com/influxdb/influxdb/influxql"
)
// Iterator represents a forward-only iterator over a set of points.
// These are used by the MapFunctions in this file
type Iterator interface {
// iterator represents a forward-only iterator over a set of points.
// These are used by the mapFunctions in this file
type iterator interface {
Next() (time int64, value interface{})
Tags() map[string]string
TMin() int64
}
// MapFunc represents a function used for mapping over a sequential series of data.
// mapFunc represents a function used for mapping over a sequential series of data.
// The iterator represents a single group by interval
type MapFunc func(Iterator) interface{}
type mapFunc func(iterator) interface{}
// ReduceFunc represents a function used for reducing mapper output.
type ReduceFunc func([]interface{}) interface{}
// reduceFunc represents a function used for reducing mapper output.
type reduceFunc func([]interface{}) interface{}
// UnmarshalFunc represents a function that can take bytes from a mapper from remote
// server and marshal it into an interface the reducer can use
type UnmarshalFunc func([]byte) (interface{}, error)
type unmarshalFunc func([]byte) (interface{}, error)
// InitializeMapFunc takes an aggregate call from the query and returns the MapFunc
func InitializeMapFunc(c *Call) (MapFunc, error) {
// initializemapFunc takes an aggregate call from the query and returns the mapFunc
func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
// see if it's a query for raw data
if c == nil {
return MapRawQuery, nil
@ -43,10 +45,10 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
// Retrieve map function by name.
switch c.Name {
case "count":
if _, ok := c.Args[0].(*Distinct); ok {
if _, ok := c.Args[0].(*influxql.Distinct); ok {
return MapCountDistinct, nil
}
if c, ok := c.Args[0].(*Call); ok {
if c, ok := c.Args[0].(*influxql.Call); ok {
if c.Name == "distinct" {
return MapCountDistinct, nil
}
@ -73,7 +75,7 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
case "last":
return MapLast, nil
case "top":
return func(itr Iterator) interface{} {
return func(itr iterator) interface{} {
return MapTop(itr, c)
}, nil
case "percentile":
@ -81,8 +83,8 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate
if fn, ok := c.Args[0].(*Call); ok {
return InitializeMapFunc(fn)
if fn, ok := c.Args[0].(*influxql.Call); ok {
return initializeMapFunc(fn)
}
return MapRawQuery, nil
default:
@ -90,15 +92,15 @@ func InitializeMapFunc(c *Call) (MapFunc, error) {
}
}
// InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc
func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
// InitializereduceFunc takes an aggregate call from the query and returns the reduceFunc
func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
// Retrieve reduce function by name.
switch c.Name {
case "count":
if _, ok := c.Args[0].(*Distinct); ok {
if _, ok := c.Args[0].(*influxql.Distinct); ok {
return ReduceCountDistinct, nil
}
if c, ok := c.Args[0].(*Call); ok {
if c, ok := c.Args[0].(*influxql.Call); ok {
if c.Name == "distinct" {
return ReduceCountDistinct, nil
}
@ -135,8 +137,8 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate
if fn, ok := c.Args[0].(*Call); ok {
return InitializeReduceFunc(fn)
if fn, ok := c.Args[0].(*influxql.Call); ok {
return initializeReduceFunc(fn)
}
return nil, fmt.Errorf("expected function argument to %s", c.Name)
default:
@ -144,7 +146,7 @@ func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
}
}
func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) {
func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) {
// if c is nil it's a raw data query
if c == nil {
return func(b []byte) (interface{}, error) {
@ -208,7 +210,7 @@ func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) {
}
// MapCount computes the number of values in an iterator.
func MapCount(itr Iterator) interface{} {
func MapCount(itr iterator) interface{} {
n := float64(0)
for k, _ := itr.Next(); k != -1; k, _ = itr.Next() {
n++
@ -374,7 +376,7 @@ func (d interfaceValues) Less(i, j int) bool {
}
// MapDistinct computes the unique values in an iterator.
func MapDistinct(itr Iterator) interface{} {
func MapDistinct(itr iterator) interface{} {
var index = make(map[interface{}]struct{})
for time, value := itr.Next(); time != -1; time, value = itr.Next() {
@ -428,7 +430,7 @@ func ReduceDistinct(values []interface{}) interface{} {
}
// MapCountDistinct computes the unique count of values in an iterator.
func MapCountDistinct(itr Iterator) interface{} {
func MapCountDistinct(itr iterator) interface{} {
var index = make(map[interface{}]struct{})
for time, value := itr.Next(); time != -1; time, value = itr.Next() {
@ -472,7 +474,7 @@ const (
)
// MapSum computes the summation of values in an iterator.
func MapSum(itr Iterator) interface{} {
func MapSum(itr iterator) interface{} {
n := float64(0)
count := 0
var resultType NumberType
@ -527,7 +529,7 @@ func ReduceSum(values []interface{}) interface{} {
}
// MapMean computes the count and sum of values in an iterator to be combined by the reducer.
func MapMean(itr Iterator) interface{} {
func MapMean(itr iterator) interface{} {
out := &meanMapOutput{}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -733,7 +735,7 @@ type minMaxMapOut struct {
}
// MapMin collects the values to pass to the reducer
func MapMin(itr Iterator) interface{} {
func MapMin(itr iterator) interface{} {
min := &minMaxMapOut{}
pointsYielded := false
@ -796,7 +798,7 @@ func ReduceMin(values []interface{}) interface{} {
}
// MapMax collects the values to pass to the reducer
func MapMax(itr Iterator) interface{} {
func MapMax(itr iterator) interface{} {
max := &minMaxMapOut{}
pointsYielded := false
@ -864,7 +866,7 @@ type spreadMapOutput struct {
}
// MapSpread collects the values to pass to the reducer
func MapSpread(itr Iterator) interface{} {
func MapSpread(itr iterator) interface{} {
out := &spreadMapOutput{}
pointsYielded := false
var val float64
@ -925,7 +927,7 @@ func ReduceSpread(values []interface{}) interface{} {
}
// MapStddev collects the values to pass to the reducer
func MapStddev(itr Iterator) interface{} {
func MapStddev(itr iterator) interface{} {
var values []float64
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -983,7 +985,7 @@ type firstLastMapOutput struct {
// MapFirst collects the values to pass to the reducer
// This function assumes time ordered input
func MapFirst(itr Iterator) interface{} {
func MapFirst(itr iterator) interface{} {
k, v := itr.Next()
if k == -1 {
return nil
@ -1028,7 +1030,7 @@ func ReduceFirst(values []interface{}) interface{} {
}
// MapLast collects the values to pass to the reducer
func MapLast(itr Iterator) interface{} {
func MapLast(itr iterator) interface{} {
out := &firstLastMapOutput{}
pointsYielded := false
@ -1341,10 +1343,10 @@ func (t topReduceOut) Less(i, j int) bool {
// callArgs will get any additional field/tag names that may be needed to sort with
// it is important to maintain the order of these that they were asked for in the call
// for sorting purposes
func topCallArgs(c *Call) []string {
func topCallArgs(c *influxql.Call) []string {
var names []string
for _, v := range c.Args[1 : len(c.Args)-1] {
if f, ok := v.(*VarRef); ok {
if f, ok := v.(*influxql.VarRef); ok {
names = append(names, f.Val)
}
}
@ -1352,9 +1354,9 @@ func topCallArgs(c *Call) []string {
}
// MapTop emits the top data points for each group by interval
func MapTop(itr Iterator, c *Call) interface{} {
func MapTop(itr iterator, c *influxql.Call) interface{} {
// Capture the limit if it was specified in the call
lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral)
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int64(lit.Val)
// Simple case where only value and limit are specified.
@ -1462,8 +1464,8 @@ func MapTop(itr Iterator, c *Call) interface{} {
}
// ReduceTop computes the top values for each key.
func ReduceTop(values []interface{}, c *Call) interface{} {
lit, _ := c.Args[len(c.Args)-1].(*NumberLiteral)
func ReduceTop(values []interface{}, c *influxql.Call) interface{} {
lit, _ := c.Args[len(c.Args)-1].(*influxql.NumberLiteral)
limit := int64(lit.Val)
out := positionOut{callArgs: topCallArgs(c)}
@ -1491,7 +1493,7 @@ func ReduceTop(values []interface{}, c *Call) interface{} {
}
// MapEcho emits the data points for each group by interval
func MapEcho(itr Iterator) interface{} {
func MapEcho(itr iterator) interface{} {
var values []interface{}
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -1501,10 +1503,10 @@ func MapEcho(itr Iterator) interface{} {
}
// ReducePercentile computes the percentile of values for each key.
func ReducePercentile(values []interface{}, c *Call) interface{} {
func ReducePercentile(values []interface{}, c *influxql.Call) interface{} {
// Checks that this arg exists and is a valid type are done in the parsing validation
// and have test coverage there
lit, _ := c.Args[1].(*NumberLiteral)
lit, _ := c.Args[1].(*influxql.NumberLiteral)
percentile := lit.Val
var allValues []float64
@ -1537,7 +1539,7 @@ func ReducePercentile(values []interface{}, c *Call) interface{} {
}
// IsNumeric returns whether a given aggregate can only be run on numeric fields.
func IsNumeric(c *Call) bool {
func IsNumeric(c *influxql.Call) bool {
switch c.Name {
case "count", "first", "last", "distinct":
return false
@ -1547,7 +1549,7 @@ func IsNumeric(c *Call) bool {
}
// MapRawQuery is for queries without aggregates
func MapRawQuery(itr Iterator) interface{} {
func MapRawQuery(itr iterator) interface{} {
var values []*rawQueryMapOutput
for k, v := itr.Next(); k != -1; k, v = itr.Next() {
val := &rawQueryMapOutput{k, v}

View File

@ -1,4 +1,4 @@
package influxql
package tsdb
import (
"reflect"
@ -6,11 +6,12 @@ import (
"time"
"github.com/davecgh/go-spew/spew"
"github.com/influxdb/influxdb/influxql"
)
import "sort"
type point struct {
type testPoint struct {
seriesKey string
time int64
value interface{}
@ -18,7 +19,7 @@ type point struct {
}
type testIterator struct {
values []point
values []testPoint
lastTags map[string]string
nextFunc func() (timestamp int64, value interface{})
tagsFunc func() map[string]string
@ -63,17 +64,17 @@ func TestMapMeanNoValues(t *testing.T) {
func TestMapMean(t *testing.T) {
tests := []struct {
input []point
input []testPoint
output *meanMapOutput
}{
{ // Single point
input: []point{point{"0", 1, 1.0, nil}},
input: []testPoint{testPoint{"0", 1, 1.0, nil}},
output: &meanMapOutput{1, 1, Float64Type},
},
{ // Two points
input: []point{
point{"0", 1, 2.0, nil},
point{"0", 2, 8.0, nil},
input: []testPoint{
testPoint{"0", 1, 2.0, nil},
testPoint{"0", 2, 8.0, nil},
},
output: &meanMapOutput{2, 5.0, Float64Type},
},
@ -99,29 +100,29 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
for _, fn := range []string{"derivative", "non_negative_derivative"} {
// Single field arg should return MapEcho
c := &Call{
c := &influxql.Call{
Name: fn,
Args: []Expr{
&VarRef{Val: " field1"},
&DurationLiteral{Val: time.Hour},
Args: []influxql.Expr{
&influxql.VarRef{Val: " field1"},
&influxql.DurationLiteral{Val: time.Hour},
},
}
_, err := InitializeMapFunc(c)
_, err := initializeMapFunc(c)
if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
}
// Nested Aggregate func should return the map func for the nested aggregate
c = &Call{
c = &influxql.Call{
Name: fn,
Args: []Expr{
&Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}},
&DurationLiteral{Val: time.Hour},
Args: []influxql.Expr{
&influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}},
&influxql.DurationLiteral{Val: time.Hour},
},
}
_, err = InitializeMapFunc(c)
_, err = initializeMapFunc(c)
if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
}
@ -135,7 +136,7 @@ func TestReducePercentileNil(t *testing.T) {
}
// ReducePercentile should ignore nil values when calculating the percentile
got := ReducePercentile(input, &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 100}}})
got := ReducePercentile(input, &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 100}}})
if got != nil {
t.Fatalf("ReducePercentile(100) returned wrong type. exp nil got %v", got)
}
@ -157,7 +158,7 @@ func TestMapDistinct(t *testing.T) {
)
iter := &testIterator{
values: []point{
values: []testPoint{
{seriesKey1, timeId1, uint64(1), nil},
{seriesKey1, timeId2, uint64(1), nil},
{seriesKey1, timeId3, "1", nil},
@ -188,7 +189,7 @@ func TestMapDistinct(t *testing.T) {
func TestMapDistinctNil(t *testing.T) {
iter := &testIterator{
values: []point{},
values: []testPoint{},
}
values := MapDistinct(iter)
@ -311,7 +312,7 @@ func TestMapCountDistinct(t *testing.T) {
)
iter := &testIterator{
values: []point{
values: []testPoint{
{seriesKey1, timeId1, uint64(1), nil},
{seriesKey1, timeId2, uint64(1), nil},
{seriesKey1, timeId3, "1", nil},
@ -342,7 +343,7 @@ func TestMapCountDistinct(t *testing.T) {
func TestMapCountDistinctNil(t *testing.T) {
iter := &testIterator{
values: []point{},
values: []testPoint{},
}
values := MapCountDistinct(iter)
@ -448,9 +449,9 @@ func TestGetSortedRange(t *testing.T) {
if len(results) != len(tt.expected) {
t.Errorf("Test %s error. Expected getSortedRange to return %v but got %v", tt.name, tt.expected, results)
}
for i, point := range tt.expected {
if point != results[i] {
t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, point, results[i])
for i, testPoint := range tt.expected {
if testPoint != results[i] {
t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, testPoint, results[i])
}
}
}
@ -485,12 +486,12 @@ func TestMapTop(t *testing.T) {
skip bool
iter *testIterator
exp positionOut
call *Call
call *influxql.Call
}{
{
name: "int64 - basic",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, int64(88), map[string]string{"host": "a"}},
@ -502,12 +503,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - basic with tag",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 20, int64(53), map[string]string{"host": "b"}},
{"", 30, int64(88), map[string]string{"host": "a"}},
@ -520,12 +521,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{20, int64(53), map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, resolve based on time",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 20, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "a"}},
{"", 10, int64(99), map[string]string{"host": "a"}},
@ -538,12 +539,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{20, int64(99), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - tie on value, time, resolve based on tags",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "b"}},
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 20, int64(88), map[string]string{"host": "a"}},
@ -556,12 +557,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, int64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, uint64(88), map[string]string{"host": "a"}},
@ -573,12 +574,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints & floats",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, float64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, uint64(88), map[string]string{"host": "a"}},
@ -590,12 +591,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{20, uint64(88), map[string]string{"host": "a"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "mixed numerics - ints, floats, & strings",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, float64(99), map[string]string{"host": "a"}},
{"", 10, int64(53), map[string]string{"host": "b"}},
{"", 20, "88", map[string]string{"host": "a"}},
@ -607,12 +608,12 @@ func TestMapTop(t *testing.T) {
PositionPoint{10, int64(53), map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "bools",
iter: &testIterator{
values: []point{
values: []testPoint{
{"", 10, true, map[string]string{"host": "a"}},
{"", 10, true, map[string]string{"host": "b"}},
{"", 20, false, map[string]string{"host": "a"}},
@ -624,7 +625,7 @@ func TestMapTop(t *testing.T) {
PositionPoint{10, true, map[string]string{"host": "b"}},
},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
}
@ -649,7 +650,7 @@ func TestReduceTop(t *testing.T) {
skip bool
values []interface{}
exp PositionPoints
call *Call
call *influxql.Call
}{
{
name: "int64 - single map",
@ -664,7 +665,7 @@ func TestReduceTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map",
@ -681,7 +682,7 @@ func TestReduceTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map with nil",
@ -697,7 +698,7 @@ func TestReduceTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{"host": "a"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
{
name: "int64 - double map with non-matching tags and tag selected",
@ -713,7 +714,7 @@ func TestReduceTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(88), map[string]string{}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &VarRef{Val: "host"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.VarRef{Val: "host"}, &influxql.NumberLiteral{Val: 2}}},
},
{
skip: true,
@ -730,7 +731,7 @@ func TestReduceTop(t *testing.T) {
PositionPoint{10, int64(99), map[string]string{"host": "a"}},
PositionPoint{20, int64(55), map[string]string{"host": "b"}},
},
call: &Call{Name: "top", Args: []Expr{&VarRef{Val: "field1"}, &NumberLiteral{Val: 2}}},
call: &influxql.Call{Name: "top", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}, &influxql.NumberLiteral{Val: 2}}},
},
}

View File

@ -59,12 +59,12 @@ type SelectMapper struct {
// The following attributes are only used when mappers are for aggregate queries.
queryTMinWindow int64 // Minimum time of the query floored to start of interval.
intervalSize int64 // Size of each interval.
numIntervals int // Maximum number of intervals to return.
currInterval int // Current interval for which data is being fetched.
mapFuncs []influxql.MapFunc // The mapping functions.
fieldNames []string // the field name being read for mapping.
queryTMinWindow int64 // Minimum time of the query floored to start of interval.
intervalSize int64 // Size of each interval.
numIntervals int // Maximum number of intervals to return.
currInterval int // Current interval for which data is being fetched.
mapFuncs []mapFunc // The mapping functions.
fieldNames []string // the field name being read for mapping.
}
// NewSelectMapper returns a mapper for the given shard, which will return data for the SELECT statement.
@ -500,10 +500,10 @@ func (lm *SelectMapper) initializeMapFunctions() error {
var err error
// Set up each mapping function for this statement.
aggregates := lm.selectStmt.FunctionCalls()
lm.mapFuncs = make([]influxql.MapFunc, len(aggregates))
lm.mapFuncs = make([]mapFunc, len(aggregates))
lm.fieldNames = make([]string, len(lm.mapFuncs))
for i, c := range aggregates {
lm.mapFuncs[i], err = influxql.InitializeMapFunc(c)
lm.mapFuncs[i], err = initializeMapFunc(c)
if err != nil {
return err
}

View File

@ -289,7 +289,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt
switch lit := nested.Args[0].(type) {
case *influxql.VarRef:
if influxql.IsNumeric(nested) {
if IsNumeric(nested) {
f := m.Fields[lit.Val]
if err := validateType(a.Name, f.Name, f.Type); err != nil {
return err
@ -299,7 +299,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt
if nested.Name != "count" {
return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
}
if influxql.IsNumeric(nested) {
if IsNumeric(nested) {
f := m.Fields[lit.Val]
if err := validateType(a.Name, f.Name, f.Type); err != nil {
return err