diff --git a/tsdb/executor.go b/tsdb/executor.go index 44c343ceaa..92d0ea16c4 100644 --- a/tsdb/executor.go +++ b/tsdb/executor.go @@ -343,9 +343,9 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) { // the offsets within the value slices that are returned by the // mapper. aggregates := e.stmt.FunctionCalls() - reduceFuncs := make([]ReduceFunc, len(aggregates)) + reduceFuncs := make([]reduceFunc, len(aggregates)) for i, c := range aggregates { - reduceFunc, err := InitializeReduceFunc(c) + reduceFunc, err := initializeReduceFunc(c) if err != nil { out <- &influxql.Row{Err: err} return diff --git a/tsdb/functions.go b/tsdb/functions.go index 205a0b210c..0ce3d0e670 100644 --- a/tsdb/functions.go +++ b/tsdb/functions.go @@ -4,7 +4,7 @@ package tsdb // Query functions are represented as two discreet functions: Map and Reduce. These roughly follow the MapReduce // paradigm popularized by Google and Hadoop. // -// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapReduceFuncs function +// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function import ( "encoding/json" @@ -17,25 +17,25 @@ import ( "github.com/influxdb/influxdb/influxql" ) -// Iterator represents a forward-only iterator over a set of points. -// These are used by the MapFunctions in this file -type Iterator interface { +// iterator represents a forward-only iterator over a set of points. +// These are used by the mapFunctions in this file +type iterator interface { Next() (time int64, value interface{}) } -// MapFunc represents a function used for mapping over a sequential series of data. +// mapFunc represents a function used for mapping over a sequential series of data. // The iterator represents a single group by interval -type MapFunc func(Iterator) interface{} +type mapFunc func(iterator) interface{} -// ReduceFunc represents a function used for reducing mapper output. -type ReduceFunc func([]interface{}) interface{} +// reduceFunc represents a function used for reducing mapper output. +type reduceFunc func([]interface{}) interface{} // UnmarshalFunc represents a function that can take bytes from a mapper from remote // server and marshal it into an interface the reducer can use -type UnmarshalFunc func([]byte) (interface{}, error) +type unmarshalFunc func([]byte) (interface{}, error) -// InitializeMapFunc takes an aggregate call from the query and returns the MapFunc -func InitializeMapFunc(c *influxql.Call) (MapFunc, error) { +// initializemapFunc takes an aggregate call from the query and returns the mapFunc +func initializeMapFunc(c *influxql.Call) (mapFunc, error) { // see if it's a query for raw data if c == nil { return MapRawQuery, nil @@ -116,7 +116,7 @@ func InitializeMapFunc(c *influxql.Call) (MapFunc, error) { // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate if fn, ok := c.Args[0].(*influxql.Call); ok { - return InitializeMapFunc(fn) + return initializeMapFunc(fn) } return MapRawQuery, nil default: @@ -124,8 +124,8 @@ func InitializeMapFunc(c *influxql.Call) (MapFunc, error) { } } -// InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc -func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) { +// InitializereduceFunc takes an aggregate call from the query and returns the reduceFunc +func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) { // Retrieve reduce function by name. switch c.Name { case "count": @@ -172,7 +172,7 @@ func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) { // If the arg is another aggregate e.g. derivative(mean(value)), then // use the map func for that nested aggregate if fn, ok := c.Args[0].(*influxql.Call); ok { - return InitializeReduceFunc(fn) + return initializeReduceFunc(fn) } return nil, fmt.Errorf("expected function argument to %s", c.Name) default: @@ -180,7 +180,7 @@ func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) { } } -func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) { +func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) { // if c is nil it's a raw data query if c == nil { return func(b []byte) (interface{}, error) { @@ -244,7 +244,7 @@ func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) { } // MapCount computes the number of values in an iterator. -func MapCount(itr Iterator) interface{} { +func MapCount(itr iterator) interface{} { n := float64(0) for k, _ := itr.Next(); k != -1; k, _ = itr.Next() { n++ @@ -329,7 +329,7 @@ func (d distinctValues) Less(i, j int) bool { } // MapDistinct computes the unique values in an iterator. -func MapDistinct(itr Iterator) interface{} { +func MapDistinct(itr iterator) interface{} { var index = make(map[interface{}]struct{}) for time, value := itr.Next(); time != -1; time, value = itr.Next() { @@ -383,7 +383,7 @@ func ReduceDistinct(values []interface{}) interface{} { } // MapCountDistinct computes the unique count of values in an iterator. -func MapCountDistinct(itr Iterator) interface{} { +func MapCountDistinct(itr iterator) interface{} { var index = make(map[interface{}]struct{}) for time, value := itr.Next(); time != -1; time, value = itr.Next() { @@ -427,7 +427,7 @@ const ( ) // MapSum computes the summation of values in an iterator. -func MapSum(itr Iterator) interface{} { +func MapSum(itr iterator) interface{} { n := float64(0) count := 0 var resultType NumberType @@ -482,7 +482,7 @@ func ReduceSum(values []interface{}) interface{} { } // MapMean computes the count and sum of values in an iterator to be combined by the reducer. -func MapMean(itr Iterator) interface{} { +func MapMean(itr iterator) interface{} { out := &meanMapOutput{} for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -688,7 +688,7 @@ type minMaxMapOut struct { } // MapMin collects the values to pass to the reducer -func MapMin(itr Iterator) interface{} { +func MapMin(itr iterator) interface{} { min := &minMaxMapOut{} pointsYielded := false @@ -751,7 +751,7 @@ func ReduceMin(values []interface{}) interface{} { } // MapMax collects the values to pass to the reducer -func MapMax(itr Iterator) interface{} { +func MapMax(itr iterator) interface{} { max := &minMaxMapOut{} pointsYielded := false @@ -819,7 +819,7 @@ type spreadMapOutput struct { } // MapSpread collects the values to pass to the reducer -func MapSpread(itr Iterator) interface{} { +func MapSpread(itr iterator) interface{} { out := &spreadMapOutput{} pointsYielded := false var val float64 @@ -880,7 +880,7 @@ func ReduceSpread(values []interface{}) interface{} { } // MapStddev collects the values to pass to the reducer -func MapStddev(itr Iterator) interface{} { +func MapStddev(itr iterator) interface{} { var values []float64 for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -938,7 +938,7 @@ type firstLastMapOutput struct { // MapFirst collects the values to pass to the reducer // This function assumes time ordered input -func MapFirst(itr Iterator) interface{} { +func MapFirst(itr iterator) interface{} { k, v := itr.Next() if k == -1 { return nil @@ -983,7 +983,7 @@ func ReduceFirst(values []interface{}) interface{} { } // MapLast collects the values to pass to the reducer -func MapLast(itr Iterator) interface{} { +func MapLast(itr iterator) interface{} { out := &firstLastMapOutput{} pointsYielded := false @@ -1038,7 +1038,7 @@ func ReduceLast(values []interface{}) interface{} { } // MapEcho emits the data points for each group by interval -func MapEcho(itr Iterator) interface{} { +func MapEcho(itr iterator) interface{} { var values []interface{} for k, v := itr.Next(); k != -1; k, v = itr.Next() { @@ -1048,7 +1048,7 @@ func MapEcho(itr Iterator) interface{} { } // ReducePercentile computes the percentile of values for each key. -func ReducePercentile(percentile float64) ReduceFunc { +func ReducePercentile(percentile float64) reduceFunc { return func(values []interface{}) interface{} { var allValues []float64 @@ -1091,7 +1091,7 @@ func IsNumeric(c *influxql.Call) bool { } // MapRawQuery is for queries without aggregates -func MapRawQuery(itr Iterator) interface{} { +func MapRawQuery(itr iterator) interface{} { var values []*rawQueryMapOutput for k, v := itr.Next(); k != -1; k, v = itr.Next() { val := &rawQueryMapOutput{k, v} diff --git a/tsdb/functions_test.go b/tsdb/functions_test.go index 7b23b60254..ea35834dc1 100644 --- a/tsdb/functions_test.go +++ b/tsdb/functions_test.go @@ -78,7 +78,7 @@ func TestInitializeMapFuncPercentile(t *testing.T) { Name: "percentile", Args: []influxql.Expr{}, } - _, err := InitializeMapFunc(c) + _, err := initializeMapFunc(c) if err == nil { t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) } @@ -95,7 +95,7 @@ func TestInitializeMapFuncPercentile(t *testing.T) { }, } - _, err = InitializeMapFunc(c) + _, err = initializeMapFunc(c) if err == nil { t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) } @@ -114,7 +114,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) { Args: []influxql.Expr{}, } - _, err := InitializeMapFunc(c) + _, err := initializeMapFunc(c) if err == nil { t.Errorf("InitializeMapFunc(%v) expected error. got nil", c) } @@ -128,7 +128,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) { }, } - _, err = InitializeMapFunc(c) + _, err = initializeMapFunc(c) if err != nil { t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) } @@ -142,7 +142,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) { }, } - _, err = InitializeMapFunc(c) + _, err = initializeMapFunc(c) if err != nil { t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err) } @@ -155,7 +155,7 @@ func TestInitializeReduceFuncPercentile(t *testing.T) { Name: "percentile", Args: []influxql.Expr{}, } - _, err := InitializeReduceFunc(c) + _, err := initializeReduceFunc(c) if err == nil { t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) } @@ -172,7 +172,7 @@ func TestInitializeReduceFuncPercentile(t *testing.T) { }, } - _, err = InitializeReduceFunc(c) + _, err = initializeReduceFunc(c) if err == nil { t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c) } diff --git a/tsdb/mapper.go b/tsdb/mapper.go index e4e75a4a3e..4e741b98c0 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -63,7 +63,7 @@ type SelectMapper struct { intervalSize int64 // Size of each interval. numIntervals int // Maximum number of intervals to return. currInterval int // Current interval for which data is being fetched. - mapFuncs []MapFunc // The mapping functions. + mapFuncs []mapFunc // The mapping functions. fieldNames []string // the field name being read for mapping. } @@ -445,10 +445,10 @@ func (lm *SelectMapper) initializeMapFunctions() error { var err error // Set up each mapping function for this statement. aggregates := lm.selectStmt.FunctionCalls() - lm.mapFuncs = make([]MapFunc, len(aggregates)) + lm.mapFuncs = make([]mapFunc, len(aggregates)) lm.fieldNames = make([]string, len(lm.mapFuncs)) for i, c := range aggregates { - lm.mapFuncs[i], err = InitializeMapFunc(c) + lm.mapFuncs[i], err = initializeMapFunc(c) if err != nil { return err }