Revert "move aggregate functions"

pull/3952/head
Daniel Morsing 2015-09-02 10:47:58 -07:00
parent 3eb1804740
commit c4092d7fc3
5 changed files with 94 additions and 97 deletions

View File

@ -1,10 +1,10 @@
package tsdb package influxql
// All aggregate and query functions are defined in this file along with any intermediate data objects they need to process. // All aggregate and query functions are defined in this file along with any intermediate data objects they need to process.
// Query functions are represented as two discrete functions: Map and Reduce. These roughly follow the MapReduce // Query functions are represented as two discrete functions: Map and Reduce. These roughly follow the MapReduce
// paradigm popularized by Google and Hadoop. // paradigm popularized by Google and Hadoop.
// //
// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function // When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapReduceFuncs function
import ( import (
"encoding/json" "encoding/json"
@ -13,29 +13,27 @@ import (
"math/rand" "math/rand"
"sort" "sort"
"strings" "strings"
"github.com/influxdb/influxdb/influxql"
) )
// iterator represents a forward-only iterator over a set of points. // Iterator represents a forward-only iterator over a set of points.
// These are used by the mapFunctions in this file // These are used by the MapFunctions in this file
type iterator interface { type Iterator interface {
Next() (time int64, value interface{}) Next() (time int64, value interface{})
} }
// mapFunc represents a function used for mapping over a sequential series of data. // MapFunc represents a function used for mapping over a sequential series of data.
// The iterator represents a single group by interval // The iterator represents a single group by interval
type mapFunc func(iterator) interface{} type MapFunc func(Iterator) interface{}
// reduceFunc represents a function used for reducing mapper output. // ReduceFunc represents a function used for reducing mapper output.
type reduceFunc func([]interface{}) interface{} type ReduceFunc func([]interface{}) interface{}
// UnmarshalFunc represents a function that can take bytes from a mapper from remote // UnmarshalFunc represents a function that can take bytes from a mapper from remote
// server and marshal it into an interface the reducer can use // server and marshal it into an interface the reducer can use
type unmarshalFunc func([]byte) (interface{}, error) type UnmarshalFunc func([]byte) (interface{}, error)
// initializemapFunc takes an aggregate call from the query and returns the mapFunc // InitializeMapFunc takes an aggregate call from the query and returns the MapFunc
func initializeMapFunc(c *influxql.Call) (mapFunc, error) { func InitializeMapFunc(c *Call) (MapFunc, error) {
// see if it's a query for raw data // see if it's a query for raw data
if c == nil { if c == nil {
return MapRawQuery, nil return MapRawQuery, nil
@ -60,12 +58,12 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
if !strings.HasSuffix(c.Name, "derivative") { if !strings.HasSuffix(c.Name, "derivative") {
// Ensure the argument is appropriate for the aggregate function. // Ensure the argument is appropriate for the aggregate function.
switch fc := c.Args[0].(type) { switch fc := c.Args[0].(type) {
case *influxql.VarRef: case *VarRef:
case *influxql.Distinct: case *Distinct:
if c.Name != "count" { if c.Name != "count" {
return nil, fmt.Errorf("expected field argument in %s()", c.Name) return nil, fmt.Errorf("expected field argument in %s()", c.Name)
} }
case *influxql.Call: case *Call:
if fc.Name != "distinct" { if fc.Name != "distinct" {
return nil, fmt.Errorf("expected field argument in %s()", c.Name) return nil, fmt.Errorf("expected field argument in %s()", c.Name)
} }
@ -77,10 +75,10 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
// Retrieve map function by name. // Retrieve map function by name.
switch c.Name { switch c.Name {
case "count": case "count":
if _, ok := c.Args[0].(*influxql.Distinct); ok { if _, ok := c.Args[0].(*Distinct); ok {
return MapCountDistinct, nil return MapCountDistinct, nil
} }
if c, ok := c.Args[0].(*influxql.Call); ok { if c, ok := c.Args[0].(*Call); ok {
if c.Name == "distinct" { if c.Name == "distinct" {
return MapCountDistinct, nil return MapCountDistinct, nil
} }
@ -107,7 +105,7 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
case "last": case "last":
return MapLast, nil return MapLast, nil
case "percentile": case "percentile":
_, ok := c.Args[1].(*influxql.NumberLiteral) _, ok := c.Args[1].(*NumberLiteral)
if !ok { if !ok {
return nil, fmt.Errorf("expected float argument in percentile()") return nil, fmt.Errorf("expected float argument in percentile()")
} }
@ -115,8 +113,8 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
case "derivative", "non_negative_derivative": case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then // If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate // use the map func for that nested aggregate
if fn, ok := c.Args[0].(*influxql.Call); ok { if fn, ok := c.Args[0].(*Call); ok {
return initializeMapFunc(fn) return InitializeMapFunc(fn)
} }
return MapRawQuery, nil return MapRawQuery, nil
default: default:
@ -124,15 +122,15 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
} }
} }
// InitializereduceFunc takes an aggregate call from the query and returns the reduceFunc // InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc
func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) { func InitializeReduceFunc(c *Call) (ReduceFunc, error) {
// Retrieve reduce function by name. // Retrieve reduce function by name.
switch c.Name { switch c.Name {
case "count": case "count":
if _, ok := c.Args[0].(*influxql.Distinct); ok { if _, ok := c.Args[0].(*Distinct); ok {
return ReduceCountDistinct, nil return ReduceCountDistinct, nil
} }
if c, ok := c.Args[0].(*influxql.Call); ok { if c, ok := c.Args[0].(*Call); ok {
if c.Name == "distinct" { if c.Name == "distinct" {
return ReduceCountDistinct, nil return ReduceCountDistinct, nil
} }
@ -163,7 +161,7 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
return nil, fmt.Errorf("expected float argument in percentile()") return nil, fmt.Errorf("expected float argument in percentile()")
} }
lit, ok := c.Args[1].(*influxql.NumberLiteral) lit, ok := c.Args[1].(*NumberLiteral)
if !ok { if !ok {
return nil, fmt.Errorf("expected float argument in percentile()") return nil, fmt.Errorf("expected float argument in percentile()")
} }
@ -171,8 +169,8 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
case "derivative", "non_negative_derivative": case "derivative", "non_negative_derivative":
// If the arg is another aggregate e.g. derivative(mean(value)), then // If the arg is another aggregate e.g. derivative(mean(value)), then
// use the map func for that nested aggregate // use the map func for that nested aggregate
if fn, ok := c.Args[0].(*influxql.Call); ok { if fn, ok := c.Args[0].(*Call); ok {
return initializeReduceFunc(fn) return InitializeReduceFunc(fn)
} }
return nil, fmt.Errorf("expected function argument to %s", c.Name) return nil, fmt.Errorf("expected function argument to %s", c.Name)
default: default:
@ -180,7 +178,7 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
} }
} }
func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) { func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) {
// if c is nil it's a raw data query // if c is nil it's a raw data query
if c == nil { if c == nil {
return func(b []byte) (interface{}, error) { return func(b []byte) (interface{}, error) {
@ -244,7 +242,7 @@ func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) {
} }
// MapCount computes the number of values in an iterator. // MapCount computes the number of values in an iterator.
func MapCount(itr iterator) interface{} { func MapCount(itr Iterator) interface{} {
n := float64(0) n := float64(0)
for k, _ := itr.Next(); k != -1; k, _ = itr.Next() { for k, _ := itr.Next(); k != -1; k, _ = itr.Next() {
n++ n++
@ -329,7 +327,7 @@ func (d distinctValues) Less(i, j int) bool {
} }
// MapDistinct computes the unique values in an iterator. // MapDistinct computes the unique values in an iterator.
func MapDistinct(itr iterator) interface{} { func MapDistinct(itr Iterator) interface{} {
var index = make(map[interface{}]struct{}) var index = make(map[interface{}]struct{})
for time, value := itr.Next(); time != -1; time, value = itr.Next() { for time, value := itr.Next(); time != -1; time, value = itr.Next() {
@ -383,7 +381,7 @@ func ReduceDistinct(values []interface{}) interface{} {
} }
// MapCountDistinct computes the unique count of values in an iterator. // MapCountDistinct computes the unique count of values in an iterator.
func MapCountDistinct(itr iterator) interface{} { func MapCountDistinct(itr Iterator) interface{} {
var index = make(map[interface{}]struct{}) var index = make(map[interface{}]struct{})
for time, value := itr.Next(); time != -1; time, value = itr.Next() { for time, value := itr.Next(); time != -1; time, value = itr.Next() {
@ -427,7 +425,7 @@ const (
) )
// MapSum computes the summation of values in an iterator. // MapSum computes the summation of values in an iterator.
func MapSum(itr iterator) interface{} { func MapSum(itr Iterator) interface{} {
n := float64(0) n := float64(0)
count := 0 count := 0
var resultType NumberType var resultType NumberType
@ -482,7 +480,7 @@ func ReduceSum(values []interface{}) interface{} {
} }
// MapMean computes the count and sum of values in an iterator to be combined by the reducer. // MapMean computes the count and sum of values in an iterator to be combined by the reducer.
func MapMean(itr iterator) interface{} { func MapMean(itr Iterator) interface{} {
out := &meanMapOutput{} out := &meanMapOutput{}
for k, v := itr.Next(); k != -1; k, v = itr.Next() { for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -688,7 +686,7 @@ type minMaxMapOut struct {
} }
// MapMin collects the values to pass to the reducer // MapMin collects the values to pass to the reducer
func MapMin(itr iterator) interface{} { func MapMin(itr Iterator) interface{} {
min := &minMaxMapOut{} min := &minMaxMapOut{}
pointsYielded := false pointsYielded := false
@ -751,7 +749,7 @@ func ReduceMin(values []interface{}) interface{} {
} }
// MapMax collects the values to pass to the reducer // MapMax collects the values to pass to the reducer
func MapMax(itr iterator) interface{} { func MapMax(itr Iterator) interface{} {
max := &minMaxMapOut{} max := &minMaxMapOut{}
pointsYielded := false pointsYielded := false
@ -819,7 +817,7 @@ type spreadMapOutput struct {
} }
// MapSpread collects the values to pass to the reducer // MapSpread collects the values to pass to the reducer
func MapSpread(itr iterator) interface{} { func MapSpread(itr Iterator) interface{} {
out := &spreadMapOutput{} out := &spreadMapOutput{}
pointsYielded := false pointsYielded := false
var val float64 var val float64
@ -880,7 +878,7 @@ func ReduceSpread(values []interface{}) interface{} {
} }
// MapStddev collects the values to pass to the reducer // MapStddev collects the values to pass to the reducer
func MapStddev(itr iterator) interface{} { func MapStddev(itr Iterator) interface{} {
var values []float64 var values []float64
for k, v := itr.Next(); k != -1; k, v = itr.Next() { for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -938,7 +936,7 @@ type firstLastMapOutput struct {
// MapFirst collects the values to pass to the reducer // MapFirst collects the values to pass to the reducer
// This function assumes time ordered input // This function assumes time ordered input
func MapFirst(itr iterator) interface{} { func MapFirst(itr Iterator) interface{} {
k, v := itr.Next() k, v := itr.Next()
if k == -1 { if k == -1 {
return nil return nil
@ -983,7 +981,7 @@ func ReduceFirst(values []interface{}) interface{} {
} }
// MapLast collects the values to pass to the reducer // MapLast collects the values to pass to the reducer
func MapLast(itr iterator) interface{} { func MapLast(itr Iterator) interface{} {
out := &firstLastMapOutput{} out := &firstLastMapOutput{}
pointsYielded := false pointsYielded := false
@ -1038,7 +1036,7 @@ func ReduceLast(values []interface{}) interface{} {
} }
// MapEcho emits the data points for each group by interval // MapEcho emits the data points for each group by interval
func MapEcho(itr iterator) interface{} { func MapEcho(itr Iterator) interface{} {
var values []interface{} var values []interface{}
for k, v := itr.Next(); k != -1; k, v = itr.Next() { for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@ -1048,7 +1046,7 @@ func MapEcho(itr iterator) interface{} {
} }
// ReducePercentile computes the percentile of values for each key. // ReducePercentile computes the percentile of values for each key.
func ReducePercentile(percentile float64) reduceFunc { func ReducePercentile(percentile float64) ReduceFunc {
return func(values []interface{}) interface{} { return func(values []interface{}) interface{} {
var allValues []float64 var allValues []float64
@ -1081,7 +1079,7 @@ func ReducePercentile(percentile float64) reduceFunc {
} }
// IsNumeric returns whether a given aggregate can only be run on numeric fields. // IsNumeric returns whether a given aggregate can only be run on numeric fields.
func IsNumeric(c *influxql.Call) bool { func IsNumeric(c *Call) bool {
switch c.Name { switch c.Name {
case "count", "first", "last", "distinct": case "count", "first", "last", "distinct":
return false return false
@ -1091,7 +1089,7 @@ func IsNumeric(c *influxql.Call) bool {
} }
// MapRawQuery is for queries without aggregates // MapRawQuery is for queries without aggregates
func MapRawQuery(itr iterator) interface{} { func MapRawQuery(itr Iterator) interface{} {
var values []*rawQueryMapOutput var values []*rawQueryMapOutput
for k, v := itr.Next(); k != -1; k, v = itr.Next() { for k, v := itr.Next(); k != -1; k, v = itr.Next() {
val := &rawQueryMapOutput{k, v} val := &rawQueryMapOutput{k, v}

View File

@ -1,4 +1,4 @@
package tsdb package influxql
import ( import (
"reflect" "reflect"
@ -6,19 +6,18 @@ import (
"time" "time"
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/influxdb/influxdb/influxql"
) )
import "sort" import "sort"
type testPoint struct { type point struct {
seriesKey string seriesKey string
time int64 time int64
value interface{} value interface{}
} }
type testIterator struct { type testIterator struct {
values []testPoint values []point
} }
func (t *testIterator) Next() (timestamp int64, value interface{}) { func (t *testIterator) Next() (timestamp int64, value interface{}) {
@ -41,17 +40,17 @@ func TestMapMeanNoValues(t *testing.T) {
func TestMapMean(t *testing.T) { func TestMapMean(t *testing.T) {
tests := []struct { tests := []struct {
input []testPoint input []point
output *meanMapOutput output *meanMapOutput
}{ }{
{ // Single testPoint { // Single point
input: []testPoint{testPoint{"0", 1, 1.0}}, input: []point{point{"0", 1, 1.0}},
output: &meanMapOutput{1, 1, Float64Type}, output: &meanMapOutput{1, 1, Float64Type},
}, },
{ // Two testPoints { // Two points
input: []testPoint{ input: []point{
testPoint{"0", 1, 2.0}, point{"0", 1, 2.0},
testPoint{"0", 2, 8.0}, point{"0", 2, 8.0},
}, },
output: &meanMapOutput{2, 5.0, Float64Type}, output: &meanMapOutput{2, 5.0, Float64Type},
}, },
@ -74,11 +73,11 @@ func TestMapMean(t *testing.T) {
} }
func TestInitializeMapFuncPercentile(t *testing.T) { func TestInitializeMapFuncPercentile(t *testing.T) {
// No args // No args
c := &influxql.Call{ c := &Call{
Name: "percentile", Name: "percentile",
Args: []influxql.Expr{}, Args: []Expr{},
} }
_, err := initializeMapFunc(c) _, err := InitializeMapFunc(c)
if err == nil { if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
} }
@ -88,14 +87,14 @@ func TestInitializeMapFuncPercentile(t *testing.T) {
} }
// No percentile arg // No percentile arg
c = &influxql.Call{ c = &Call{
Name: "percentile", Name: "percentile",
Args: []influxql.Expr{ Args: []Expr{
&influxql.VarRef{Val: "field1"}, &VarRef{Val: "field1"},
}, },
} }
_, err = initializeMapFunc(c) _, err = InitializeMapFunc(c)
if err == nil { if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
} }
@ -109,40 +108,40 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
for _, fn := range []string{"derivative", "non_negative_derivative"} { for _, fn := range []string{"derivative", "non_negative_derivative"} {
// No args should fail // No args should fail
c := &influxql.Call{ c := &Call{
Name: fn, Name: fn,
Args: []influxql.Expr{}, Args: []Expr{},
} }
_, err := initializeMapFunc(c) _, err := InitializeMapFunc(c)
if err == nil { if err == nil {
t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
} }
// Single field arg should return MapEcho // Single field arg should return MapEcho
c = &influxql.Call{ c = &Call{
Name: fn, Name: fn,
Args: []influxql.Expr{ Args: []Expr{
&influxql.VarRef{Val: " field1"}, &VarRef{Val: " field1"},
&influxql.DurationLiteral{Val: time.Hour}, &DurationLiteral{Val: time.Hour},
}, },
} }
_, err = initializeMapFunc(c) _, err = InitializeMapFunc(c)
if err != nil { if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
} }
// Nested Aggregate func should return the map func for the nested aggregate // Nested Aggregate func should return the map func for the nested aggregate
c = &influxql.Call{ c = &Call{
Name: fn, Name: fn,
Args: []influxql.Expr{ Args: []Expr{
&influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}, &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}},
&influxql.DurationLiteral{Val: time.Hour}, &DurationLiteral{Val: time.Hour},
}, },
} }
_, err = initializeMapFunc(c) _, err = InitializeMapFunc(c)
if err != nil { if err != nil {
t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
} }
@ -151,11 +150,11 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
func TestInitializeReduceFuncPercentile(t *testing.T) { func TestInitializeReduceFuncPercentile(t *testing.T) {
// No args // No args
c := &influxql.Call{ c := &Call{
Name: "percentile", Name: "percentile",
Args: []influxql.Expr{}, Args: []Expr{},
} }
_, err := initializeReduceFunc(c) _, err := InitializeReduceFunc(c)
if err == nil { if err == nil {
t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c)
} }
@ -165,14 +164,14 @@ func TestInitializeReduceFuncPercentile(t *testing.T) {
} }
// No percentile arg // No percentile arg
c = &influxql.Call{ c = &Call{
Name: "percentile", Name: "percentile",
Args: []influxql.Expr{ Args: []Expr{
&influxql.VarRef{Val: "field1"}, &VarRef{Val: "field1"},
}, },
} }
_, err = initializeReduceFunc(c) _, err = InitializeReduceFunc(c)
if err == nil { if err == nil {
t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c)
} }
@ -212,7 +211,7 @@ func TestMapDistinct(t *testing.T) {
) )
iter := &testIterator{ iter := &testIterator{
values: []testPoint{ values: []point{
{seriesKey1, timeId1, uint64(1)}, {seriesKey1, timeId1, uint64(1)},
{seriesKey1, timeId2, uint64(1)}, {seriesKey1, timeId2, uint64(1)},
{seriesKey1, timeId3, "1"}, {seriesKey1, timeId3, "1"},
@ -243,7 +242,7 @@ func TestMapDistinct(t *testing.T) {
func TestMapDistinctNil(t *testing.T) { func TestMapDistinctNil(t *testing.T) {
iter := &testIterator{ iter := &testIterator{
values: []testPoint{}, values: []point{},
} }
values := MapDistinct(iter) values := MapDistinct(iter)
@ -366,7 +365,7 @@ func TestMapCountDistinct(t *testing.T) {
) )
iter := &testIterator{ iter := &testIterator{
values: []testPoint{ values: []point{
{seriesKey1, timeId1, uint64(1)}, {seriesKey1, timeId1, uint64(1)},
{seriesKey1, timeId2, uint64(1)}, {seriesKey1, timeId2, uint64(1)},
{seriesKey1, timeId3, "1"}, {seriesKey1, timeId3, "1"},
@ -397,7 +396,7 @@ func TestMapCountDistinct(t *testing.T) {
func TestMapCountDistinctNil(t *testing.T) { func TestMapCountDistinctNil(t *testing.T) {
iter := &testIterator{ iter := &testIterator{
values: []testPoint{}, values: []point{},
} }
values := MapCountDistinct(iter) values := MapCountDistinct(iter)
@ -503,9 +502,9 @@ func TestGetSortedRange(t *testing.T) {
if len(results) != len(tt.expected) { if len(results) != len(tt.expected) {
t.Errorf("Test %s error. Expected getSortedRange to return %v but got %v", tt.name, tt.expected, results) t.Errorf("Test %s error. Expected getSortedRange to return %v but got %v", tt.name, tt.expected, results)
} }
for i, testPoint := range tt.expected { for i, point := range tt.expected {
if testPoint != results[i] { if point != results[i] {
t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, testPoint, results[i]) t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, point, results[i])
} }
} }
} }

View File

@ -343,9 +343,9 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
// the offsets within the value slices that are returned by the // the offsets within the value slices that are returned by the
// mapper. // mapper.
aggregates := e.stmt.FunctionCalls() aggregates := e.stmt.FunctionCalls()
reduceFuncs := make([]reduceFunc, len(aggregates)) reduceFuncs := make([]influxql.ReduceFunc, len(aggregates))
for i, c := range aggregates { for i, c := range aggregates {
reduceFunc, err := initializeReduceFunc(c) reduceFunc, err := influxql.InitializeReduceFunc(c)
if err != nil { if err != nil {
out <- &influxql.Row{Err: err} out <- &influxql.Row{Err: err}
return return

View File

@ -63,7 +63,7 @@ type SelectMapper struct {
intervalSize int64 // Size of each interval. intervalSize int64 // Size of each interval.
numIntervals int // Maximum number of intervals to return. numIntervals int // Maximum number of intervals to return.
currInterval int // Current interval for which data is being fetched. currInterval int // Current interval for which data is being fetched.
mapFuncs []mapFunc // The mapping functions. mapFuncs []influxql.MapFunc // The mapping functions.
fieldNames []string // the field name being read for mapping. fieldNames []string // the field name being read for mapping.
} }
@ -445,10 +445,10 @@ func (lm *SelectMapper) initializeMapFunctions() error {
var err error var err error
// Set up each mapping function for this statement. // Set up each mapping function for this statement.
aggregates := lm.selectStmt.FunctionCalls() aggregates := lm.selectStmt.FunctionCalls()
lm.mapFuncs = make([]mapFunc, len(aggregates)) lm.mapFuncs = make([]influxql.MapFunc, len(aggregates))
lm.fieldNames = make([]string, len(lm.mapFuncs)) lm.fieldNames = make([]string, len(lm.mapFuncs))
for i, c := range aggregates { for i, c := range aggregates {
lm.mapFuncs[i], err = initializeMapFunc(c) lm.mapFuncs[i], err = influxql.InitializeMapFunc(c)
if err != nil { if err != nil {
return err return err
} }

View File

@ -244,7 +244,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt
switch lit := nested.Args[0].(type) { switch lit := nested.Args[0].(type) {
case *influxql.VarRef: case *influxql.VarRef:
if IsNumeric(nested) { if influxql.IsNumeric(nested) {
f := m.Fields[lit.Val] f := m.Fields[lit.Val]
if err := validateType(a.Name, f.Name, f.Type); err != nil { if err := validateType(a.Name, f.Name, f.Type); err != nil {
return err return err
@ -254,7 +254,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt
if nested.Name != "count" { if nested.Name != "count" {
return fmt.Errorf("aggregate call didn't contain a field %s", a.String()) return fmt.Errorf("aggregate call didn't contain a field %s", a.String())
} }
if IsNumeric(nested) { if influxql.IsNumeric(nested) {
f := m.Fields[lit.Val] f := m.Fields[lit.Val]
if err := validateType(a.Name, f.Name, f.Type); err != nil { if err := validateType(a.Name, f.Name, f.Type); err != nil {
return err return err