unexport some functions

Mostly for good measure: not needed for correctness, but it keeps code outside the package from using these functions.

Branch: pull/3951/head
Parent: 65340a023b
Commit: 34744b647d
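The commit relies on Go's visibility rule: a package-level identifier is exported only if its name begins with an upper-case letter, so lower-casing the first letter hides it from other packages without changing behavior. A minimal sketch of that effect, using two identifiers touched by this diff with their bodies stubbed out:

package tsdb

// reduceFunc starts with a lower-case letter, so it is visible only
// inside package tsdb; other packages can no longer reference it.
type reduceFunc func([]interface{}) interface{}

// ReducePercentile stays upper-case and therefore remains callable from
// other packages, even though it now returns an unexported type.
func ReducePercentile(percentile float64) reduceFunc {
    return func(values []interface{}) interface{} { return nil } // body stubbed out
}

Note that an exported function may legally return an unexported type, as ReducePercentile does after this change: outside callers can still invoke it, they just cannot name the returned type.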
@@ -343,9 +343,9 @@ func (e *SelectExecutor) executeAggregate(out chan *influxql.Row) {
 	// the offsets within the value slices that are returned by the
 	// mapper.
 	aggregates := e.stmt.FunctionCalls()
-	reduceFuncs := make([]ReduceFunc, len(aggregates))
+	reduceFuncs := make([]reduceFunc, len(aggregates))
 	for i, c := range aggregates {
-		reduceFunc, err := InitializeReduceFunc(c)
+		reduceFunc, err := initializeReduceFunc(c)
 		if err != nil {
 			out <- &influxql.Row{Err: err}
 			return
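The hunk above only builds the per-aggregate reduce functions and reports failures on the output channel; how those functions are then applied lies outside this diff. A rough, self-contained sketch of that shape with stand-in types (the real executor works on influxql.Row and per-shard mapper output):

package main

import "fmt"

type reduceFunc func([]interface{}) interface{}

// row is a stand-in for influxql.Row: either result values or an error.
type row struct {
    Values []interface{}
    Err    error
}

// reduceInterval applies one reduce function per aggregate to the shard
// outputs gathered for a single group-by interval (perAggregate holds one
// slice of shard outputs per aggregate) and emits a row on the channel.
func reduceInterval(funcs []reduceFunc, perAggregate [][]interface{}, out chan<- *row) {
    values := make([]interface{}, len(funcs))
    for i, fn := range funcs {
        values[i] = fn(perAggregate[i])
    }
    out <- &row{Values: values}
}

func main() {
    sum := func(vals []interface{}) interface{} {
        total := 0.0
        for _, v := range vals {
            total += v.(float64)
        }
        return total
    }
    out := make(chan *row, 1)
    reduceInterval([]reduceFunc{sum}, [][]interface{}{{1.0, 2.0, 3.0}}, out)
    fmt.Println((<-out).Values) // [6]
}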
@@ -4,7 +4,7 @@ package tsdb
 // Query functions are represented as two discreet functions: Map and Reduce. These roughly follow the MapReduce
 // paradigm popularized by Google and Hadoop.
 //
-// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapReduceFuncs function
+// When adding an aggregate function, define a mapper, a reducer, and add them in the switch statement in the MapreduceFuncs function
 
 import (
 	"encoding/json"
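The Map/Reduce split described in this doc comment means each shard runs the map step over its own points and the reduce step merges the per-shard partial results. A self-contained toy example of that flow for a count aggregate (toy types, not the engine's own):

package main

import "fmt"

// mapCountToy counts the points held by one shard.
func mapCountToy(points []float64) interface{} {
    return float64(len(points))
}

// reduceCountToy sums the partial counts produced by each shard's mapper.
func reduceCountToy(partials []interface{}) interface{} {
    total := 0.0
    for _, p := range partials {
        total += p.(float64)
    }
    return total
}

func main() {
    shardA := []float64{1.1, 2.2}
    shardB := []float64{3.3, 4.4, 5.5}
    partials := []interface{}{mapCountToy(shardA), mapCountToy(shardB)}
    fmt.Println(reduceCountToy(partials)) // 5
}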
@@ -17,25 +17,25 @@ import (
 	"github.com/influxdb/influxdb/influxql"
 )
 
-// Iterator represents a forward-only iterator over a set of points.
-// These are used by the MapFunctions in this file
-type Iterator interface {
+// iterator represents a forward-only iterator over a set of points.
+// These are used by the mapFunctions in this file
+type iterator interface {
 	Next() (time int64, value interface{})
 }
 
-// MapFunc represents a function used for mapping over a sequential series of data.
+// mapFunc represents a function used for mapping over a sequential series of data.
 // The iterator represents a single group by interval
-type MapFunc func(Iterator) interface{}
+type mapFunc func(iterator) interface{}
 
-// ReduceFunc represents a function used for reducing mapper output.
-type ReduceFunc func([]interface{}) interface{}
+// reduceFunc represents a function used for reducing mapper output.
+type reduceFunc func([]interface{}) interface{}
 
 // UnmarshalFunc represents a function that can take bytes from a mapper from remote
 // server and marshal it into an interface the reducer can use
-type UnmarshalFunc func([]byte) (interface{}, error)
+type unmarshalFunc func([]byte) (interface{}, error)
 
-// InitializeMapFunc takes an aggregate call from the query and returns the MapFunc
-func InitializeMapFunc(c *influxql.Call) (MapFunc, error) {
+// initializemapFunc takes an aggregate call from the query and returns the mapFunc
+func initializeMapFunc(c *influxql.Call) (mapFunc, error) {
 	// see if it's a query for raw data
 	if c == nil {
 		return MapRawQuery, nil
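The loops later in this file (for k, _ := itr.Next(); k != -1; ...) imply the iterator contract: Next returns a timestamp of -1 once the points are exhausted. A slice-backed toy implementation of that contract, assuming the -1 sentinel:

package main

import "fmt"

// iterator mirrors the unexported interface declared in this diff.
type iterator interface {
    Next() (time int64, value interface{})
}

// sliceIterator is a toy, in-memory iterator for illustration only.
type sliceIterator struct {
    times  []int64
    values []interface{}
    pos    int
}

// Next yields the next point, or (-1, nil) once exhausted, matching the
// `k != -1` termination checks used by the map functions in this file.
func (s *sliceIterator) Next() (int64, interface{}) {
    if s.pos >= len(s.times) {
        return -1, nil
    }
    t, v := s.times[s.pos], s.values[s.pos]
    s.pos++
    return t, v
}

func main() {
    var itr iterator = &sliceIterator{
        times:  []int64{10, 20, 30},
        values: []interface{}{1.0, 2.0, 3.0},
    }
    n := 0
    for k, _ := itr.Next(); k != -1; k, _ = itr.Next() {
        n++
    }
    fmt.Println(n) // 3
}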
@@ -116,7 +116,7 @@ func InitializeMapFunc(c *influxql.Call) (MapFunc, error) {
 		// If the arg is another aggregate e.g. derivative(mean(value)), then
 		// use the map func for that nested aggregate
 		if fn, ok := c.Args[0].(*influxql.Call); ok {
-			return InitializeMapFunc(fn)
+			return initializeMapFunc(fn)
 		}
 		return MapRawQuery, nil
 	default:
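The nested-aggregate branch above, e.g. derivative(mean(value)), works by checking whether the call's first argument is itself a function call and recursing so the inner aggregate's map function is used. A self-contained sketch of that dispatch with a stand-in call type (the real code operates on *influxql.Call and returns the functions themselves, not names):

package main

import "fmt"

// call is a stand-in for influxql.Call: a function name plus arguments.
type call struct {
    Name string
    Args []interface{}
}

// mapFuncNameFor picks a mapper for a call, recursing into a nested call
// such as derivative(mean(value)) so the inner aggregate's mapper is used.
func mapFuncNameFor(c *call) string {
    switch c.Name {
    case "count":
        return "MapCount"
    case "mean":
        return "MapMean"
    case "derivative":
        if inner, ok := c.Args[0].(*call); ok {
            return mapFuncNameFor(inner)
        }
        return "MapRawQuery"
    default:
        return "MapRawQuery"
    }
}

func main() {
    nested := &call{Name: "derivative", Args: []interface{}{&call{Name: "mean"}}}
    fmt.Println(mapFuncNameFor(nested)) // MapMean
}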
@@ -124,8 +124,8 @@ func InitializeMapFunc(c *influxql.Call) (MapFunc, error) {
 	}
 }
 
-// InitializeReduceFunc takes an aggregate call from the query and returns the ReduceFunc
-func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) {
+// InitializereduceFunc takes an aggregate call from the query and returns the reduceFunc
+func initializeReduceFunc(c *influxql.Call) (reduceFunc, error) {
 	// Retrieve reduce function by name.
 	switch c.Name {
 	case "count":
@@ -172,7 +172,7 @@ func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) {
 		// If the arg is another aggregate e.g. derivative(mean(value)), then
 		// use the map func for that nested aggregate
 		if fn, ok := c.Args[0].(*influxql.Call); ok {
-			return InitializeReduceFunc(fn)
+			return initializeReduceFunc(fn)
 		}
 		return nil, fmt.Errorf("expected function argument to %s", c.Name)
 	default:
@@ -180,7 +180,7 @@ func InitializeReduceFunc(c *influxql.Call) (ReduceFunc, error) {
 	}
 }
 
-func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) {
+func initializeUnmarshaller(c *influxql.Call) (unmarshalFunc, error) {
 	// if c is nil it's a raw data query
 	if c == nil {
 		return func(b []byte) (interface{}, error) {
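initializeUnmarshaller hands the reducer a closure that decodes bytes coming back from a remote mapper. The body of the raw-query branch is truncated in this hunk; a plain JSON decode into a generic value is one plausible shape, sketched here as an assumption rather than the actual implementation:

package main

import (
    "encoding/json"
    "fmt"
)

type unmarshalFunc func([]byte) (interface{}, error)

// rawUnmarshaller returns a closure that decodes a remote mapper's bytes
// into a generic value; a stand-in for the raw-query branch above.
func rawUnmarshaller() unmarshalFunc {
    return func(b []byte) (interface{}, error) {
        var o interface{}
        err := json.Unmarshal(b, &o)
        return o, err
    }
}

func main() {
    fn := rawUnmarshaller()
    v, err := fn([]byte(`[{"time":10,"value":1.5}]`))
    fmt.Println(v, err)
}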
@@ -244,7 +244,7 @@ func InitializeUnmarshaller(c *influxql.Call) (UnmarshalFunc, error) {
 }
 
 // MapCount computes the number of values in an iterator.
-func MapCount(itr Iterator) interface{} {
+func MapCount(itr iterator) interface{} {
 	n := float64(0)
 	for k, _ := itr.Next(); k != -1; k, _ = itr.Next() {
 		n++
@@ -329,7 +329,7 @@ func (d distinctValues) Less(i, j int) bool {
 }
 
 // MapDistinct computes the unique values in an iterator.
-func MapDistinct(itr Iterator) interface{} {
+func MapDistinct(itr iterator) interface{} {
 	var index = make(map[interface{}]struct{})
 
 	for time, value := itr.Next(); time != -1; time, value = itr.Next() {
@@ -383,7 +383,7 @@ func ReduceDistinct(values []interface{}) interface{} {
 }
 
 // MapCountDistinct computes the unique count of values in an iterator.
-func MapCountDistinct(itr Iterator) interface{} {
+func MapCountDistinct(itr iterator) interface{} {
 	var index = make(map[interface{}]struct{})
 
 	for time, value := itr.Next(); time != -1; time, value = itr.Next() {
@@ -427,7 +427,7 @@ const (
 )
 
 // MapSum computes the summation of values in an iterator.
-func MapSum(itr Iterator) interface{} {
+func MapSum(itr iterator) interface{} {
 	n := float64(0)
 	count := 0
 	var resultType NumberType
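MapSum keeps a running total while also tracking a NumberType, so a sum over purely integer inputs can be handed back as an integer. The constant names below are hypothetical stand-ins; only the idea of carrying a result type alongside the sum comes from the hunk:

package main

import "fmt"

// numberType is a stand-in for the file's NumberType; the real constant
// names are not shown in this hunk.
type numberType int

const (
    float64Type numberType = iota
    int64Type
)

// sumValues accumulates a float64 total but remembers whether every input
// was an int64, so the caller can return an integer result when possible.
func sumValues(values []interface{}) (float64, numberType) {
    total := 0.0
    resultType := int64Type
    for _, v := range values {
        switch n := v.(type) {
        case int64:
            total += float64(n)
        case float64:
            total += n
            resultType = float64Type
        }
    }
    return total, resultType
}

func main() {
    total, t := sumValues([]interface{}{int64(1), int64(2), 3.5})
    fmt.Println(total, t == float64Type) // 6.5 true
}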
@@ -482,7 +482,7 @@ func ReduceSum(values []interface{}) interface{} {
 }
 
 // MapMean computes the count and sum of values in an iterator to be combined by the reducer.
-func MapMean(itr Iterator) interface{} {
+func MapMean(itr iterator) interface{} {
 	out := &meanMapOutput{}
 
 	for k, v := itr.Next(); k != -1; k, v = itr.Next() {
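As its comment says, MapMean emits a count and a sum rather than the mean itself, so the reducer can merge partials from several shards into one weighted mean. The field names below are assumptions about meanMapOutput; the count-plus-sum idea is what the hunk documents:

package main

import "fmt"

// meanPartial mirrors the role of meanMapOutput: enough state to merge
// per-shard results into a correctly weighted mean.
type meanPartial struct {
    Count int
    Sum   float64
}

// reduceMeanToy folds per-shard partials into a single mean.
func reduceMeanToy(partials []*meanPartial) float64 {
    var count int
    var sum float64
    for _, p := range partials {
        count += p.Count
        sum += p.Sum
    }
    if count == 0 {
        return 0
    }
    return sum / float64(count)
}

func main() {
    shards := []*meanPartial{{Count: 2, Sum: 10}, {Count: 3, Sum: 5}}
    fmt.Println(reduceMeanToy(shards)) // 3
}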
@@ -688,7 +688,7 @@ type minMaxMapOut struct {
 }
 
 // MapMin collects the values to pass to the reducer
-func MapMin(itr Iterator) interface{} {
+func MapMin(itr iterator) interface{} {
 	min := &minMaxMapOut{}
 
 	pointsYielded := false
@@ -751,7 +751,7 @@ func ReduceMin(values []interface{}) interface{} {
 }
 
 // MapMax collects the values to pass to the reducer
-func MapMax(itr Iterator) interface{} {
+func MapMax(itr iterator) interface{} {
 	max := &minMaxMapOut{}
 
 	pointsYielded := false
@@ -819,7 +819,7 @@ type spreadMapOutput struct {
 }
 
 // MapSpread collects the values to pass to the reducer
-func MapSpread(itr Iterator) interface{} {
+func MapSpread(itr iterator) interface{} {
 	out := &spreadMapOutput{}
 	pointsYielded := false
 	var val float64
@@ -880,7 +880,7 @@ func ReduceSpread(values []interface{}) interface{} {
 }
 
 // MapStddev collects the values to pass to the reducer
-func MapStddev(itr Iterator) interface{} {
+func MapStddev(itr iterator) interface{} {
 	var values []float64
 
 	for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@@ -938,7 +938,7 @@ type firstLastMapOutput struct {
 
 // MapFirst collects the values to pass to the reducer
 // This function assumes time ordered input
-func MapFirst(itr Iterator) interface{} {
+func MapFirst(itr iterator) interface{} {
 	k, v := itr.Next()
 	if k == -1 {
 		return nil
@@ -983,7 +983,7 @@ func ReduceFirst(values []interface{}) interface{} {
 }
 
 // MapLast collects the values to pass to the reducer
-func MapLast(itr Iterator) interface{} {
+func MapLast(itr iterator) interface{} {
 	out := &firstLastMapOutput{}
 	pointsYielded := false
 
@@ -1038,7 +1038,7 @@ func ReduceLast(values []interface{}) interface{} {
 }
 
 // MapEcho emits the data points for each group by interval
-func MapEcho(itr Iterator) interface{} {
+func MapEcho(itr iterator) interface{} {
 	var values []interface{}
 
 	for k, v := itr.Next(); k != -1; k, v = itr.Next() {
@@ -1048,7 +1048,7 @@ func MapEcho(itr Iterator) interface{} {
 }
 
 // ReducePercentile computes the percentile of values for each key.
-func ReducePercentile(percentile float64) ReduceFunc {
+func ReducePercentile(percentile float64) reduceFunc {
 	return func(values []interface{}) interface{} {
 		var allValues []float64
 
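Unlike the other reducers, ReducePercentile is parameterized: it captures the requested percentile in a closure and returns a reduceFunc. A self-contained sketch of that pattern using a simple nearest-rank percentile, which is not necessarily the exact formula the engine uses:

package main

import (
    "fmt"
    "math"
    "sort"
)

type reduceFunc func([]interface{}) interface{}

// reducePercentileToy returns a reduceFunc that computes a nearest-rank
// percentile over all float64 values gathered from the mappers.
func reducePercentileToy(percentile float64) reduceFunc {
    return func(values []interface{}) interface{} {
        var all []float64
        for _, v := range values {
            if f, ok := v.(float64); ok {
                all = append(all, f)
            }
        }
        if len(all) == 0 {
            return nil
        }
        sort.Float64s(all)
        rank := int(math.Ceil(percentile/100*float64(len(all)))) - 1
        if rank < 0 {
            rank = 0
        }
        return all[rank]
    }
}

func main() {
    p90 := reducePercentileToy(90)
    fmt.Println(p90([]interface{}{1.0, 5.0, 2.0, 9.0, 7.0})) // 9
}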
@@ -1091,7 +1091,7 @@ func IsNumeric(c *influxql.Call) bool {
 }
 
 // MapRawQuery is for queries without aggregates
-func MapRawQuery(itr Iterator) interface{} {
+func MapRawQuery(itr iterator) interface{} {
 	var values []*rawQueryMapOutput
 	for k, v := itr.Next(); k != -1; k, v = itr.Next() {
 		val := &rawQueryMapOutput{k, v}
@@ -78,7 +78,7 @@ func TestInitializeMapFuncPercentile(t *testing.T) {
 		Name: "percentile",
 		Args: []influxql.Expr{},
 	}
-	_, err := InitializeMapFunc(c)
+	_, err := initializeMapFunc(c)
 	if err == nil {
 		t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
 	}
@@ -95,7 +95,7 @@ func TestInitializeMapFuncPercentile(t *testing.T) {
 		},
 	}
 
-	_, err = InitializeMapFunc(c)
+	_, err = initializeMapFunc(c)
 	if err == nil {
 		t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
 	}
@@ -114,7 +114,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
 		Args: []influxql.Expr{},
 	}
 
-	_, err := InitializeMapFunc(c)
+	_, err := initializeMapFunc(c)
 	if err == nil {
 		t.Errorf("InitializeMapFunc(%v) expected error. got nil", c)
 	}
@@ -128,7 +128,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
 		},
 	}
 
-	_, err = InitializeMapFunc(c)
+	_, err = initializeMapFunc(c)
 	if err != nil {
 		t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
 	}
@@ -142,7 +142,7 @@ func TestInitializeMapFuncDerivative(t *testing.T) {
 		},
 	}
 
-	_, err = InitializeMapFunc(c)
+	_, err = initializeMapFunc(c)
 	if err != nil {
 		t.Errorf("InitializeMapFunc(%v) unexpected error. got %v", c, err)
 	}
@@ -155,7 +155,7 @@ func TestInitializeReduceFuncPercentile(t *testing.T) {
 		Name: "percentile",
 		Args: []influxql.Expr{},
 	}
-	_, err := InitializeReduceFunc(c)
+	_, err := initializeReduceFunc(c)
 	if err == nil {
 		t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c)
 	}
@@ -172,7 +172,7 @@ func TestInitializeReduceFuncPercentile(t *testing.T) {
 		},
 	}
 
-	_, err = InitializeReduceFunc(c)
+	_, err = initializeReduceFunc(c)
 	if err == nil {
 		t.Errorf("InitializedReduceFunc(%v) expected error. got nil", c)
 	}
@@ -63,7 +63,7 @@ type SelectMapper struct {
 	intervalSize int64 // Size of each interval.
 	numIntervals int // Maximum number of intervals to return.
 	currInterval int // Current interval for which data is being fetched.
-	mapFuncs []MapFunc // The mapping functions.
+	mapFuncs []mapFunc // The mapping functions.
 	fieldNames []string // the field name being read for mapping.
 }
 
@@ -445,10 +445,10 @@ func (lm *SelectMapper) initializeMapFunctions() error {
 	var err error
 	// Set up each mapping function for this statement.
 	aggregates := lm.selectStmt.FunctionCalls()
-	lm.mapFuncs = make([]MapFunc, len(aggregates))
+	lm.mapFuncs = make([]mapFunc, len(aggregates))
 	lm.fieldNames = make([]string, len(lm.mapFuncs))
 	for i, c := range aggregates {
-		lm.mapFuncs[i], err = InitializeMapFunc(c)
+		lm.mapFuncs[i], err = initializeMapFunc(c)
 		if err != nil {
 			return err
 		}
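Once initializeMapFunctions has filled the parallel mapFuncs and fieldNames slices, the mapper can pair each mapping function with the field it reads for every interval. The sketch below shows that pairing with simplified, slice-based map functions rather than the iterator-based mapFunc type, purely as an illustration of the parallel-slice design:

package main

import "fmt"

// mapFunc here is a simplified stand-in that maps over a slice of points.
type mapFunc func(points []float64) interface{}

// applyMapFuncs pairs each mapping function with the field it reads,
// mirroring the parallel mapFuncs/fieldNames slices on SelectMapper.
func applyMapFuncs(funcs []mapFunc, fieldNames []string, fields map[string][]float64) []interface{} {
    outs := make([]interface{}, len(funcs))
    for i, fn := range funcs {
        outs[i] = fn(fields[fieldNames[i]])
    }
    return outs
}

func main() {
    count := func(points []float64) interface{} { return float64(len(points)) }
    sum := func(points []float64) interface{} {
        t := 0.0
        for _, p := range points {
            t += p
        }
        return t
    }
    fields := map[string][]float64{"value": {1, 2, 3}, "load": {0.5, 0.25}}
    fmt.Println(applyMapFuncs([]mapFunc{count, sum}, []string{"value", "load"}, fields)) // [3 0.75]
}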