From c4092d7fc30cea9ccbfb79cc0a04f7dfdccce4ed Mon Sep 17 00:00:00 2001 From: Daniel Morsing Date: Wed, 2 Sep 2015 10:47:58 -0700 Subject: [PATCH] Revert "move aggregate functions" --- {tsdb => influxql}/functions.go | 88 ++++++++++++++------------- {tsdb => influxql}/functions_test.go | 89 ++++++++++++++-------------- tsdb/executor.go | 4 +- tsdb/mapper.go | 6 +- tsdb/shard.go | 4 +- 5 files changed, 94 insertions(+), 97 deletions(-) rename {tsdb => influxql}/functions.go (91%) rename {tsdb => influxql}/functions_test.go (87%) diff --git a/tsdb/functions.go b/influxql/functions.go similarity index 91% rename from tsdb/functions.go rename to influxql/functions.go index 0ce3d0e670..87119441bf 100644 --- a/tsdb/functions.go +++ b/influxql/functions.go @@ -1,10 +1,10 @@ -package tsdb +package influxql // All aggregate and query functions are defined in this file along with any intermediate data objects they need to process. // Query functions are represented as two discreet functions: Map and Reduce. These roughly follow the MapReduce // paradigm popularized by Google and Hadoop. // -// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function +// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapReduceFuncs function import ( "encoding/json" @@ -13,29 +13,27 @@ import ( "math/rand" "sort" "strings" - - "github.com/influxdb/influxdb/influxql" ) -// iterator represents a forward-only iterator over a set of points. -// These are used by the mapFunctions in this file -type iterator interface { +// Iterator represents a forward-only iterator over a set of points. +// These are used by the MapFunctions in this file +type Iterator interface { Next() (time int64, value interface{}) } -// mapFunc represents a function used for mapping over a sequential series of data. +// MapFunc represents a function used for mapping over a sequential series of data. // The iterator represents a single group by interval -type mapFunc func(iterator) interface{} +type MapFunc func(Iterator) interface{} -// reduceFunc represents a function used for reducing mapper output. -type reduceFunc func([]interface{}) interface{} +// ReduceFunc represents a function used for reducing mapper output. +type ReduceFunc func([]interface{}) interface{} // UnmarshalFunc represents a function that can take bytes from a mapper from remote // server and marshal it into an interface the reducer can use -type unmarshalFunc func([]byte) (interface{}, error) +type UnmarshalFunc func([]byte) (interface{}, error) -// initializemapFunc takes an aggregate call from the query and returns the mapFunc -func initializeMapFunc(c *influxql.Call) (mapFunc, error) { +// InitializeMapFunc takes an aggregate call from the query and returns the MapFunc +func InitializeMapFunc(c *Call) (MapFunc, error) { // see if it's a query for raw data if c == nil { return MapRawQuery, nil @@ -60,12 +58,12 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { if !strings.HasSuffix(c.Name, "derivative") { // Ensure the argument is appropriate for the aggregate function. switch fc := c.Args[0].(type) { - case *influxql.VarRef: - case *influxql.Distinct: + case *VarRef: + case *Distinct: if c.Name != "count" { return nil, fmt.Errorf("expected field argument in %s()", c.Name) } - case *influxql.Call: + case *Call: if fc.Name != "distinct" { return nil, fmt.Errorf("expected field argument in %s()", c.Name) } @@ -77,10 +75,10 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { // Retrieve map function by name. switch c.Name { case "count": - if _, ok := c.Args[0].(*influxql.Distinct); ok { + if _, ok := c.Args[0].(*Distinct); ok { return MapCountDistinct, nil } - if c, ok := c.Args[0].(*influxql.Call); ok { + if c, ok := c.Args[0].(*Call); ok { if c.Name == "distinct" { return MapCountDistinct, nil } @@ -107,7 +105,7 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { case "last": return MapLast, nil case "percentile": - _, ok := c.Args[1].(*influxql.NumberLiteral) + _, ok := c.Args[1].(*NumberLiteral) if !ok { return nil, fmt.Errorf("expected float argument in percentile()") } @@ -115,8 +113,8 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { case "derivative", "non_negative_derivative": // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate - if fn, ok := c.Args[0].(*influxql.Call); ok { - return initializeMapFunc(fn) + if fn, ok := c.Args[0].(*Call); ok { + return InitializeMapFunc(fn) } return MapRawQuery, nil default: @@ -124,15 +122,15 @@ func initializeMapFunc(c *influxql.Call) (mapFunc, error) { } } -// InitializereduceFunc takes an aggregate call from the query and returns the reduceFunc -func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) { +// InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc +func InitializeReduceFunc(c *Call) (ReduceFunc, error) { // Retrieve reduce function by name. switch c.Name { case "count": - if _, ok := c.Args[0].(*influxql.Distinct); ok { + if _, ok := c.Args[0].(*Distinct); ok { return ReduceCountDistinct, nil } - if c, ok := c.Args[0].(*influxql.Call); ok { + if c, ok := c.Args[0].(*Call); ok { if c.Name == "distinct" { return ReduceCountDistinct, nil } @@ -163,7 +161,7 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) { return nil, fmt.Errorf("expected float argument in percentile()") } - lit, ok := c.Args[1].(*influxql.NumberLiteral) + lit, ok := c.Args[1].(*NumberLiteral) if !ok { return nil, fmt.Errorf("expected float argument in percentile()") } @@ -171,8 +169,8 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) { case "derivative", "non_negative_derivative": // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate - if fn, ok := c.Args[0].(*influxql.Call); ok { - return initializeReduceFunc(fn) + if fn, ok := c.Args[0].(*Call); ok { + return InitializeReduceFunc(fn) } return nil, fmt.Errorf("expected function argument to %s", c.Name) default: @@ -180,7 +178,7 @@ func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) { } } -func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) { +func InitializeUnmarshaller(c *Call) (UnmarshalFunc, error) { // if c is nil it's a raw data query if c == nil { return func(b []byte) (interface{}, error) { @@ -244,7 +242,7 @@ func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) { } // MapCount computes the number of values in an iterator. -func MapCount(itr iterator) interface{} { +func MapCount(itr Iterator) interface{} { n := float64(0) for k, _ := itr.Next(); k != -1; k, _ = itr.Next() { n++ @@ -329,7 +327,7 @@ func (d distinctValues) Less(i, j int) bool { } // MapDistinct computes the unique values in an iterator. -func MapDistinct(itr iterator) interface{} { +func MapDistinct(itr Iterator) interface{} { var index = make(map[interface{}]struct{}) for time, value := itr.Next(); time != -1; time, value = itr.Next() { @@ -383,7 +381,7 @@ func ReduceDistinct(values []interface{}) interface{} { } // MapCountDistinct computes the unique count of values in an iterator. -func MapCountDistinct(itr iterator) interface{} { +func MapCountDistinct(itr Iterator) interface{} { var index = make(map[interface{}]struct{}) for time, value := itr.Next(); time != -1; time, value = itr.Next() { @@ -427,7 +425,7 @@ const ( ) // MapSum computes the summation of values in an iterator. -func MapSum(itr iterator) interface{} { +func MapSum(itr Iterator) interface{} { n := float64(0) count := 0 var resultType NumberType @@ -482,7 +480,7 @@ func ReduceSum(values []interface{}) interface{} { } // MapMean computes the count and sum of values in an iterator to be combined by the reducer. -func MapMean(itr iterator) interface{} { +func MapMean(itr Iterator) interface{} { out := &meanMapOutput{} for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -688,7 +686,7 @@ type minMaxMapOut struct { } // MapMin collects the values to pass to the reducer -func MapMin(itr iterator) interface{} { +func MapMin(itr Iterator) interface{} { min := &minMaxMapOut{} pointsYielded := false @@ -751,7 +749,7 @@ func ReduceMin(values []interface{}) interface{} { } // MapMax collects the values to pass to the reducer -func MapMax(itr iterator) interface{} { +func MapMax(itr Iterator) interface{} { max := &minMaxMapOut{} pointsYielded := false @@ -819,7 +817,7 @@ type spreadMapOutput struct { } // MapSpread collects the values to pass to the reducer -func MapSpread(itr iterator) interface{} { +func MapSpread(itr Iterator) interface{} { out := &spreadMapOutput{} pointsYielded := false var val float64 @@ -880,7 +878,7 @@ func ReduceSpread(values []interface{}) interface{} { } // MapStddev collects the values to pass to the reducer -func MapStddev(itr iterator) interface{} { +func MapStddev(itr Iterator) interface{} { var values []float64 for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -938,7 +936,7 @@ type firstLastMapOutput struct { // MapFirst collects the values to pass to the reducer // This function assumes time ordered input -func MapFirst(itr iterator) interface{} { +func MapFirst(itr Iterator) interface{} { k, v := itr.Next() if k == -1 { return nil @@ -983,7 +981,7 @@ func ReduceFirst(values []interface{}) interface{} { } // MapLast collects the values to pass to the reducer -func MapLast(itr iterator) interface{} { +func MapLast(itr Iterator) interface{} { out := &firstLastMapOutput{} pointsYielded := false @@ -1038,7 +1036,7 @@ func ReduceLast(values []interface{}) interface{} { } // MapEcho emits the data points for each group by interval -func MapEcho(itr iterator) interface{} { +func MapEcho(itr Iterator) interface{} { var values []interface{} for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -1048,7 +1046,7 @@ func MapEcho(itr iterator) interface{} { } // ReducePercentile computes the percentile of values for each key. -func ReducePercentile(percentile float64) reduceFunc { +func ReducePercentile(percentile float64) ReduceFunc { return func(values []interface{}) interface{} { var allValues []float64 @@ -1081,7 +1079,7 @@ func ReducePercentile(percentile float64) reduceFunc { } // IsNumeric returns whether a given aggregate can only be run on numeric fields. -func IsNumeric(c *influxql.Call) bool { +func IsNumeric(c *Call) bool { switch c.Name { case "count", "first", "last", "distinct": return false @@ -1091,7 +1089,7 @@ func IsNumeric(c *influxql.Call) bool { } // MapRawQuery is for queries without aggregates -func MapRawQuery(itr iterator) interface{} { +func MapRawQuery(itr Iterator) interface{} { var values []*rawQueryMapOutput for k, v := itr.Next(); k != -1; k, v = itr.Next() { val := &rawQueryMapOutput{k, v} diff --git a/tsdb/functions_test.go b/influxql/functions_test.go similarity index 87% rename from tsdb/functions_test.go rename to influxql/functions_test.go index ea35834dc1..56303d8d9a 100644 --- a/tsdb/functions_test.go +++ b/influxql/functions_test.go @@ -1,4 +1,4 @@ -package tsdb +package influxql import ( "reflect" @@ -6,19 +6,18 @@ import ( "time" "github.com/davecgh/go-spew/spew" - "github.com/influxdb/influxdb/influxql" ) import "sort" -type testPoint struct { +type point struct { seriesKey string time int64 value interface{} } type testIterator struct { - values []testPoint + values []point } func (t *testIterator) Next() (timestamp int64, value interface{}) { @@ -41,17 +40,17 @@ func TestMapMeanNoValues(t *testing.T) { func TestMapMean(t *testing.T) { tests := []struct { - input []testPoint + input []point output *meanMapOutput }{ - { // Single testPoint - input: []testPoint{testPoint{"0", 1, 1.0}}, + { // Single point + input: []point{point{"0", 1, 1.0}}, output: &meanMapOutput{1, 1, Float64Type}, }, - { // Two testPoints - input: []testPoint{ - testPoint{"0", 1, 2.0}, - testPoint{"0", 2, 8.0}, + { // Two points + input: []point{ + point{"0", 1, 2.0}, + point{"0", 2, 8.0}, }, output: &meanMapOutput{2, 5.0, Float64Type}, }, @@ -74,11 +73,11 @@ func TestMapMean(t *testing.T) { } func TestInitializeMapFuncPercentile(t *testing.T) { // No args - c := &influxql.Call{ + c := &Call{ Name: "percentile", - Args: []influxql.Expr{}, + Args: []Expr{}, } - _, err := initializeMapFunc(c) + _, err := InitializeMapFunc(c) if err == nil { t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) } @@ -88,14 +87,14 @@ func TestInitializeMapFuncPercentile(t *testing.T) { } // No percentile arg - c = &influxql.Call{ + c = &Call{ Name: "percentile", - Args: []influxql.Expr{ - &influxql.VarRef{Val: "field1"}, + Args: []Expr{ + &VarRef{Val: "field1"}, }, } - _, err = initializeMapFunc(c) + _, err = InitializeMapFunc(c) if err == nil { t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) } @@ -109,40 +108,40 @@ func TestInitializeMapFuncDerivative(t *testing.T) { for _, fn := range []string{"derivative", "non_negative_derivative"} { // No args should fail - c := &influxql.Call{ + c := &Call{ Name: fn, - Args: []influxql.Expr{}, + Args: []Expr{}, } - _, err := initializeMapFunc(c) + _, err := InitializeMapFunc(c) if err == nil { t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) } // Single field arg should return MapEcho - c = &influxql.Call{ + c = &Call{ Name: fn, - Args: []influxql.Expr{ - &influxql.VarRef{Val: " field1"}, - &influxql.DurationLiteral{Val: time.Hour}, + Args: []Expr{ + &VarRef{Val: " field1"}, + &DurationLiteral{Val: time.Hour}, }, } - _, err = initializeMapFunc(c) + _, err = InitializeMapFunc(c) if err != nil { t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) } // Nested Aggregate func should return the map func for the nested aggregate - c = &influxql.Call{ + c = &Call{ Name: fn, - Args: []influxql.Expr{ - &influxql.Call{Name: "mean", Args: []influxql.Expr{&influxql.VarRef{Val: "field1"}}}, - &influxql.DurationLiteral{Val: time.Hour}, + Args: []Expr{ + &Call{Name: "mean", Args: []Expr{&VarRef{Val: "field1"}}}, + &DurationLiteral{Val: time.Hour}, }, } - _, err = initializeMapFunc(c) + _, err = InitializeMapFunc(c) if err != nil { t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) } @@ -151,11 +150,11 @@ func TestInitializeMapFuncDerivative(t *testing.T) { func TestInitializeReduceFuncPercentile(t *testing.T) { // No args - c := &influxql.Call{ + c := &Call{ Name: "percentile", - Args: []influxql.Expr{}, + Args: []Expr{}, } - _, err := initializeReduceFunc(c) + _, err := InitializeReduceFunc(c) if err == nil { t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) } @@ -165,14 +164,14 @@ func TestInitializeReduceFuncPercentile(t *testing.T) { } // No percentile arg - c = &influxql.Call{ + c = &Call{ Name: "percentile", - Args: []influxql.Expr{ - &influxql.VarRef{Val: "field1"}, + Args: []Expr{ + &VarRef{Val: "field1"}, }, } - _, err = initializeReduceFunc(c) + _, err = InitializeReduceFunc(c) if err == nil { t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) } @@ -212,7 +211,7 @@ func TestMapDistinct(t *testing.T) { ) iter := &testIterator{ - values: []testPoint{ + values: []point{ {seriesKey1, timeId1, uint64(1)}, {seriesKey1, timeId2, uint64(1)}, {seriesKey1, timeId3, "1"}, @@ -243,7 +242,7 @@ func TestMapDistinct(t *testing.T) { func TestMapDistinctNil(t *testing.T) { iter := &testIterator{ - values: []testPoint{}, + values: []point{}, } values := MapDistinct(iter) @@ -366,7 +365,7 @@ func TestMapCountDistinct(t *testing.T) { ) iter := &testIterator{ - values: []testPoint{ + values: []point{ {seriesKey1, timeId1, uint64(1)}, {seriesKey1, timeId2, uint64(1)}, {seriesKey1, timeId3, "1"}, @@ -397,7 +396,7 @@ func TestMapCountDistinct(t *testing.T) { func TestMapCountDistinctNil(t *testing.T) { iter := &testIterator{ - values: []testPoint{}, + values: []point{}, } values := MapCountDistinct(iter) @@ -503,9 +502,9 @@ func TestGetSortedRange(t *testing.T) { if len(results) != len(tt.expected) { t.Errorf("Test %s error. Expected getSortedRange to return %v but got %v", tt.name, tt.expected, results) } - for i, testPoint := range tt.expected { - if testPoint != results[i] { - t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, testPoint, results[i]) + for i, point := range tt.expected { + if point != results[i] { + t.Errorf("Test %s error. getSortedRange returned wrong result for index %v. Expected %v but got %v", tt.name, i, point, results[i]) } } } diff --git a/tsdb/executor.go b/tsdb/executor.go index 92d0ea16c4..72dbeb676e 100644 --- a/tsdb/executor.go +++ b/tsdb/executor.go @@ -343,9 +343,9 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) { // the offsets within the value slices that are returned by the // mapper. aggregates := e.stmt.FunctionCalls() - reduceFuncs := make([]reduceFunc, len(aggregates)) + reduceFuncs := make([]influxql.ReduceFunc, len(aggregates)) for i, c := range aggregates { - reduceFunc, err := initializeReduceFunc(c) + reduceFunc, err := influxql.InitializeReduceFunc(c) if err != nil { out <- &influxql.Row{Err: err} return diff --git a/tsdb/mapper.go b/tsdb/mapper.go index 4e741b98c0..e46b535e1d 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -63,7 +63,7 @@ type SelectMapper struct { intervalSize int64 // Size of each interval. numIntervals int // Maximum number of intervals to return. currInterval int // Current interval for which data is being fetched. - mapFuncs []mapFunc // The mapping functions. + mapFuncs []influxql.MapFunc // The mapping functions. fieldNames []string // the field name being read for mapping. } @@ -445,10 +445,10 @@ func (lm *SelectMapper) initializeMapFunctions() error { var err error // Set up each mapping function for this statement. aggregates := lm.selectStmt.FunctionCalls() - lm.mapFuncs = make([]mapFunc, len(aggregates)) + lm.mapFuncs = make([]influxql.MapFunc, len(aggregates)) lm.fieldNames = make([]string, len(lm.mapFuncs)) for i, c := range aggregates { - lm.mapFuncs[i], err = initializeMapFunc(c) + lm.mapFuncs[i], err = influxql.InitializeMapFunc(c) if err != nil { return err } diff --git a/tsdb/shard.go b/tsdb/shard.go index acb83277b1..d1c4dc1f16 100644 --- a/tsdb/shard.go +++ b/tsdb/shard.go @@ -244,7 +244,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt switch lit := nested.Args[0].(type) { case *influxql.VarRef: - if IsNumeric(nested) { + if influxql.IsNumeric(nested) { f := m.Fields[lit.Val] if err := validateType(a.Name, f.Name, f.Type); err != nil { return err @@ -254,7 +254,7 @@ func (s *Shard) ValidateAggregateFieldsInStatement(measurementName string, stmt if nested.Name != "count" { return fmt.Errorf("aggregate call didn't contain a field %s", a.String()) } - if IsNumeric(nested) { + if influxql.IsNumeric(nested) { f := m.Fields[lit.Val] if err := validateType(a.Name, f.Name, f.Type); err != nil { return err