Factor out aggregate-only code in LocalMapper Open

pull/3389/head
Philip O'Toole 2015-07-18 22:25:52 -07:00
parent 6b5b652f39
commit 4569f3d2ea
1 changed files with 37 additions and 26 deletions

View File

@ -89,32 +89,8 @@ func (lm *LocalMapper) Open() error {
lm.queryTMin, lm.queryTMax = influxql.TimeRangeAsEpochNano(lm.stmt.Condition)
if !lm.rawMode {
// Set up each mapping function for this statement.
aggregates := lm.stmt.FunctionCalls()
lm.mapFuncs = make([]influxql.MapFunc, len(aggregates))
lm.fieldNames = make([]string, len(lm.mapFuncs))
for i, c := range aggregates {
lm.mapFuncs[i], err = influxql.InitializeMapFunc(c)
if err != nil {
return err
}
// Check for calls like `derivative(lmean(value), 1d)`
var nested *influxql.Call = c
if fn, ok := c.Args[0].(*influxql.Call); ok {
nested = fn
}
switch lit := nested.Args[0].(type) {
case *influxql.VarRef:
lm.fieldNames[i] = lit.Val
case *influxql.Distinct:
if c.Name != "count" {
return fmt.Errorf("aggregate call didn't contain a field %s", c.String())
}
lm.fieldNames[i] = lit.Val
default:
return fmt.Errorf("aggregate call didn't contain a field %s", c.String())
}
if err := lm.initializeMapFunctions(); err != nil {
return err
}
// For GROUP BY time queries, limit the number of data points returned by the limit and offset
@ -360,6 +336,41 @@ func (lm *LocalMapper) nextInterval() (start, end int64) {
return
}
// initializeMapFunctions initialize the mapping functions for the mapper. This only applies
// to aggregate queries.
func (lm *LocalMapper) initializeMapFunctions() error {
var err error
// Set up each mapping function for this statement.
aggregates := lm.stmt.FunctionCalls()
lm.mapFuncs = make([]influxql.MapFunc, len(aggregates))
lm.fieldNames = make([]string, len(lm.mapFuncs))
for i, c := range aggregates {
lm.mapFuncs[i], err = influxql.InitializeMapFunc(c)
if err != nil {
return err
}
// Check for calls like `derivative(lmean(value), 1d)`
var nested *influxql.Call = c
if fn, ok := c.Args[0].(*influxql.Call); ok {
nested = fn
}
switch lit := nested.Args[0].(type) {
case *influxql.VarRef:
lm.fieldNames[i] = lit.Val
case *influxql.Distinct:
if c.Name != "count" {
return fmt.Errorf("aggregate call didn't contain a field %s", c.String())
}
lm.fieldNames[i] = lit.Val
default:
return fmt.Errorf("aggregate call didn't contain a field %s", c.String())
}
}
return nil
}
// TagSets returns the list of TagSets for which this mapper has data.
func (lm *LocalMapper) TagSets() []string {
return tagSetCursors(lm.cursors).Keys()