diff --git a/tsdb/mapper.go b/tsdb/mapper.go index e6d78673b5..85bb07977a 100644 --- a/tsdb/mapper.go +++ b/tsdb/mapper.go @@ -89,32 +89,8 @@ func (lm *LocalMapper) Open() error { lm.queryTMin, lm.queryTMax = influxql.TimeRangeAsEpochNano(lm.stmt.Condition) if !lm.rawMode { - // Set up each mapping function for this statement. - aggregates := lm.stmt.FunctionCalls() - lm.mapFuncs = make([]influxql.MapFunc, len(aggregates)) - lm.fieldNames = make([]string, len(lm.mapFuncs)) - for i, c := range aggregates { - lm.mapFuncs[i], err = influxql.InitializeMapFunc(c) - if err != nil { - return err - } - - // Check for calls like `derivative(lmean(value), 1d)` - var nested *influxql.Call = c - if fn, ok := c.Args[0].(*influxql.Call); ok { - nested = fn - } - switch lit := nested.Args[0].(type) { - case *influxql.VarRef: - lm.fieldNames[i] = lit.Val - case *influxql.Distinct: - if c.Name != "count" { - return fmt.Errorf("aggregate call didn't contain a field %s", c.String()) - } - lm.fieldNames[i] = lit.Val - default: - return fmt.Errorf("aggregate call didn't contain a field %s", c.String()) - } + if err := lm.initializeMapFunctions(); err != nil { + return err } // For GROUP BY time queries, limit the number of data points returned by the limit and offset @@ -360,6 +336,41 @@ func (lm *LocalMapper) nextInterval() (start, end int64) { return } +// initializeMapFunctions initialize the mapping functions for the mapper. This only applies +// to aggregate queries. +func (lm *LocalMapper) initializeMapFunctions() error { + var err error + // Set up each mapping function for this statement. + aggregates := lm.stmt.FunctionCalls() + lm.mapFuncs = make([]influxql.MapFunc, len(aggregates)) + lm.fieldNames = make([]string, len(lm.mapFuncs)) + for i, c := range aggregates { + lm.mapFuncs[i], err = influxql.InitializeMapFunc(c) + if err != nil { + return err + } + + // Check for calls like `derivative(lmean(value), 1d)` + var nested *influxql.Call = c + if fn, ok := c.Args[0].(*influxql.Call); ok { + nested = fn + } + switch lit := nested.Args[0].(type) { + case *influxql.VarRef: + lm.fieldNames[i] = lit.Val + case *influxql.Distinct: + if c.Name != "count" { + return fmt.Errorf("aggregate call didn't contain a field %s", c.String()) + } + lm.fieldNames[i] = lit.Val + default: + return fmt.Errorf("aggregate call didn't contain a field %s", c.String()) + } + } + + return nil +} + // TagSets returns the list of TagSets for which this mapper has data. func (lm *LocalMapper) TagSets() []string { return tagSetCursors(lm.cursors).Keys()